scheduler.h

Go to the documentation of this file.
00001 /**
00002  *  Copyright (C) 2004-2005 Alo Sarv <madcat_@users.sourceforge.net>
00003  *
00004  *  This program is free software; you can redistribute it and/or modify
00005  *  it under the terms of the GNU General Public License as published by
00006  *  the Free Software Foundation; either version 2 of the License, or
00007  *  (at your option) any later version.
00008  *
00009  *  This program is distributed in the hope that it will be useful,
00010  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  *  GNU General Public License for more details.
00013  *
00014  *  You should have received a copy of the GNU General Public License
00015  *  along with this program; if not, write to the Free Software
00016  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017  */
00018 
00019 /**
00020  * \page Scheduler Networking Scheduler API
00021  *
00022  * \section intro Introduction
00023  *
00024  * HydraNode Networking Scheduler is divided into three major parts:
00025  * - The user frontend (SSocket class)
00026  * - Translation unit (Scheduler class)
00027  * - The backend (SchedBase class)
00028  *
00029  * \section front User frontend
00030  *
00031  * SSocket template class provides a generic typesafe frontend to users of the
00032  * API. Providing accessors for all possible socket operations, it forwards all
00033  * calls to the <b>Translation Unit</b>. No operations are performed in the
00034  * frontend. Due to the template design of the frontend, we can have all
00035  * possible socket functions declared there, but they won't get instantiated
00036  * until they are actually used. As such, attempts to perform operations on
00037  * sockets that do not have the specified operation defined will result in
00038  * compile-time errors.
00039  *
00040  * The underlying implementation for SSocket class is chosen based on the first
00041  * two template parameters by default, however can be overridden by clients
00042  * wishing to define their own underlying socket implementations.
00043  *
00044  * The last parameter to SSocket template class is the scheduler class to use.
00045  * This parameter is provided here merely for completeness, and is not allowed
00046  * (nor recommended) to be overridden by module developers.
00047  *
00048  * \section transl Translation Unit
00049  *
00050  * The <b>translation unit</b> abstracts away the specifics of the underlying
00051  * implementation of the sockets by wrapping the requests into generic objects,
00052  * and submits them to the backend for processing when the time is ready. The
00053  * Translation unit also handles data buffering and events from the underlying
00054  * sockets. As such, <b>it is also the driver</b> of the scheduler engine.
00055  * And last, but not least, <b>it also bridges the backend and the frontend</b>
00056  * through virtual function and callback mechanisms. This unit is responsible
00057  * for sending the actual I/O requests to the underlying socket implementation,
00058  * as specified by the backend. No I/O may occur without explicit permission
00059  * from the backend (implemented through virtual function mechanisms).
00060  *
00061  * \section backend The backend
00062  *
00063  * The backend works only using abstract request objects of types UploadReqBase,
00064  * DownloadReqBase and ConnReqBase, and performs the I/O scheduling. The backend
00065  * decides when a request may be fulfilled, and how much of the request may be
00066  * fulfilled at this moment. Note that the backend also owns the request objects
00067  * and is responsible for deleting those once they have been fulfilled.
00068  *
00069  * \todo The Translation unit is useless overhead, and could be dropped in
00070  *       future versions; requests should be constructed only once, in SSocket
00071  *       class, and passed to SchedBase as ref-counted pointers to avoid
00072  *       excessive construction/destruction overheads.
00073  * \bug State variables aren't working in SSocket
00074  *      Since SSocket doesn't keep any information of its own, and calls to
00075  *      implementation are passed through Request engine, queries on socket
00076  *      state right after issuing a function, e.g. connect() on it, return
00077  *      wrong state, since the call hasn't been passed to implementation yet.
00078  *      Possible fixes include setState method in implementation classes, or an
00079  *      additional state variable in SSocket class (latter is preferred).
00080  */
00081 
00082 #ifndef __SCHEDULER_H__
00083 #define __SCHEDULER_H__
00084 
00085 #include <hn/schedbase.h>          // scheduler base
00086 #include <hn/sockets.h>            // xplatform socket api
00087 #include <hn/log.h>                // for logging functions
00088 #include <hn/rangelist.h>          // for ranges
00089 #include <boost/function.hpp>      // function objects
00090 #include <boost/scoped_array.hpp>  // scoped_array
00091 
00092 namespace SchedEventHandler {
00093 
00094 /**
00095  * Event-handling policy used by Scheduler for client type sockets.
00096  *
00097  * @param Source       Type of the object emitting the socket events
00098  * @param Scheduler    Scheduler instantiation governing this socket type
00099  */
00100 template<typename Source, typename Scheduler>
00101 class ClientEventHandler {
00102         typedef typename Scheduler::SIter SIter;
00103         typedef typename Scheduler::RIter RIter;
00104         typedef typename Scheduler::UploadReq UploadReq;
00105         typedef typename Scheduler::DownloadReq DownloadReq;
00106         typedef typename Scheduler::ConnReq ConnReq;
00107 public:
00108         /**
00109          * Socket event handler. Dispatches every known event to one of the
00110          * helper functions below; unknown events are logged as a warning.
00111          *
00112          * @param src        Source of the event
00113          * @param evt        The event itself
00114          *
00115          * \note This function is called from main event loop, and should never
00116          *       be called directly.
00117          */
00118         static void onEvent(Source *src, SocketEvent evt) {
00119                 switch (evt) {
00120                 case SOCK_READ:       handleRead(src);      return;
00121                 case SOCK_WRITE:      handleWrite(src);     return;
00122                 case SOCK_CONNECTED:  handleConnected(src); return;
00123                 case SOCK_LOST:
00124                 case SOCK_ERR:
00125                 case SOCK_TIMEOUT:
00126                 case SOCK_CONNFAILED: handleErr(src, evt);  return;
00127                 default:
00128                         break;
00129                 }
00130                 logWarning(
00131                         boost::format(
00132                                 "Scheduler: Unhandled socket event %p"
00133                         ) % evt
00134                 );
00135         }
00136 
00137         //! Handles "connection established" events. Buffered output goes
00138         //! through scheduleUpload(), so an upload request that is already
00139         //! registered is revalidated instead of being registered twice.
00140         static void handleConnected(Source *src) {
00141                 SIter i = Scheduler::s_sockets.find(src);
00142                 CHECK_FAIL(i != Scheduler::s_sockets.end());
00143                 if (!i->m_outBuffer->empty()) {
00144                         scheduleUpload(src, i);
00145                 }
00146                 i->notify(SOCK_CONNECTED);
00147         }
00148 
00149         //! Handles "socket became writable" type of events
00150         static void handleWrite(Source *src) {
00151                 SIter i = Scheduler::s_sockets.find(src);
00152                 CHECK_FAIL(i != Scheduler::s_sockets.end());
00153                 if (i->m_outBuffer->empty()) {
00154                         // Nothing buffered: pass the event on to the frontend
00155                         i->notify(SOCK_WRITE);
00156                         return;
00157                 }
00158                 scheduleUpload(src, i);
00159         }
00160 
00161         //! Handles "socket became readable" type of events
00162         static void handleRead(Source *src) {
00163                 RIter j = Scheduler::s_downReqs.find(src);
00164                 if (j != Scheduler::s_downReqs.end()) {
00165                         j->second->setValid(true);
00166                         return;
00167                 }
00168                 SIter i = Scheduler::s_sockets.find(src);
00169                 CHECK_FAIL(i != Scheduler::s_sockets.end());
00170                 DownloadReq *r = new DownloadReq(*i);
00171                 SchedBase::instance().addDloadReq(r);
00172         }
00173 
00174         /**
00175          * Handles all error conditions, e.g. timeouts, connfailed etc.
00176          * @param src     Event source
00177          * @param evt     The error event
00178          */
00179         static void handleErr(Source *src, SocketEvent evt) {
00180                 SchedBase::instance().delConn();
00181                 Scheduler::invalidateReqs(src);
00182                 SIter i = Scheduler::s_sockets.find(src);
00183                 CHECK_FAIL(i != Scheduler::s_sockets.end());
00184                 i->notify(evt);
00185         }
00186 private:
00187         //! Revalidates the upload request of src, or submits a new one
00188         static void scheduleUpload(Source *src, SIter sock) {
00189                 RIter pending = Scheduler::s_upReqs.find(src);
00190                 if (pending != Scheduler::s_upReqs.end()) {
00191                         pending->second->setValid(true);
00192                 } else {
00193                         UploadReq *fresh = new UploadReq(*sock);
00194                         SchedBase::instance().addUploadReq(fresh);
00195                 }
00196         }
00197 };
00198 
00199 /**
00200  * Event-handling policy that Scheduler uses for server type sockets. Only
00201  * the SOCK_ACCEPT event results in any action being taken.
00202  *
00203  * @param Source       Type of the object emitting the events
00204  * @param Scheduler    Scheduler instantiation matching the source type
00205  */
00206 template<typename Source, typename Scheduler>
00207 class ServerEventHandler {
00208 public:
00209         typedef typename Scheduler::SIter SIter;
00210         typedef typename Scheduler::AcceptReq AcceptReq;
00211 
00212         /**
00213          * Event handler function for events emitted from servers.
00214          *
00215          * On SOCK_ACCEPT, the wrapper of the socket is looked up in the
00216          * scheduler's socket set, and an accept request created for it is
00217          * handed to SchedBase::addConnReq(). SOCK_LOST, SOCK_ERR and any
00218          * other event are ignored.
00219          *
00220          * @param src            Source of the event
00221          * @param evt            The actual event itself
00222          *
00223          * \note This function is called from main event loop and should
00224          *       never be called directly by user code.
00225          * \pre  src is present in Scheduler::s_sockets (see CHECK_FAIL).
00226          */
00227         static void onEvent(Source *src, SocketEvent evt) {
00228                 // Losing a connection (SOCK_LOST) or becoming erroneous
00229                 // (SOCK_ERR) has no handling for servers yet, so everything
00230                 // except an incoming connection is dropped here.
00231                 if (evt != SOCK_ACCEPT) {
00232                         return;
00233                 }
00234 
00235                 SIter i = Scheduler::s_sockets.find(src);
00236                 CHECK_FAIL(i != Scheduler::s_sockets.end());
00237 
00238                 AcceptReq *pending = new AcceptReq(*i);
00239                 SchedBase::instance().addConnReq(pending);
00240         }
00241 };
00242 
00243 } // !SchedEventHandler
00244 
00245 /**
00246  * Traits class - selects the event handler policy class based on the event
00247  * source type. For example, SocketClient and UDPSocket use the same event
00248  * handler policy, while SocketServer needs a specialized version of event
00249  * handler due to the radically different nature of it.
00250  *
00251  * The primary template is only declared, never defined; one of the
00252  * specializations below is chosen instead. A source type without a
00253  * specialization is expected to break the build (NOTE(review): probably as
00254  * an incomplete-type compile error rather than a link error - confirm).
00255  */
00256 template<typename Source, typename Scheduler>
00257 class GetEventHandler;
00258 
00259 template<typename Scheduler>
00260 class GetEventHandler<SocketClient, Scheduler> {
00261 public:
00262         //! SocketClient events are handled by the client policy
00263         typedef SchedEventHandler::ClientEventHandler<SocketClient, Scheduler>
00264                 Handler;
00265 };
00266 
00267 template<typename Scheduler>
00268 class GetEventHandler<UDPSocket, Scheduler> {
00269 public:
00270         //! UDPSocket events are handled by the client policy as well
00271         typedef SchedEventHandler::ClientEventHandler<UDPSocket, Scheduler>
00272                 Handler;
00273 };
00274 
00275 template<typename Scheduler>
00276 class GetEventHandler<SocketServer, Scheduler> {
00277 public:
00278         //! SocketServer events are handled by the server policy
00279         typedef SchedEventHandler::ServerEventHandler<SocketServer, Scheduler>
00280                 Handler;
00281 };
00282 
00283 /**
00284  * Scheduler class, implementing second level of HydraNode Networking Scheduling
00285  * API, abstracts away modules part of the sockets by generating a priority
00286  * score (PS) for each of the pending requests. All requests are received from
00287  * the frontend, wrapped into generic containers, and buffered internally for
00288  * later processing. No direct action shall be taken in the functions directly
00289  * or indirectly called from frontend.
00290  *
00291  * @param Impl             Implementation class type
00292  * @param ImplPtr          Pointer to implementation class
00293  */
00294 template<typename Impl, typename ImplPtr = Impl*>
00295 class Scheduler {
00296         //! Function object returning the score (a float) for a given ImplPtr
00297         typedef typename boost::function<float (ImplPtr)> ScoreFunc;
00298         //! Shorthand for Impl::AcceptType, the type handed out by accept()
00299         typedef typename Impl::AcceptType AcceptType;
00300         //! Type of events emitted from Impl
00301         typedef typename Impl::EventType EventType;
00302         //! Signature of handlers receiving events emitted from Impl
00303         typedef boost::function<void (ImplPtr, EventType)> HandlerFunc;
00304         //! Event handling policy selected for Impl through GetEventHandler
00305         typedef typename GetEventHandler<Impl, Scheduler>::Handler EventHandler;
00306 public:
00307         /**
00308          * Schedule outgoing data
00309          *
00310          * @param ptr   Implementation pointer where to send this data
00311          * @param data  Data buffer to be sent out
00312          *
00313          * The data is appended to the socket's outgoing buffer. An upload
00314          * request already registered for the socket is marked valid again;
00315          * otherwise a new upload request is submitted to SchedBase, but
00316          * only if the socket reports being connected.
00317          *
00318          * \note The pointed socket is not required to be in connected state
00319          *       at the time of this call, since no actual I/O is performed
00320          *       here; the data then just stays in the outgoing buffer.
00321          *
00322          * \pre ptr is previously added to scheduler using addSocket method
00323          */
00324         template<typename Module>
00325         static void write(ImplPtr ptr, const std::string &data) {
00326                 SIter s = s_sockets.find(ptr);
00327                 CHECK_FAIL(s != s_sockets.end());
00328                 s->m_outBuffer->append(data);
00329 
00330                 RIter i = s_upReqs.find(ptr);
00331                 if (i != s_upReqs.end()) {
00332                         i->second->setValid(true);
00333                         return;
00334                 }
00335                 if (s->getSocket()->isConnected()) {
00336                         UploadReq *req = new UploadReq(*s);
00337                         SchedBase::instance().addUploadReq(req);
00338                 }
00339         }
00340 
00341         /**
00342          * Read data from socket
00343          *
00344          * @param ptr       Socket to read data from
00345          * @param buf       Buffer to append the retrieved data to
00346          *
00347          * Only data already held in the scheduler's incoming buffer for
00348          * this socket is handed out; no network I/O is performed. The
00349          * buffered data (if any) is appended to buf, and the internal
00350          * buffer is cleared afterwards.
00351          *
00352          * \note It is not required that the designated socket is in connected
00353          *       state at the time of this call, since no actual networking I/O
00354          *       is performed.
00355          */
00356         template<typename Module>
00357         static void read(ImplPtr ptr, std::string *buf) {
00358                 SIter i = s_sockets.find(ptr);
00359                 CHECK_FAIL(i != s_sockets.end());
00360                 std::string &pending = *i->m_inBuffer;
00361                 buf->append(pending);
00362                 pending.clear();
00363         }
00364 
00365         /**
00366          * Variant of read() that returns the buffered input by value
00367          * instead of appending it to a caller-supplied string.
00368          *
00369          * @param ptr     Pointer to socket to read data from
00370          * @return        Copy of the current input buffer
00371          *
00372          * \note Just as with read() method, the scheduler's internal buffer is
00373          *       cleared with this call.
00374          */
00375         template<typename Module>
00376         static std::string getData(ImplPtr ptr) {
00377                 SIter i = s_sockets.find(ptr);
00378                 CHECK_FAIL(i != s_sockets.end());
00379                 std::string &pending = *i->m_inBuffer;
00380                 std::string tmp(pending);
00381                 pending.clear();
00382                 return tmp;
00383         }
00384 
00385         /**
00386          * Accept a pending connection
00387          *
00388          * @param ptr    Socket to accept the connection from
00389          * @return       Connection removed from the front of the accepted queue
00390          *
00391          * The queue of already accepted connections kept for ptr is consulted;
00392          * CHECK_THROW rejects the call when that queue is empty (presumably by
00393          * throwing an exception - see the CHECK_THROW definition).
00394          */
00395         template<typename Module>
00396         static AcceptType* accept(ImplPtr ptr) {
00397                 SIter i = s_sockets.find(ptr);
00398                 CHECK_FAIL(i != s_sockets.end());
00399                 CHECK_THROW((*i).m_accepted->size());
00400                 std::deque<AcceptType*> &pending = *i->m_accepted;
00401                 AcceptType *conn = pending.front();
00402                 pending.pop_front();
00403                 return conn;
00404         }
00405 
00406         /**
00407          * Request an outgoing connection by handing a ConnReq to SchedBase
00408          *
00409          * @param ptr       Socket requesting the connection
00410          * @param addr      Address to connect to
00411          * @param timeout   Milliseconds to try to connect before giving up
00412          */
00413         template<typename Module>
00414         static void connect(ImplPtr ptr, IPV4Address addr, uint32_t timeout) {
00415                 SIter i = s_sockets.find(ptr);
00416                 CHECK_FAIL(i != s_sockets.end());
00417                 ConnReq *request = new ConnReq(*i, addr, timeout);
00418                 SchedBase::instance().addConnReq(request);
00419         }
00420 
00421         /**
00422          * Disconnect a connected socket
00423          *
00424          * @param ptr   Socket to be disconnected
00425          * @param lazy  If true, disconnection is deferred while outgoing
00426          *              data is still buffered; with nothing buffered (or
00427          *              lazy being false) the socket is disconnected now.
00428          *
00429          * Disconnecting invalidates all pending requests for this socket.
00430          */
00431         template<typename Module>
00432         static void disconnect(ImplPtr ptr, bool lazy) {
00433                 if (lazy) {
00434                         SIter i = s_sockets.find(ptr);
00435                         CHECK_FAIL(i != s_sockets.end());
00436                         if (!i->m_outBuffer->empty()) {
00437                                 s_toDisconnect.insert(ptr);
00438                                 return; // deferred via s_toDisconnect
00439                         }
00440                 } // lazy with nothing left to flush: disconnect right away
00441                 ptr->disconnect();
00442                 SchedBase::instance().delConn();
00443                 invalidateReqs(ptr);
00444         }
00445 
00446         /**
00447          * Add a socket to the scheduler
00448          *
00449          * @param Module      Module owning this socket (selects the score function)
00450          * @param s           Socket to be added
00451          * @param h           Frontend event handler for notifications
00452          */
00453         template<typename Module>
00454         static void addSocket(ImplPtr s, HandlerFunc h) {
00455                 logTrace(
00456                         TRACE_SCHED,
00457                         boost::format("Adding socket %p to scheduler.") % s
00458                 );
00459 
00460                 HandlerFunc evtFunc(&EventHandler::onEvent);
00461                 ScoreFunc score(&SchedBase::getScore<Module, ImplPtr>);
00462 
00463                 SSocketWrapper entry(s, h, score);
00464                 entry.setConn(Impl::getEventTable().addHandler(s, evtFunc));
00465                 s_sockets.insert(entry);
00466         }
00467 
00468         /**
00469          * Remove a socket from the scheduler
00470          *
00471          * @param s         Socket to be removed
00472          *
00473          * \note Pending requests of this socket are invalidated.
00474          */
00475         static void delSocket(ImplPtr s) {
00476                 logTrace(
00477                         TRACE_SCHED,
00478                         boost::format("Removing socket %p from scheduler.") % s
00479                 );
00480 
00481                 SIter i = s_sockets.find(s);
00482                 CHECK_FAIL(i != s_sockets.end());
00483 
00484                 // Detach the backend event handler before the wrapper goes
00485                 Impl::getEventTable().delHandler(i->getSocket(), i->getConn());
00486                 s_sockets.erase(i);
00487 
00488                 if (s->isConnected()) {
00489                         SchedBase::instance().delConn();
00490                 }
00491                 invalidateReqs(s);
00492         }
00493 
00494 // We need the internal classes public so the policy classes (namely,
00495 // EventHandler) can access them. If only we could make the EventHandler a
00496 // friend of ours, we could move this to private sector.
00497 public:
00498         /**
00499          * Scheduler-side record for one socket: the underlying socket
00500          * pointer, frontend event handler, backend event connection, score
00501          * function and the data buffers. Requests store a copy of it to
00502          * refer back to their socket. Copies share the same buffers, as
00503          * those are held through boost::shared_ptr.
00504          */
00505         class SSocketWrapper {
00506         public:
00507                 /**
00508                  * Construct new socket wrapper with empty buffers
00509                  *
00510                  * @param s     Pointer to the underlying socket
00511                  * @param h     Frontend event handler for notifications
00512                  * @param f     Score function object to retrieve this socket's
00513                  *              priority score
00514                  */
00515                 SSocketWrapper(ImplPtr s, HandlerFunc h = 0, ScoreFunc f = 0)
00516                 : m_socket(s), m_handler(h), m_scoreFunc(f)
00517                 , m_outBuffer(new std::string)
00518                 , m_inBuffer(new std::string)
00519                 , m_accepted(new std::deque<AcceptType*>) {}
00520 
00521                 //! @name Accessors
00522                 //@{
00523                 ImplPtr getSocket() const { return m_socket; }
00524                 float getScore() const { return m_scoreFunc(m_socket); }
00525                 HandlerFunc getHandler() const { return m_handler; }
00526                 boost::signals::connection getConn() const { return m_conn; }
00527                 void setConn(boost::signals::connection c) { m_conn = c; }
00528                 //@}
00529 
00530                 /**
00531                  * Pass event to frontend
00532                  *
00533                  * @param evt      Event to be sent
00534                  *
00535                  * \note If no handler is set, this function does nothing
00536                  */
00537                 void notify(EventType evt) const {
00538                         if (!m_handler) {
00539                                 return;
00540                         }
00541                         m_handler(m_socket, evt);
00542                 }
00543 
00544                 //! Orders wrappers by their underlying socket pointer
00545                 friend bool operator<(
00546                         const SSocketWrapper &x, const SSocketWrapper &y
00547                 ) {
00548                         return x.m_socket < y.m_socket;
00549                 }
00550 
00551         // Data members are public on purpose: this is an internal class and
00552         // the scheduler, its requests and event handlers access them directly
00553                 SSocketWrapper();         //!< Forbidden
00554 
00555                 ImplPtr     m_socket;     //!< Underlying socket
00556                 HandlerFunc m_handler;    //!< Frontend event handler
00557                 boost::signals::connection m_conn; //!< Backend event connection
00558                 ScoreFunc   m_scoreFunc;  //!< Function to retrieve the score
00559 
00560                 //! Outgoing data buffer
00561                 boost::shared_ptr<std::string> m_outBuffer;
00562                 //! Incoming data buffer
00563                 boost::shared_ptr<std::string> m_inBuffer;
00564                 //! Accepted connections
00565                 boost::shared_ptr<std::deque<AcceptType*> > m_accepted;
00566         };
00567 
00568         /**
00569          * Upload request - states that we wish to send out data to a
00570          * socket. The request is handed to SchedBase, which calls back
00571          * through the virtual function doSend() once we are allowed to
00572          * perform the actual sending.
00573          */
00574         class UploadReq : public SchedBase::UploadReqBase {
00575         public:
00576                 /**
00577                  * Construct new upload request and register it in s_upReqs
00578                  *
00579                  * @param s       Socket where this request belongs to
00580                  */
00581                 UploadReq(const SSocketWrapper &s)
00582                 : UploadReqBase(s.getScore()), m_obj(s) {
00583                         bool ret = Scheduler::s_upReqs.insert(
00584                                 std::make_pair(s.getSocket(), this)
00585                         ).second;
00586                         CHECK_FAIL(ret);
00587                         (void)ret; // suppress warning in release build
00588                 }
00589 
00590                 //! Removes the entry of this request's socket from s_upReqs
00591                 ~UploadReq() {
00592                         Scheduler::s_upReqs.erase(m_obj.getSocket());
00593                 }
00594 
00595                 /**
00596                  * Send out data from the socket's outgoing buffer. The sent
00597                  * part is removed from the buffer; once the buffer is empty
00598                  * the request invalidates itself.
00599                  *
00600                  * @param num   Number of bytes to send out (unlimited peers: all)
00601                  * @return      Bytes sent out; always 0 for unlimited peers
00602                  */
00603                 virtual uint32_t doSend(uint32_t num) {
00604                         std::string &out = *m_obj.m_outBuffer;
00605                         uint32_t peer = m_obj.getSocket()->getPeer().getIp();
00606                         bool isLimited = SchedBase::instance().isLimited(peer);
00607 
00608                         // never ask for more than what is buffered
00609                         if (!isLimited || num > out.size()) {
00610                                 num = out.size();
00611                         }
00612 
00613                         uint32_t sent = m_obj.getSocket()->write(
00614                                 out.data(), num
00615                         );
00616 
00617                         if (sent >= out.size()) {
00618                                 out.clear();
00619                                 invalidate();
00620                         } else {
00621                                 out.erase(0, sent);
00622                         }
00623                         return isLimited ? sent : 0;
00624                 }
00625 
00626                 //! Send SOCK_WRITE to client code, requesting more data
00627                 virtual void notify() const {
00628                         m_obj.notify(SOCK_WRITE);
00629                 }
00630 
00631                 //! Number of bytes still waiting in the outgoing buffer
00632                 virtual uint32_t getPending() const {
00633                         return m_obj.m_outBuffer->size();
00634                 }
00635         private:
00636                 UploadReq();           //!< Forbidden
00637                 SSocketWrapper m_obj;  //!< Keeps reference data for socket
00638         };
00638 
00639         /**
00640          * Download request is an indication that we wish to receive data from
00641          * a peer. As a general rule, the Scheduler assumes that when this
00642          * request is submitted, there actually is pending data in the
00643          * underlying socket to be received.
00644          *
00645          * When the main scheduler decides we are allowed to receive data, it
00646          * will call doRecv() method, where we can perform actual I/O.
00647          */
00648         class DownloadReq : public SchedBase::DownloadReqBase {
00649         public:
00650                 //! Construct, registering this request in s_downReqs by socket
00651                 DownloadReq(const SSocketWrapper &s)
00652                 : DownloadReqBase(s.getScore()), m_obj(s) {
00653                         bool ret = Scheduler::s_downReqs.insert(
00654                                 std::make_pair(s.getSocket(), this)
00655                         ).second;
00656                         CHECK_FAIL(ret);
00657                         (void)ret; // suppress warning in release build
00658                 }
00659 
00660                 //! Destructor; erases this socket's entry from s_downReqs
00661                 ~DownloadReq() {
00662                         Scheduler::s_downReqs.erase(m_obj.getSocket());
00663                 }
00664 
00665                 /**
00666                  * Called from SchedBase (as virtual function) when we may
00667                  * start receiving data on this socket. Received data is
00668                  * appended to the socket's incoming data buffer.
00669                  * At most SchedBase::INPUT_BUFSIZE bytes are read per call.
00670                  *
00671                  * @param amount      Amount of data we are allowed to receive
00672                  * @return            Amount of data received; always 0 for peers
00673                  *                    that are not limited, and 0 on failed reads
00674                  */
00675                 virtual uint32_t doRecv(uint32_t amount) {
00676                         // static input buffer
00677                         static char buf[SchedBase::INPUT_BUFSIZE];
00678 
00679                         uint32_t peer = m_obj.getSocket()->getPeer().getIp();
00680                         bool isLimited = SchedBase::instance().isLimited(peer);
00681 
00682                         if (!isLimited || amount > SchedBase::INPUT_BUFSIZE) {
00683                                 amount = SchedBase::INPUT_BUFSIZE;
00684                         }
00685 
00686                         int ret = m_obj.getSocket()->read(buf, amount);
00687 
00688                         // A negative count would be converted to a huge size
00689                         // by append() below, so it is treated like "no data".
00690                         if (ret <= 0) {
00691                                 return 0;
00692                         }
00693 
00694                         m_obj.m_inBuffer->append(buf, ret);
00695 
00696                         // if no limit is applied, don't return count either
00697                         return isLimited ? ret : 0;
00698                 }
00699 
00700                 //! Notify client code
00701                 virtual void notify() const {
00702                         m_obj.notify(SOCK_READ);
00703                 }
00704         private:
00705                 DownloadReq();             //!< Forbidden
00706                 SSocketWrapper m_obj;      //!< Underlying socket
00707         };
00708 
00709         /**
00710          * Accept request indicates we wish to accept an incoming connection
00711          * on the wrapped socket. It is of generic type Connection Request.
00712          * Once SchedBase decides the connection may be accepted, it calls
00713          * the doConn() member function, which performs the actual
00714          * accept().
00715          */
00716         class AcceptReq : public SchedBase::ConnReqBase {
00717         public:
00718                 //! Construct and register this request in s_accReqs
00719                 AcceptReq(const SSocketWrapper &s)
00720                 : ConnReqBase(s.getScore()), m_obj(s) {
00721                         const bool ret = Scheduler::s_accReqs.insert(
00722                                 std::make_pair(s.getSocket(), this)
00723                         ).second;
00724                         CHECK_FAIL(ret);
00725                         (void)ret; // keeps release builds free of warnings
00726                 }
00727                 //! Destroy; drops this socket's entry from s_accReqs
00728                 ~AcceptReq() {
00729                         Scheduler::s_accReqs.erase(m_obj.getSocket());
00730                 }
00731
00732                 /**
00733                  * Accept the connection and queue it in the wrapper's m_accepted
00734                  * container for later retrieval by client code; the request then
00735                  * invalidates itself. Client code may NOT be notified from here,
00736                  * notifications are managed by SchedBase.
00737                  *
00738                  * @return       Bitfield of ConnReqBase::ConnRet values
00739                  */
00740                 virtual int doConn() {
00741                         AcceptType *const conn = m_obj.getSocket()->accept();
00742                         m_obj.m_accepted->push_back(conn);
00743                         this->invalidate();
00744                         return ADDCONN | REMOVE | NOTIFY;
00745                 }
00746                 //! Notify client code by emitting SOCK_ACCEPT
00747                 virtual void notify() const {
00748                         m_obj.notify(SOCK_ACCEPT);
00749                 }
00750         private:
00751                 AcceptReq();             //!< Forbidden
00752                 SSocketWrapper m_obj;    //!< Socket reference data
00753         };
00754 
00755         /**
00756          * Connection request indicates we wish to perform an outgoing
00757          * connection. Once SchedBase allows the attempt, it calls our
00758          * doConn() member, which starts connecting to the stored address
00759          * using the stored timeout.
00760          */
00761         class ConnReq : public SchedBase::ConnReqBase {
00762         public:
00763                 //! Construct and register this request in s_connReqs
00764                 ConnReq(const SSocketWrapper &s, IPV4Address addr,
00765                 uint32_t timeout) : ConnReqBase(s.getScore()), m_obj(s),
00766                 m_addr(addr), m_timeout(timeout) {
00767                         const bool ret = Scheduler::s_connReqs.insert(
00768                                 std::make_pair(s.getSocket(), this)
00769                         ).second;
00770                         CHECK_FAIL(ret);
00771                         (void)ret; // keeps release builds free of warnings
00772                 }
00773
00774                 //! Destroy; drops this socket's entry from s_connReqs
00775                 ~ConnReq() {
00776                         Scheduler::s_connReqs.erase(m_obj.getSocket());
00777                 }
00778
00779                 /**
00780                  * Start connecting; an exception thrown by connect() means
00781                  * failure and selects REMOVE | NOTIFY over REMOVE | ADDCONN.
00782                  * @return      Bitfield of ConnReqBase::ConnRet values
00783                  */
00784                 virtual int doConn() {
00785                         try {
00786                                 m_obj.getSocket()->connect(m_addr, m_timeout);
00787                         } catch (...) {
00788                                 return REMOVE | NOTIFY;
00789                         }
00790                         return REMOVE | ADDCONN;
00791                 }
00792
00793                 /**
00794                  * Notify client code; only SOCK_CONNFAILED is emitted here,
00795                  * since SOCK_CONNECTED arrives via events from the underlying
00796                  * socket API.
00797                  */
00798                 virtual void notify() const {
00799                         m_obj.notify(SOCK_CONNFAILED);
00800                 }
00801         private:
00802                 ConnReq();                  //!< Forbidden
00803                 SSocketWrapper m_obj;       //!< Socket reference data
00804                 IPV4Address    m_addr;      //!< Address to connect to
00805                 uint32_t       m_timeout;   //!< Optional connect timeout
00806         };
00807 
00808         //! Set of all scheduled sockets
00809         static std::set<SSocketWrapper> s_sockets;
00810         typedef typename std::set<SSocketWrapper>::iterator SIter;
00811
00812         //! Sockets to be disconnected as soon as all pending upstream data
00813         //! has been sent out.
00814         static std::set<ImplPtr> s_toDisconnect;
00815
00816         /**
00817          * @name Maps to keep pointers to all pending requests for all sockets.
00818          *
00819          * These maps provide back-links from a socket to the requests
00820          * submitted to SchedBase for it, so that the requests can be
00821          * invalidated later (see invalidateReqs()). The maps only keep weak
00822          * references to the requests. Elements of these maps should never be
00823          * deleted directly. The elements are owned by SchedBase class.
00824          *
00825          * To make things simpler and cleaner, the specific requests insert
00826          * themselves into these maps on construction, and erase themselves
00827          * on destruction.
00828          */
00829         //@{
00830         static std::map<ImplPtr, SchedBase::ReqBase*> s_upReqs;
00831         static std::map<ImplPtr, SchedBase::ReqBase*> s_downReqs;
00832         static std::map<ImplPtr, SchedBase::ReqBase*> s_connReqs;
00833         static std::map<ImplPtr, SchedBase::ReqBase*> s_accReqs;
00834         //@}
00835
00836         typedef typename std::map<
00837                 ImplPtr, SchedBase::ReqBase*
00838         >::iterator RIter; //!< Iterator type shared by the request maps
00839 
00840         /**
00841          * Invalidate every request that is still registered for a socket
00842          * in the upload, download, accept and connection request maps.
00843          * @param ptr     Socket for which all requests should be invalidated
00844          */
00845         static void invalidateReqs(ImplPtr ptr) {
00846                 const RIter up = s_upReqs.find(ptr);
00847                 if (up != s_upReqs.end()) {
00848                         logTrace(TRACE_SCHED, "Aborting upload request.");
00849                         up->second->invalidate();
00850                 }
00851                 const RIter down = s_downReqs.find(ptr);
00852                 if (down != s_downReqs.end()) {
00853                         logTrace(TRACE_SCHED, "Aborting download request.");
00854                         down->second->invalidate();
00855                 }
00856                 const RIter acc = s_accReqs.find(ptr);
00857                 if (acc != s_accReqs.end()) {
00858                         logTrace(TRACE_SCHED, "Aborting accept request.");
00859                         acc->second->invalidate();
00860                 }
00861                 const RIter conn = s_connReqs.find(ptr);
00862                 if (conn != s_connReqs.end()) {
00863                         logTrace(TRACE_SCHED, "Aborting connection request.");
00864                         conn->second->invalidate();
00865                 }
00866         }
00867 };
00868 
00869 // Definitions of the static data members declared in Scheduler
00870 template<typename P1, typename P2>
00871 std::set<typename Scheduler<P1, P2>::SSocketWrapper>
00872 Scheduler<P1, P2>::s_sockets;
00873 template<typename P1, typename P2>
00874 std::set<P2> Scheduler<P1, P2>::s_toDisconnect;
00875 template<typename P1, typename P2>
00876 std::map<P2, SchedBase::ReqBase*>
00877 Scheduler<P1, P2>::s_upReqs;
00878 template<typename P1, typename P2>
00879 std::map<P2, SchedBase::ReqBase*>
00880 Scheduler<P1, P2>::s_downReqs;
00881 template<typename P1, typename P2>
00882 std::map<P2, SchedBase::ReqBase*>
00883 Scheduler<P1, P2>::s_accReqs;
00884 template<typename P1, typename P2>
00885 std::map<P2, SchedBase::ReqBase*>
00886 Scheduler<P1, P2>::s_connReqs;
00887 
00888 #endif