clientlist.cpp

Go to the documentation of this file.
00001 /**
00002  *  Copyright (C) 2004-2005 Alo Sarv <madcat_@users.sourceforge.net>
00003  *
00004  *  This program is free software; you can redistribute it and/or modify
00005  *  it under the terms of the GNU General Public License as published by
00006  *  the Free Software Foundation; either version 2 of the License, or
00007  *  (at your option) any later version.
00008  *
00009  *  This program is distributed in the hope that it will be useful,
00010  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  *  GNU General Public License for more details.
00013  *
00014  *  You should have received a copy of the GNU General Public License
00015  *  along with this program; if not, write to the Free Software
00016  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017  */
00018 
00019 /** \file clientlist.cpp Implementation of ClientList class */
00020 
00021 #include <hn/hnprec.h>
00022 #include "clientlist.h"
00023 #include "parser.h"
00024 #include "ed2k.h"
00025 #include "clientext.h"
00026 #include "downloadlist.h"
00027 #include <hn/ssocket.h>
00028 #include <boost/lambda/bind.hpp>
00029 #include <boost/multi_index_container.hpp>
00030 #include <boost/multi_index/ordered_index.hpp>
00031 #include <boost/multi_index/key_extractors.hpp>
00032 
/**
 * Tunable constants used by the client list. The time-related values are
 * used together with Utils::getTick() and delayed events, and are expressed
 * in milliseconds.
 */
enum Ed2k_Constants {
	//! Size, in bytes, of the UDP listener's input buffer. UDP packets
	//! exceeding this size will be truncated. The buffer is allocated once
	//! and reused for every packet, so changing this value shouldn't have
	//! any effect on performance.
	UDP_BUFSIZE = 1024,

	//! How often to recalculate the queue scores and re-sort the queue.
	//! This is a rather time-consuming operation when the queue is big,
	//! hence it is only done at regular intervals.
	QUEUE_UPDATE_TIME = 10 * 1000,

	//! When the previous UDP reask attempt times out, how long to wait
	//! until the next try.
	UDP_REASK_INTERVAL = 10 * 60 * 1000,

	//! How long we keep clients in the queue who haven't performed any
	//! kind of reask.
	QUEUE_DROPTIME = 60 * 60 * 1000,

	//! TCP connection attempt timeout.
	CONNECT_TIMEOUT = 15 * 1000
};
00066 
00067 IMPLEMENT_EVENT_TABLE(ClientList, ClientList*, ClientList::ClientListEvt);
00068 
00069 const std::string TRACE_CLIENT = "Ed2kClient";
00070 const std::string TRACE_SECIDENT = "Ed2kClient::SecIdent";
00071 const std::string TRACE_DEADSRC = "ED2KClient::DeadSource";
00072 const std::string TRACE_CLIST = "Ed2k::ClientList";
00073 const std::string TRACE_SRCEXCH = "ED2KClient::SourceExchange";
00074 
00075 namespace Detail {
00076 //! ClientList indexes
00077 struct ClientListIndices : boost::multi_index::indexed_by<
00078         boost::multi_index::ordered_unique<
00079                 boost::multi_index::identity<Client*>
00080         >,
00081         boost::multi_index::ordered_non_unique<
00082                 boost::multi_index::const_mem_fun<
00083                         Client, uint32_t, &Client::getId
00084                 >
00085         >,
00086         boost::multi_index::ordered_non_unique<
00087                 boost::multi_index::const_mem_fun<
00088                         Client, Hash<MD4Hash>, &Client::getHash
00089                 >
00090         >
00091 > {};
00092 //! ClientList type
00093 struct CList : boost::multi_index_container<Client*, ClientListIndices> {};
00094 
00095 //! Different index ID's
00096 enum { ID_Client, ID_Id, ID_Hash };
00097 typedef CList::nth_index<ID_Client>::type CMap;
00098 typedef CList::nth_index<ID_Id>::type IDMap;
00099 typedef CList::nth_index<ID_Hash>::type HashMap;
00100 typedef CMap::iterator CIter;
00101 typedef IDMap::iterator IIter;
00102 typedef HashMap::iterator HashIter;
00103 
00104 } // namespace Detail
00105 using namespace Detail;
00106 
00107 // dummy constructors/destructors. Don't do anything fancy here - do in init()!
00108 ClientList::ClientList() : m_clients(new CList), m_listener(),
00109 m_udpBuffer(new char[UDP_BUFSIZE]) {
00110         // regen queue every 10 seconds
00111         getEventTable().postEvent(this, EVT_REGEN_QUEUE, QUEUE_UPDATE_TIME);
00112         getEventTable().addHandler(this, this, &ClientList::onClientListEvent);
00113         Client::getEventTable().addAllHandler(this, &ClientList::onClientEvent);
00114 
00115         // for debugging
00116 //      Log::instance().enableTraceMask(TRACE_CLIENT);
00117 //      Log::instance().enableTraceMask(TRACE_CLIST);
00118 //      Log::instance().enableTraceMask(TRACE_SECIDENT);
00119 //      Log::instance().enableTraceMask(TRACE_DEADSRC);
00120 //      Log::instance().enableTraceMask(TRACE_SRCEXCH);
00121 }
00122 
00123 ClientList::~ClientList() {}
00124 
00125 void ClientList::exit() {
00126         logDebug("ClientList exiting.");
00127         Log::instance().disableTraceMask(TRACE_CLIENT);
00128         Log::instance().disableTraceMask(TRACE_CLIST);
00129         Log::instance().disableTraceMask(TRACE_SECIDENT);
00130 
00131         delete m_listener;
00132         m_listener = 0;
00133         Client::getUdpSocket()->destroy();
00134         Client::getUdpSocket() = 0;
00135 
00136         for (CIter i = m_clients->begin(); i != m_clients->end(); ++i) {
00137                 delete *i;
00138         }
00139 }
00140 
00141 void ClientList::init() {
00142         if (m_listener) {
00143                 delete m_listener;
00144         }
00145         // bring up TCP listener
00146         m_listener = new ED2KServerSocket(this, &ClientList::onServerEvent);
00147         uint16_t port = ED2K::instance().getTcpPort();
00148         uint16_t upperLimit = port + 10; // how many fallback ports
00149 
00150         while (port < upperLimit) {
00151                 try {
00152                         m_listener->listen(0, port);
00153                         break;
00154                 } catch (SocketError &e) {
00155                         logWarning(boost::format(
00156                                 "Unable to start ED2K listener on port %d: %s"
00157                         ) % port % e.what());
00158                         ++port;
00159                 }
00160         }
00161 
00162         if (!m_listener->isListening()) {
00163                 throw std::runtime_error(
00164                         "Unable to start ED2K listener, giving up."
00165                 );
00166         }
00167         ED2K::instance().setTcpPort(port);
00168         logMsg(
00169                 boost::format(
00170                         COL_BGREEN "ED2K TCP listener " COL_NONE "started on "
00171                         COL_BCYAN "%d" COL_NONE
00172                 ) % port
00173         );
00174 
00175         // bring up UDP listener
00176         Client::getUdpSocket() = new ED2KUDPSocket;
00177         port = ED2K::instance().getUdpPort();
00178         upperLimit = port + 10; // how many fallback ports
00179         while (port < upperLimit) {
00180                 try {
00181                         Client::getUdpSocket()->listen(0, port);
00182                         break;
00183                 } catch (SocketError &e) {
00184                         logWarning(boost::format(
00185                                 "Unable to start ED2K UDP listener "
00186                                 "on port %d: %s"
00187                         ) % port % e.what());
00188                         ++port;
00189                 }
00190         }
00191         ED2K::instance().setUdpPort(port);
00192         ED2KUDPSocket::getEventTable().addHandler(
00193                 Client::getUdpSocket(), this, &ClientList::onUdpData
00194         );
00195         logMsg(
00196                 boost::format(
00197                         COL_BGREEN "ED2K ClientUDP listener " COL_NONE
00198                         "started on port " COL_BCYAN "%d" COL_NONE
00199                 ) % port
00200         );
00201 
00202         Detail::changeId.connect(
00203                 boost::bind(&ClientList::onIdChange, this, _1, _2)
00204         );
00205         Detail::foundSource.connect(
00206                 boost::bind(&ClientList::addSource, this, _1, _2, _3, _4)
00207         );
00208 }
00209 
00210 void ClientList::onServerEvent(ED2KServerSocket *s, SocketEvent evt) {
00211         assert(s == m_listener);
00212 
00213         if (evt == SOCK_ACCEPT) {
00214                 ED2KClientSocket *sock = s->accept();
00215                 logTrace(TRACE_CLIST,
00216                         boost::format("Client connected from %s")
00217                         % sock->getPeer()
00218                 );
00219                 Client *client = new Client(sock);
00220                 m_clients->insert(client);
00221         } else {
00222                 logWarning("Unknown event in ClientList::onServerEvent");
00223         }
00224 }
00225 
00226 //! handles Client::changeId signal
00227 //! \todo Also compare hashes; doesn't work correctly now
00228 //! because hash-index isn't correctly updated right now.
00229 void ClientList::onIdChange(Client *c, uint32_t newId) {
00230         using boost::lambda::bind;
00231 
00232         logTrace(TRACE_CLIST,
00233                 boost::format("[%s] %p: Client ID changed to %d")
00234                 % c->getIpPort() % c % newId
00235         );
00236 
00237         IDMap &idList = m_clients->get<ID_Id>();
00238 
00239         CIter i = m_clients->get<ID_Client>().find(c);
00240         CHECK_THROW(i != m_clients->get<ID_Client>().end());
00241 
00242         IIter idIt = m_clients->project<ID_Id>(i);
00243         idList.modify(idIt, bind(&Client::m_id, __1(__1)) = newId);
00244 
00245         std::pair<IIter, IIter> r = idList.equal_range(c->m_id);
00246         logTrace(TRACE_CLIST,
00247                 boost::format("[%s] Found %d candidates")
00248                 % c->getIpPort() % std::distance(r.first, r.second)
00249         );
00250         for (IIter j = r.first; j != r.second; ++j) {
00251                 CHECK_FAIL((*j)->getId() == c->getId());
00252                 boost::format fmt("[%s] Candidate: %s %s ... %s");
00253                 fmt % c->getIpPort() % (*j)->getIpPort() % *j;
00254                 if (*j == c) {
00255                         logTrace(TRACE_CLIST, fmt % "Is myself ...");
00256                         continue;
00257                 } else if ((*j)->getTcpPort() != c->getTcpPort()) {
00258                         logTrace(TRACE_CLIST, fmt % "Wrong TCPPort");
00259                 } else {
00260                         logTrace(TRACE_CLIST, fmt % "Merging...");
00261                         (*j)->merge(c);
00262                         c->destroy();
00263                         c = *j;
00264                         break;
00265                 }
00266         }
00267 
00268         if (c->m_uploadInfo) {
00269                 m_uploading.insert(c);
00270         }
00271 
00272         // older emules need also to do MuleInfo packet before considering
00273         // handshake completed - hence the check
00274         if (!c->isMule() || (c->isMule() && c->getVerMin() >= 42)) {
00275                 c->handshakeCompleted();
00276         }
00277 }
00278 
00279 void ClientList::removeClient(Client *c) {
00280         logTrace(TRACE_CLIST, boost::format("Destroying client %p") % c);
00281         CHECK_RET(m_clients->find(c) != m_clients->end());
00282         m_clients->erase(c);
00283         m_uploading.erase(c);
00284         m_queued.erase(c);
00285         delete c;
00286         if (m_uploading.size() < 3) {
00287                 startNextUpload();
00288         }
00289 }
00290 
00291 // event handler for events emitted from Client object
00292 void ClientList::onClientEvent(Client *c, ClientEvent evt) {
00293         switch (evt) {
00294                 case EVT_DESTROY:
00295                         removeClient(c);
00296                         break;
00297                 case EVT_UPLOADREQ: try {
00298                         m_uploading.erase(c); // just in case
00299 
00300                         // no uploading/queued clients -> start one
00301                         if (m_uploading.size() < 3 && !m_queued.size()) {
00302                                 c->startUpload();
00303                                 m_uploading.insert(c);
00304                         } else if (m_queued.find(c) == m_queued.end()) {
00305                                 // wasn't found already in queue - push back
00306                                 m_queued.insert(c);
00307                                 m_queue.push_back(c);
00308                                 CHECK_THROW(c->m_queueInfo);
00309                                 c->m_queueInfo->setQR(m_queued.size());
00310                                 if (c->isConnected()) {
00311                                         c->sendQR();
00312                                 }
00313                         } else {
00314                                 // was found in queue - just send current rank
00315                                 c->sendQR();
00316                         }
00317                         break;
00318                 } catch (std::exception &e) {
00319                         logDebug(
00320                                 boost::format("[%s] OnUploadRequest: %s")
00321                                 % c->getIpPort() % e.what()
00322                         );
00323                         if (m_uploading.size() < 3) {
00324                                 startNextUpload();
00325                         }
00326                         break;
00327                 }
00328                 case EVT_CANCEL_UPLOADREQ:
00329                         m_uploading.erase(c);
00330                         m_queued.erase(c);
00331                         if (m_uploading.size() < 3) {
00332                                 startNextUpload();
00333                         }
00334                         break;
00335                 case EVT_CALLBACK_T:
00336                         if (c->callbackInProgress() && !c->isConnected()) {
00337                                 logTrace(TRACE_CLIENT, boost::format(
00338                                         "[%s] %p LowID callback timed out."
00339                                 ) % c->getIpPort() % c);
00340                                 removeClient(c);
00341                         }
00342                         break;
00343                 case EVT_REASKFILEPING:
00344                         if (c->m_sourceInfo && !c->m_downloadInfo) {
00345                                 c->reaskForDownload();
00346                         }
00347                         break;
00348                 case EVT_REASKTIMEOUT:
00349                         if (c->reaskInProgress() && !c->isConnected()) {
00350                                 logTrace(TRACE_CLIENT, boost::format(
00351                                         "[%s] UDP Reask #%d timed out."
00352                                 ) % c->getIpPort() % static_cast<int>(
00353                                         c->m_failedUdpReasks
00354                                 ));
00355 
00356                                 c->m_reaskInProgress = false;
00357 
00358                                 // try UDP 2 times, if that fails, try TCP
00359                                 // callback, if that also fails, drop the client
00360                                 if (++c->m_failedUdpReasks > 2) try {
00361                                         logTrace(TRACE_CLIENT, boost::format(
00362                                                 "[%s] Attempting TCP Reask..."
00363                                         ) % c->getIpPort());
00364                                         c->establishConnection();
00365                                 } catch (std::exception &e) {
00366                                         logDebug(boost::format(
00367                                                 "[%s] Fatal error attempting "
00368                                                 "to establish connection: %s"
00369                                         ) % c->getIpPort() % e.what());
00370                                         c->destroy();
00371                                 } else {
00372                                         // re-try in 10 minutes
00373                                         Client::getEventTable().postEvent(
00374                                                 c, EVT_REASKFILEPING,
00375                                                 UDP_REASK_INTERVAL
00376                                         );
00377                                 }
00378                         }
00379                         break;
00380                 default:
00381                         logWarning("ClientList::onClientEvent: Unknown event.");
00382                         break;
00383         }
00384 }
00385 
00386 /**
00387  * Attempts to start next upload from the queue. If there are no clients in
00388  * queue, this method does nothing. Otherwise, it attempts to start uploading
00389  * to the highest-rated client in the queue (first erasing all invalid entries).
00390  * If something goes wrong during all that, and exception is thrown, the process
00391  * starts again, discarding obsolete entries and attempting to start upload to
00392  * next valid entry. This loop goes around until there are no more entries left
00393  * in the queue, or no exception is thrown anymore.
00394  */
00395 void ClientList::startNextUpload() {
00396         logTrace(TRACE_CLIST, "Starting next upload.");
00397 
00398         while (m_queue.size()) {
00399                 // remove obsolete entries
00400                 while (m_queued.find(m_queue.front()) == m_queued.end()) {
00401                         m_queue.erase(m_queue.begin());
00402                 }
00403                 if (m_queue.size()) try {
00404                         Client *c = m_queue.front();
00405                         m_queued.erase(m_queue.front());
00406                         m_queue.pop_front();
00407                         c->startUpload();
00408                         m_uploading.insert(c);
00409                         if (m_uploading.size() >= 3) {
00410                                 break;
00411                         }
00412                 } catch (std::runtime_error &er) {
00413                         logDebug(
00414                                 boost::format("Error starting upload: %s")
00415                                 % er.what()
00416                         );
00417                 }
00418         }
00419 }
00420 
00421 void ClientList::onClientListEvent(ClientList *, ClientListEvt evt) {
00422         if (evt == EVT_REGEN_QUEUE) {
00423                 updateQueue();
00424         }
00425 }
00426 
00427 bool scoreComp(const Client *const x, const Client *const y) {
00428         return x->getScore() > y->getScore();
00429 }
00430 
00431 
00432 /**
00433  * Regenerate the upload queue, sorting and setting queue ranks as neccesery.
00434  * This method is called from event table every QUEUE_UPDATE_TIME interval.
00435  * The delayed event is also posted from this method.
00436  *
00437  * Three operations need to be performed within this function:
00438  * - Build up m_queue list from items in m_queued. Only use clients which have
00439  *   m_queueInfo member, since we need that member's presence later on. Note
00440  *   that the existing contents of m_queue list must be discarded prior to this
00441  *   operation.
00442  * - Sort the newly generate queue, based on the client's queue scores. The
00443  * - Now that we have a sorted queue, we must iterate over the entire queue once
00444  *   more, and set each client's queue ranking to reflect it's position in the
00445  *   sorted queue. The queue ranking is needed later on by the client itself
00446  *   to send the queue ranking to the remote client on request.
00447  */
00448 void ClientList::updateQueue() {
00449         typedef std::set<Client*>::iterator QIter;
00450         typedef std::list<Client*>::iterator QLIter;
00451 
00452         Utils::StopWatch s;
00453         uint64_t curTick = Utils::getTick();   // cache current tick count
00454         uint32_t dropped = 0;                  // num dropped entries
00455         m_queue.clear();
00456 
00457         for (QIter i = m_queued.begin(); i != m_queued.end(); ++i) {
00458                 Detail::QueueInfoPtr q = (*i)->m_queueInfo;
00459                 if (q && q->getLastQueueReask() + QUEUE_DROPTIME > curTick) {
00460                         m_queue.push_back(*i);
00461                 } else if (q) {
00462                         (*i)->removeFromQueue();
00463                         ++dropped;
00464                 }
00465         }
00466         m_queue.sort(&scoreComp);
00467 
00468         logTrace(TRACE_CLIST,
00469                 boost::format(
00470                         "Queue update: %d clients queued, %d dropped, took %dms"
00471                 ) % m_queue.size() % dropped % s
00472         );
00473 
00474         uint32_t cnt = 1;
00475         for (QLIter i = m_queue.begin(); i != m_queue.end(); ++i, ++cnt) {
00476                 (*i)->m_queueInfo->setQR(cnt);
00477         }
00478 
00479         getEventTable().postEvent(this, EVT_REGEN_QUEUE, QUEUE_UPDATE_TIME);
00480 
00481         // See if we need more slots
00482         checkOpenMoreSlots();
00483 }
00484 
00485 /**
00486  * Calculates last 10 seconds' average uploadrate, and if that is less than 90%
00487  * of allowed limit, opens up another upload slot.
00488  *
00489  * Note that we check & verify against first call to this method using a
00490  * static boolean. Problem is, during first call, we cannot accurately calculate
00491  * last 10 second's avg speed, since lastSent isn't initialized yet. Thus this
00492  * hack. Alternatively, we could ofcourse move lastSent outside this method,
00493  * and initialize it elsewhere, but I like keeping methods self-contained :)
00494  */
00495 void ClientList::checkOpenMoreSlots() {
00496         using Utils::bytesToString;
00497         static uint32_t lastSent = SchedBase::instance().getTotalUpstream();
00498         static uint32_t lastRecv = SchedBase::instance().getTotalDownstream();
00499         static uint64_t lastSentTime = Utils::getTick();
00500         static uint64_t lastRecvTime = Utils::getTick();
00501         static bool firstCall = true;
00502         static uint64_t lastOverheadDn = ED2KPacket::getOverheadDn();
00503         static uint64_t lastOverheadUp = ED2KPacket::getOverheadUp();
00504 
00505         if (firstCall) {
00506                 firstCall = false;
00507                 return;
00508         }
00509 
00510         uint64_t curTick = Utils::getTick();
00511 
00512         // do calculation with floating-point precision, and then convert the
00513         // final result to uint32_t, to get highest reasonable precision.
00514         uint32_t upDiff = SchedBase::instance().getTotalUpstream() - lastSent;
00515         uint32_t dnDiff = SchedBase::instance().getTotalDownstream() - lastRecv;
00516         float upTimeDiff = (curTick - lastSentTime) / 1000.0f;
00517         float dnTimeDiff = (curTick - lastRecvTime) / 1000.0f;
00518         uint32_t avgu = static_cast<uint32_t>(upDiff / upTimeDiff);
00519         uint32_t avgd = static_cast<uint32_t>(dnDiff / dnTimeDiff);
00520         uint32_t conns = SchedBase::instance().getConnCount();
00521         uint32_t overheadDn = (ED2KPacket::getOverheadDn()-lastOverheadDn) / 10;
00522         uint32_t overheadUp = (ED2KPacket::getOverheadUp()-lastOverheadUp) / 10;
00523 
00524         boost::format fmt(
00525                 "[Statistics] "
00526                 "Sources: "  COL_GREEN  "%4d " COL_NONE
00527                 "| Queued: " COL_YELLOW "%4d " COL_NONE
00528                 "| Up: "     COL_BCYAN  "%9s/s " COL_NONE
00529                 "| Down: "   COL_BGREEN "%9s/s " COL_NONE
00530         );
00531 
00532         fmt % SourceInfo::count() % m_queue.size();
00533         fmt % bytesToString(avgu) % bytesToString(avgd);
00534 
00535         logMsg(fmt);
00536 
00537         // Write to statistics log
00538         boost::filesystem::path p(ED2K::instance().getConfigDir());
00539         p /= "../statistics.log";
00540         std::ofstream ofs(p.native_file_string().c_str(), std::ios::app);
00541         ofs << curTick << " [ED2KStatistics] ";
00542         ofs << SourceInfo::count() << ":" << m_queue.size() << ":";
00543         ofs << (avgu - overheadUp) << ":" << (avgd - overheadDn) << ":";
00544         ofs << conns << ":" << overheadUp << ":" << overheadDn << std::endl;
00545 
00546         // conditions for opening new slots
00547         uint32_t curUpSpeed = SchedBase::instance().getUpSpeed();
00548         bool openSlot = m_queue.size();
00549         openSlot &= curUpSpeed < SchedBase::instance().getUpLimit() * .9;
00550         openSlot &= avgu < SchedBase::instance().getUpLimit() * .85;
00551 
00552         if (openSlot) {
00553                 startNextUpload();
00554         }
00555 
00556         lastSent = SchedBase::instance().getTotalUpstream();
00557         lastRecv = SchedBase::instance().getTotalDownstream();
00558         lastSentTime = curTick;
00559         lastRecvTime = curTick;
00560         lastOverheadDn = ED2KPacket::getOverheadDn();
00561         lastOverheadUp = ED2KPacket::getOverheadUp();
00562 }
00563 
00564 bool ClientList::addSource(
00565         const Hash<ED2KHash> &h, IPV4Address caddr,
00566         IPV4Address saddr, bool doConn
00567 ) try {
00568         if (caddr.getIp() == ED2K::instance().getId()) {
00569                 if (caddr.getPort() == ED2K::instance().getTcpPort()) {
00570                         logDebug("Cowardly refusing to talk to myself.");
00571                         return false;
00572                 }
00573         }
00574 
00575         Client *c = findClient(caddr);
00576         Download *d = DownloadList::instance().find(h);
00577         if (c && d) {
00578                 c->addOffered(d, doConn);
00579                 c->setServerAddr(saddr);
00580                 return false;
00581         } else if (d) {
00582                 c = new Client(caddr, d);
00583                 c->setServerAddr(saddr);
00584                 m_clients->insert(c);
00585                 if (doConn) {
00586                         c->establishConnection();
00587                 } else {
00588                         // delay connection a bit
00589                         Client::getEventTable().postEvent(
00590                                 c, EVT_REASKFILEPING, 60*1000
00591                         );
00592                 }
00593                 return true;
00594         } else {
00595                 return false;
00596         }
00597 } catch (std::exception &e) {
00598         logDebug(boost::format("Error adding source: %s") % e.what());
00599         return false;
00600 }
00601 MSVC_ONLY(;)
00602 
00603 Client* ClientList::findClient(IPV4Address addr) {
00604         uint32_t id = addr.getIp();
00605         IDMap &list = m_clients->get<ID_Id>();
00606         std::pair<IIter, IIter> ret = list.equal_range(id);
00607         for (IIter i = ret.first; i != ret.second; ++i) {
00608                 CHECK_FAIL((*i)->getId() == addr.getIp());
00609                 if ((*i)->getTcpPort() == addr.getPort()) {
00610                         return *i;
00611                 }
00612         }
00613         return 0;
00614 }
00615 
00616 Client* ClientList::findClientByUdp(IPV4Address addr) {
00617         uint32_t id = addr.getIp();
00618         IDMap &list = m_clients->get<ID_Id>();
00619         std::pair<IIter, IIter> ret = list.equal_range(id);
00620         for (IIter i = ret.first; i != ret.second; ++i) {
00621                 CHECK_FAIL((*i)->getId() == addr.getIp());
00622                 if ((*i)->getUdpPort() == addr.getPort()) {
00623                         return *i;
00624                 }
00625         }
00626         return 0;
00627 }
00628 
00629 void ClientList::addClient(IPV4Address addr) {
00630         if (addr.getIp() == ED2K::instance().getId()) {
00631                 if (addr.getPort() == ED2K::instance().getTcpPort()) {
00632                         logDebug("Cowardly refusing to talk to myself.");
00633                         return;
00634                 }
00635         }
00636 
00637         Client *c = findClient(addr);
00638         if (c) {
00639                 c->establishConnection();
00640         } else {
00641                 ED2KClientSocket *sock = new ED2KClientSocket();
00642                 sock->connect(addr, CONNECT_TIMEOUT);
00643                 if (sock->isConnecting()) {
00644                         c = new Client(sock);
00645                         m_clients->insert(c);
00646                 } else {
00647                         delete sock;
00648                 }
00649         }
00650 }
00651 
00652 void ClientList::onUdpData(ED2KUDPSocket *src, SocketEvent evt) try {
00653         CHECK(src == Client::getUdpSocket());
00654         if (evt != SOCK_READ) {
00655                 return;
00656         }
00657 
00658         IPV4Address from;
00659         uint32_t amount = src->recv(m_udpBuffer.get(), UDP_BUFSIZE, &from);
00660         std::string buf(m_udpBuffer.get(), amount);
00661 
00662         if (buf.size() < 2) {
00663                 return;
00664         } else if (static_cast<uint8_t>(buf[0]) != PR_EMULE) {
00665                 logWarning(
00666                         boost::format(
00667                                 "Received unknown UDP packet: protocol=%s "
00668                                 "opcode=%s %s"
00669                         ) % Utils::hexDump(buf[0]) % Utils::hexDump(buf[1])
00670                         % Utils::hexDump(buf)
00671                 );
00672                 return;
00673         }
00674         Client *c = findClientByUdp(from);
00675         if (c == 0) {
00676                 return; // source is unknown - ignore this packet
00677         }
00678 
00679         std::istringstream packet(buf.substr(2));
00680         switch (static_cast<uint8_t>(buf[1])) {
00681                 case OP_REASKFILEPING: {
00682                         ED2KPacket::ReaskFilePing p(packet);
00683                         if (c) {
00684                                 c->onPacket(p);
00685                         }
00686                         break;
00687                 }
00688                 case OP_QUEUEFULL:
00689                         if (c) {
00690                                 ED2KPacket::QueueFull p(packet);
00691                                 c->onPacket(p);
00692                         }
00693                         break;
00694                 case OP_FILENOTFOUND:
00695                         if (c) {
00696                                 ED2KPacket::FileNotFound p(packet);
00697                                 c->onPacket(p);
00698                         }
00699                         break;
00700                 case OP_REASKACK:
00701                         if (c) {
00702                                 ED2KPacket::ReaskAck p(packet);
00703                                 c->onPacket(p);
00704                         }
00705                         break;
00706                 default:
00707                         logWarning(
00708                                 boost::format(
00709                                         "Received unknown UDP packet: "
00710                                         "protocol=%s opcode=%s %s"
00711                                 ) % Utils::hexDump(buf[0])
00712                                 % Utils::hexDump(buf[1]) % Utils::hexDump(buf)
00713                         );
00714                         break;
00715         }
00716 } catch (std::runtime_error &e) {
00717         logDebug(
00718                 boost::format("While parsing/handling ClientUDP packet: %s")
00719                 % e.what()
00720         );
00721 }