serverlist.cpp

Go to the documentation of this file.
00001 /**
00002  *  Copyright (C) 2004-2005 Alo Sarv <madcat_@users.sourceforge.net>
00003  *
00004  *  This program is free software; you can redistribute it and/or modify
00005  *  it under the terms of the GNU General Public License as published by
00006  *  the Free Software Foundation; either version 2 of the License, or
00007  *  (at your option) any later version.
00008  *
00009  *  This program is distributed in the hope that it will be useful,
00010  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  *  GNU General Public License for more details.
00013  *
00014  *  You should have received a copy of the GNU General Public License
00015  *  along with this program; if not, write to the Free Software
00016  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017  */
00018 
00019 /** @file serverlist.cpp Implementation of ServerList and Server classes */
00020 
00021 #include <hn/hnprec.h>
00022 #include <hn/utils.h>                      // Utils::hexDump
00023 #include <hn/log.h>                        // log functions
00024 #include <hn/ssocket.h>                    // Sockets ...
00025 #include <hn/fileslist.h>                  // FilesList et al
00026 #include <hn/sharedfile.h>                 // SharedFile
00027 #include <hn/partdata.h>                   // PartData
00028 #include <hn/metadb.h>                     // for debug selfcheck
00029 #include "serverlist.h"                    // Class interface
00030 #include "server.h"                        // Server class
00031 #include "ed2k.h"                          // ED2K class
00032 #include "clientlist.h"                    // needed during callback requests
00033 #include "parser.h"                        // ed2kparser
00034 #include "downloadlist.h"                  // Download / DownloadList
00035 #include <boost/lambda/bind.hpp>           // bind
00036 #include <boost/multi_index_container.hpp>
00037 #include <boost/multi_index/key_extractors.hpp>
00038 #include <boost/multi_index/ordered_index.hpp>
00039 
/**
 * Timing constants used internally by ServerList. The values are passed as
 * the delay argument when posting ServerList events (20 minutes is expressed
 * as 20*60*1000, i.e. the unit is milliseconds).
 */
enum ED2K_ServerCommConstants {
        //! Interval between two source re-ask rounds: 20 minutes
        SOURCEREASKTIME = 1200000,
        //! Interval between two server pings: 20 minutes
        SERVERPINGTIME  = 1200000,
        //! Delay used for the login timeout event: 30 seconds
        SERVERTIMEOUT   = 30 * 1000
};
00048 
//! Trace mask handed to every logTrace() call in this file
const std::string TRACE("Ed2k::ServerList");
00050 
00051 namespace Detail {
00052 
00053 //! ServerName extractor functor
00054 struct ServerNameExtractor {
00055         typedef std::string result_type;
00056         result_type operator()(const Server *const s) const {
00057                 return s->getName();
00058         }
00059 };
00060 
00061 //! ServerAddr extractor functor
00062 struct ServerAddrExtractor {
00063         typedef IPV4Address result_type;
00064         result_type operator()(const Server *const s) const {
00065                 return s->getAddr();
00066         }
00067 };
00068 
00069 //! last udp query time extractor functor
00070 struct QueryTimeExtractor {
00071         typedef uint64_t result_type;
00072         result_type operator()(const Server *const s) const {
00073                 return s->getLastUdpQuery();
00074         }
00075 };
00076 
00077 //! Server list structure, sorted by IP address and name
00078 struct ServerListIndices : boost::multi_index::indexed_by<
00079         boost::multi_index::ordered_unique<
00080                 boost::multi_index::identity<Server*>
00081         >,
00082         boost::multi_index::ordered_non_unique<ServerAddrExtractor>,
00083         boost::multi_index::ordered_non_unique<ServerNameExtractor>,
00084         boost::multi_index::ordered_non_unique<QueryTimeExtractor>
00085 > {};
00086 struct MIServerList : boost::multi_index_container<
00087         Server*, ServerListIndices
00088 > {};
00089 typedef MIServerList::nth_index<0>::type::iterator ServIter;
00090 typedef MIServerList::nth_index<1>::type::iterator AddrIter;
00091 typedef MIServerList::nth_index<2>::type::iterator NameIter;
00092 struct QTIter : MIServerList::nth_index<3>::type::iterator {
00093         template<typename T>
00094         QTIter(const T &t) : MIServerList::nth_index<3>::type::iterator(t) {}
00095 };
00096 
00097 } // namespace Detail
00098 
00099 using namespace Detail;
00100 
00101 // ServerList class
00102 // ----------------
00103 IMPLEMENT_EVENT_TABLE(ServerList, ServerList*, ServerList::ServerListEvent);
00104 DECLARE_PACKET_HANDLER(ServerList, ServerMessage);
00105 DECLARE_PACKET_HANDLER(ServerList, IdChange     );
00106 DECLARE_PACKET_HANDLER(ServerList, ServerStatus );
00107 DECLARE_PACKET_HANDLER(ServerList, ServerList   );
00108 DECLARE_PACKET_HANDLER(ServerList, ServerIdent  );
00109 DECLARE_PACKET_HANDLER(ServerList, SearchResult );
00110 DECLARE_PACKET_HANDLER(ServerList, CallbackReq  );
00111 DECLARE_PACKET_HANDLER(ServerList, FoundSources );
00112 
00113 ServerList::ServerList() : Object(&ED2K::instance(), "serverlist")
00114 , m_serverSocket(), m_currentServer(),
00115 m_parser(new ED2KParser<ServerList>(this)), m_status(),
00116 m_list(new MIServerList) {
00117         // Annouce we are capable of performing searches too
00118         Search::addQueryHandler(
00119                 boost::bind(&ServerList::performSearch, this, _1)
00120         );
00121 
00122         // Set up handlers for ALL SharedFile's events
00123         SharedFile::getEventTable().addAllHandler(
00124                 this, &ServerList::onSharedFileEvent
00125         );
00126 
00127         // used for delayed callback events
00128         getEventTable().addHandler(this, this, &ServerList::onServerListEvent);
00129         Server::getEventTable().addHandler(0, this, &ServerList::onServerEvent);
00130 }
00131 
00132 void ServerList::init() {
00133         using boost::bind;
00134 
00135         m_udpSocket = new ED2KUDPSocket;
00136 
00137         ED2KUDPSocket::getEventTable().addHandler(
00138                 m_udpSocket, this, &ServerList::onUdpData
00139         );
00140         m_udpSocket->listen(0, ED2K::instance().getTcpPort() + 3);
00141 
00142         logMsg(boost::format(
00143                 COL_BGREEN "ED2K ServerUDP listener " COL_NONE "started on "
00144                 "port " COL_BCYAN "%d" COL_NONE
00145         ) % (ED2K::instance().getTcpPort() + 3));
00146 
00147         Detail::foundServer.connect(bind(&ServerList::addServer, this, _1));
00148         DownloadList::instance().onAdded.connect(
00149                 bind(&ServerList::reqSources, this, _1)
00150         );
00151 
00152         connect();
00153         queryNextServer();
00154 }
00155 
00156 void ServerList::exit() {
00157         delete m_serverSocket;
00158         m_serverSocket = 0;
00159         m_udpSocket->destroy();
00160         m_udpSocket = 0;
00161         for (ServIter i = m_list->begin(); i != m_list->end(); ++i) {
00162                 delete *i;
00163         }
00164 }
00165 
00166 ServerList::~ServerList() {}
00167 
00168 void ServerList::load(const std::string &file) {
00169         std::ifstream ifs(file.c_str(), std::ios::binary);
00170         if (!ifs) {
00171                 // Can't open ...
00172                 if (m_list->size() == 0) {
00173                         // No servers in list.. add hardcoded ones.
00174                         addDefaultServers();
00175                         return;
00176                 }
00177         }
00178 
00179         uint8_t ver = Utils::getVal<uint8_t>(ifs);
00180         if (ver != ST_METVERSION && ver != ST_METHEADER) {
00181                 logError(
00182                         boost::format(
00183                                 "Corruption found in server list "
00184                                 "(invalid versiontag %s)"
00185                         ) % Utils::hexDump(ver)
00186                 );
00187                 return;
00188         }
00189 
00190         uint32_t count = Utils::getVal<uint32_t>(ifs);
00191         Utils::StopWatch t;
00192         while (count--) {
00193                 try {
00194                         Server *s = new Server(ifs);
00195                         AddrIter it = m_list->get<1>().find(s->getAddr());
00196                         if (it == m_list->get<1>().end()) {
00197                                 m_list->insert(s);
00198                         } else {
00199                                 logTrace(TRACE,
00200                                         "Ignoring duplicate server.met entry"
00201                                 );
00202                                 delete s;
00203                         }
00204                 } catch (std::exception &err) {
00205                         logWarning(
00206                                 boost::format(
00207                                         "Corruption found in server.met: %s"
00208                                 ) % err.what()
00209                         );
00210                         break;
00211                 }
00212         }
00213 
00214         if (m_list->size() == 0) {
00215                 addDefaultServers();
00216         }
00217 
00218         logMsg(
00219                 boost::format(
00220                         "ServerList loaded, %d servers are known (%dms)"
00221                 ) % m_list->size() % t
00222         );
00223 }
00224 
00225 void ServerList::save(const std::string &file) const {
00226         std::ofstream ofs(file.c_str(), std::ios::binary);
00227         if (!ofs) {
00228                 logError(
00229                         boost::format(
00230                                 "Unable to save server list (opening "
00231                                 "file %s failed)"
00232                         ) % file
00233                 );
00234                 return;
00235         }
00236 
00237         Utils::putVal<uint8_t>(ofs, ST_METVERSION);
00238         Utils::putVal<uint32_t>(ofs, m_list->size());
00239         Utils::StopWatch t;
00240 
00241         for (ServIter i = m_list->begin(); i != m_list->end(); ++i) {
00242                 ofs << *(*i);
00243         }
00244 
00245         logMsg(
00246                 boost::format("ServerList saved, %d servers written (%dms)")
00247                 % m_list->size() % t
00248         );
00249 }
00250 
00251 void ServerList::addDefaultServers() {
00252         IPV4Address razorback("195.245.244.243", 4661);
00253         if (m_list->get<1>().find(razorback) == m_list->get<1>().end()) {
00254                 Server *s = new Server(razorback);
00255                 m_list->insert(s);
00256         }
00257 
00258         IPV4Address donkeyserver("62.241.53.2", 4242);
00259         if (m_list->get<1>().find(donkeyserver) == m_list->get<1>().end()) {
00260                 Server *s = new Server(donkeyserver);
00261                 m_list->insert(s);
00262         }
00263 }
00264 
00265 void ServerList::connect() {
00266         if (!m_list->size()) {
00267                 addDefaultServers();
00268         }
00269         ServIter i = m_list->begin();
00270         if (m_currentServer) {
00271                 i = m_list->find(m_currentServer);
00272                 // increase by one, but only if not end-of-list
00273                 ServIter tmp = i;
00274                 if (++tmp != m_list->end()) {
00275                         i = tmp;
00276                 }
00277         }
00278         m_currentServer = 0;
00279         delete m_serverSocket;
00280         m_serverSocket = 0;
00281         m_status = ST_CONNECTING;
00282 
00283         // remove dead servers
00284         while (i != m_list->end() && (*i)->getFailedCount() > 2) {
00285                 logTrace(TRACE,
00286                         boost::format("Removing dead server %s (%s)")
00287                         % (*i)->getAddr() % (*i)->getName()
00288                 );
00289                 ServIter tmp = i++;
00290                 Server *serv = *tmp;
00291                 m_list->erase(tmp);
00292                 delete serv;
00293         }
00294 
00295         if (!m_list->size() || i == m_list->end()) {
00296                 return connect(); // re-try (re-adds default servers)
00297         }
00298 
00299         connect(*i);
00300 }
00301 
00302 void ServerList::connect(Server *s) {
00303         m_currentServer = s;
00304         delete m_serverSocket;
00305 
00306         m_serverSocket = new ED2KClientSocket();
00307         m_serverSocket->setHandler(this, &ServerList::onServerSocketEvent);
00308 
00309         IPV4Address addr = m_currentServer->getAddr();
00310         m_serverSocket->connect(addr, 3000);
00311 
00312         logMsg(boost::format("Connecting to %s") % addr);
00313         m_status = ST_CONNECTING;
00314 }
00315 
00316 void ServerList::onServerSocketEvent(ED2KClientSocket *c, SocketEvent evt) {
00317         CHECK_RET(m_currentServer);
00318         CHECK_THROW(c == m_serverSocket);
00319 
00320         switch (evt) {
00321                 case SOCK_CONNECTED:
00322                         logMsg(
00323                                 "[ed2k] Server connection established. Sending "
00324                                 "login request."
00325                         );
00326                         sendLoginRequest();
00327                         break;
00328                 case SOCK_READ: {
00329                         try {
00330                                 m_parser->parse(c->getData());
00331                         } catch (std::exception &er) {
00332                                 logDebug(
00333                                         boost::format(
00334                                                 "Error parsing Server "
00335                                                 "stream: %s"
00336                                         ) % er.what()
00337                                 );
00338                                 connect(); // Next server
00339                                 break;
00340                         }
00341 
00342                         break;
00343                 }
00344                 case SOCK_ERR:
00345                 case SOCK_LOST:
00346                 case SOCK_TIMEOUT:
00347                 case SOCK_CONNFAILED:
00348                         logTrace(TRACE, "Server connection lost.");
00349                         m_currentServer->addFailedCount();
00350                         delete m_serverSocket;
00351                         m_serverSocket = 0;
00352                         m_currentServer = 0;
00353 
00354                         // try again after a while
00355                         getEventTable().postEvent(this, EVT_CONNECT, 5000);
00356                         break;
00357                 case SOCK_WRITE:
00358                         break; // ignored
00359                 default:
00360                         logDebug(
00361                                 boost::format(
00362                                         "Unknown socket event %p in ServerList."
00363                                 ) % evt
00364                         );
00365                         break;
00366         }
00367 }
00368 
00369 void ServerList::sendLoginRequest() {
00370         CHECK_THROW(m_serverSocket);
00371         CHECK_THROW(m_serverSocket->isConnected());
00372         CHECK_THROW(m_currentServer);
00373 
00374         // not sure if this is the best place to reset it ...
00375         m_currentServer->setFailedCount(0);
00376 
00377         *m_serverSocket << ED2KPacket::LoginRequest();
00378 
00379         // Login timeout
00380         getEventTable().postEvent(this, EVT_LOGINTIMEOUT, SERVERTIMEOUT);
00381 }
00382 
00383 void ServerList::publishFiles(bool useZlib) {
00384         CHECK_THROW(m_serverSocket);
00385         CHECK_THROW(m_serverSocket->isConnected());
00386 
00387         ED2KPacket::OfferFiles packet(useZlib ? PR_ZLIB : PR_ED2K);
00388 
00389         uint32_t cnt = 0;         // Don't publish over 300 files
00390         FilesList::CSFIter i = FilesList::instance().begin();
00391         for (; i != FilesList::instance().end() && cnt <= 300; ++i, ++cnt) {
00392                 SharedFile *sf = *i;
00393 
00394                 // Only a partial solution, but avoids publishing files which
00395                 // have less than 9500kb downloaded. To be 100% correct, we
00396                 // should check here if the file has at least one complete chunk
00397                 if (sf->isPartial()) {
00398                         if (sf->getPartData()->getCompleted() < ED2K_PARTSIZE) {
00399                                 continue;
00400                         }
00401                 }
00402 
00403                 MetaData *md = sf->getMetaData();
00404                 if (md == 0) {
00405                         continue; // No metadata - can't do anything
00406                 }
00407                 if (sf->getSize() > std::numeric_limits<uint32_t>::max()) {
00408                         // It's larger than we can support on
00409                         // ed2k (32-bit integer - 4gb) :(
00410                         continue;
00411                 }
00412 
00413                 for (uint32_t j = 0; j < md->getHashSetCount(); ++j) {
00414                         HashSetBase* hs = md->getHashSet(j);
00415                         if (hs->getFileHashTypeId() == CGComm::OP_HT_ED2K) {
00416                                 packet.push(makeED2KFile(sf, md, hs, useZlib));
00417                         }
00418                 }
00419                 // FilesList/MetaDb integrity self-check
00420                 CHECK(MetaDb::instance().findSharedFile(sf->getName()).size()>0);
00421         }
00422 
00423         if (cnt) { // only send if we have smth to offer
00424                 *m_serverSocket << packet;
00425         }
00426 }
00427 
00428 void ServerList::performSearch(SearchPtr search) {
00429         logTrace(TRACE, "[ed2k] ServerList::performSearch");
00430 
00431         if (!(m_status & ST_CONNECTED)) {
00432                 logError(
00433                         "Unable to perform a search at this time: not "
00434                         "connected to a server."
00435                 );
00436         } else {
00437                 *m_serverSocket << ED2KPacket::Search(search);
00438                 m_curSearch = search;
00439         }
00440 }
00441 
00442 // Implement packet handlers
00443 void ServerList::onPacket(const ED2KPacket::ServerMessage &p) {
00444         std::string msg = p.getMsg();
00445         while (msg.size() && *--msg.end() == '\n') {
00446                 msg.erase(--msg.end()); // erase trailing empty lines
00447         }
00448 
00449         if (msg.substr(0, 8) == "ERROR : ") {
00450                 logError("[Ed2kServer] " + msg.substr(8));
00451                 connect();
00452         } else if (msg.substr(0, 10) == "WARNING : ") {
00453                 logWarning("[Ed2kServer] " + msg.substr(10));
00454         } else {
00455                 logMsg("[Ed2kServer] " + msg);
00456         }
00457 }
00458 
00459 void ServerList::onPacket(const ED2KPacket::IdChange &p) {
00460         CHECK_THROW_MSG(m_currentServer, "Received IdChange, but from where?");
00461 
00462         logMsg("Received ID change packet.");
00463         logMsg(
00464                 boost::format("New ID: %d (%s)") % p.getId()
00465                 % (p.getId() > 0x00ffffff ? "high" : "low")
00466         );
00467 
00468         ED2K::instance().setId(p.getId());
00469         m_currentServer->setTcpFlags(p.getFlags());
00470 
00471         if (m_status != ST_CONNECTED) {
00472                 m_status = ST_CONNECTED;
00473                 logMsg("We are now connected to eDonkey2000 network.");
00474                 logMsg(
00475                         boost::format("Server: %s(%s)")
00476                         % m_currentServer->getName()
00477                         % m_currentServer->getAddr()
00478                 );
00479                 // Publish our shared files
00480                 publishFiles(m_currentServer->getTcpFlags() & FL_ZLIB);
00481                 m_lastSourceRequest = 0;
00482                 reqSources      ();
00483                 getEventTable().postEvent(
00484                         this, EVT_PINGSERVER, SERVERPINGTIME
00485                 );
00486                 *m_serverSocket << ED2KPacket::GetServerList();
00487         }
00488 }
00489 
00490 void ServerList::onPacket(const ED2KPacket::ServerStatus &p) {
00491         boost::format fmt(
00492                 "Received server status update: Users: " COL_BCYAN "%d"
00493                 COL_NONE " Files: " COL_BGREEN "%d" COL_NONE
00494         );
00495         logMsg(fmt % p.getUsers() % p.getFiles());
00496         CHECK_THROW(m_currentServer);
00497 
00498         m_currentServer->setUsers(p.getUsers());
00499         m_currentServer->setFiles(p.getFiles());
00500 }
00501 
00502 void ServerList::onPacket(const ED2KPacket::ServerIdent &p) {
00503         logMsg("Received server ident:");
00504         logMsg(boost::format("Hash: %s") % p.getHash().decode());
00505         logMsg(boost::format("Addr: %s") % p.getAddr().getStr());
00506         logMsg(boost::format("Name: %s") % p.getName());
00507         logMsg(boost::format("Desc: %s") % p.getDesc());
00508 
00509         CHECK_THROW(m_currentServer);
00510         ServIter it = m_list->find(m_currentServer);
00511         CHECK_THROW(it != m_list->end());
00512 
00513         m_list->modify(it, bind(&Server::setName, __1, p.getName()));
00514         m_list->modify(it, bind(&Server::setDesc, __1, p.getDesc()));
00515 }
00516 
00517 void ServerList::onPacket(const ED2KPacket::ServerList &p) {
00518         uint32_t added = 0;
00519 
00520         for (uint32_t i = 0; i < p.getCount(); ++i) {
00521                 AddrIter it = m_list->get<1>().find(p.getServer(i));
00522                 if (it == m_list->get<1>().end()) {
00523                         Server *s = new Server(p.getServer(i));
00524                         m_list->insert(s);
00525                         ++added;
00526                 }
00527         }
00528 
00529         logMsg(
00530                 boost::format("Received %d new servers (and %d duplicates)")
00531                 % added % (p.getCount() - added)
00532         );
00533 }
00534 
00535 void ServerList::onPacket(const ED2KPacket::SearchResult &p) {
00536         logMsg(
00537                 boost::format("Received %d search results from server.")
00538                 % p.getCount()
00539         );
00540         CHECK_THROW_MSG(m_curSearch, "Search results, but no search pending?");
00541 
00542         for (uint32_t i = 0; i < p.getCount(); ++i) {
00543                 m_curSearch->addResult(p.getResult(i));
00544         }
00545 
00546         if (p.getCount()) {
00547                 m_curSearch->notifyResults();
00548         }
00549 }
00550 
00551 void ServerList::onPacket(const ED2KPacket::CallbackReq &p) {
00552         logTrace(TRACE,
00553                 boost::format("Received callback request to %s") % p.getAddr()
00554         );
00555         ClientList::instance().addClient(p.getAddr());
00556 }
00557 
00558 void ServerList::onPacket(const ED2KPacket::FoundSources &p) {
00559         CHECK_THROW(m_currentServer);
00560 
00561         uint32_t connLimit = SchedBase::instance().getConnLimit();
00562         uint32_t connCnt = SchedBase::instance().getConnCount();
00563         connCnt += SchedBase::instance().getConnReqCount();
00564         bool doConn = false;
00565         uint32_t cnt = 0;
00566 
00567         for (uint32_t i = 0; i < p.getCount(); ++i) {
00568                 doConn = connCnt + p.getLowCount() < connLimit;
00569                 cnt += Detail::foundSource(
00570                         p.getHash(), p.getSource(i),
00571                         m_currentServer->getAddr(), doConn
00572                 );
00573                 connCnt += doConn;
00574         }
00575 
00576         logDebug(
00577                 boost::format(
00578                         "Received %d new sources from server "
00579                         "(and %d duplicates)"
00580                 ) % cnt % (p.getCount() - cnt)
00581         );
00582 }
00583 
00584 void ServerList::reqCallback(uint32_t id) {
00585         if (ED2K::instance().getId() < 0x00ffffff) {
00586                 throw std::runtime_error("Cannot do lowid <-> lowid callback!");
00587         }
00588 
00589         CHECK_THROW(m_serverSocket);
00590         CHECK_THROW(m_serverSocket->isConnected());
00591 
00592         *m_serverSocket << ED2KPacket::ReqCallback(id);
00593 }
00594 
00595 void ServerList::onSharedFileEvent(SharedFile *sf, int evt) {
00596         if (!m_serverSocket) {
00597                 return;
00598         } else if (!m_serverSocket->isConnected()) {
00599                 return;
00600         }
00601 
00602         if (evt == SF_METADATA_ADDED) {
00603                 if (!sf->getMetaData()) {
00604                         return; // can't publish w/o metadata
00605                 }
00606 
00607                 if (!sf->getMetaData()->getHashSetCount()) {
00608                         return; // can't publish w/o hashes
00609                 }
00610 
00611                 // Only a partial solution, but avoids publishing files which
00612                 // have less than 9500kb downloaded. To be 100% correct, we
00613                 // should check here if the file has at least one complete chunk
00614                 if (sf->isPartial()) {
00615                         if (sf->getPartData()->getCompleted() < ED2K_PARTSIZE) {
00616                                 return;
00617                         }
00618                 }
00619 
00620                 publishFile(sf);
00621         }
00622 }
00623 
00624 void ServerList::publishFile(SharedFile *sf) {
00625         CHECK_THROW(m_serverSocket);
00626         CHECK_THROW(m_serverSocket->isConnected());
00627         CHECK_THROW(sf->getMetaData());
00628         CHECK_THROW(sf->getMetaData()->getHashSetCount());
00629 
00630         logTrace(TRACE, boost::format("Publishing file %s") % sf->getName());
00631 
00632         MetaData *md = sf->getMetaData();
00633         for (uint32_t i = 0; i < md->getHashSetCount(); ++i) {
00634                 HashSetBase *hs = md->getHashSet(i);
00635                 if (hs->getFileHashTypeId() == CGComm::OP_HT_ED2K) {
00636                         bool useZlib = m_currentServer->getTcpFlags() & FL_ZLIB;
00637                         *m_serverSocket << ED2KPacket::OfferFiles(
00638                                 makeED2KFile(sf, md, hs, useZlib),
00639                                 useZlib ? PR_ZLIB : PR_ED2K
00640                         );
00641                         break;
00642                 }
00643         }
00644 }
00645 
00646 void ServerList::reqSources(Download &d) {
00647         if (m_serverSocket && m_serverSocket->isConnected()) {
00648                 *m_serverSocket << ED2KPacket::ReqSources(d.getHash());
00649                 logTrace(TRACE,
00650                         boost::format("Requesting sources for %s")
00651                         % d.getPartData()->getDestination().leaf()
00652                 );
00653         }
00654 }
00655 
00656 /**
00657  * Scan files list, skipping all entries that are not partial, do not have
00658  * MetaData or do not have hashes, and request sources for the rest of them.
00659  */
00660 void ServerList::reqSources() {
00661         CHECK_THROW(m_serverSocket);
00662         CHECK_THROW(m_serverSocket->isConnected());
00663 
00664         if (m_lastSourceRequest + SOURCEREASKTIME > Utils::getTick()) {
00665                 logTrace(TRACE, "Delaying Source Reask.");
00666                 return; // don't reask  not just yet
00667         }
00668 
00669         DownloadList& list = DownloadList::instance();
00670         for (DownloadList::Iter it = list.begin(); it != list.end(); ++it) {
00671                 reqSources(*it);
00672         }
00673 
00674         m_lastSourceRequest = Utils::getTick();
00675         getEventTable().postEvent(this, EVT_REQSOURCES, SOURCEREASKTIME);
00676 }
00677 
00678 void ServerList::onServerListEvent(ServerList *sl, ServerListEvent evt) {
00679         CHECK_THROW(sl == this);
00680 
00681         if (evt == EVT_REQSOURCES) {
00682                 if (m_serverSocket && m_serverSocket->isConnected()) {
00683                         reqSources();
00684                 }
00685         } else if (evt == EVT_PINGSERVER) {
00686                 if (m_serverSocket && m_serverSocket->isConnected()) {
00687                         logTrace(TRACE, "Pinging server with OfferFiles...");
00688                         *m_serverSocket << ED2KPacket::OfferFiles();
00689                 }
00690         } else if (evt == EVT_LOGINTIMEOUT) {
00691                 if (!(m_status & ST_CONNECTED)) {
00692                         logMsg("[ed2k] Server login attempt timed out.");
00693                         if (m_currentServer) {
00694                                 m_currentServer->addFailedCount();
00695                         }
00696                         connect();
00697                 }
00698         } else if (evt == EVT_CONNECT) {
00699                 connect();
00700         } else if (evt == EVT_QUERYSERVER) {
00701                 queryNextServer();
00702         } else {
00703                 logWarning("Unknown ServerList event.");
00704         }
00705 }
00706 
00707 void ServerList::onUdpData(ED2KUDPSocket *sock, SocketEvent evt) try {
00708         if (evt != SOCK_READ) {
00709                 logWarning("ServerList: Unknown UDP socket event.");
00710                 return;
00711         }
00712         char tmp[1024];
00713         IPV4Address from;
00714         uint32_t amount = sock->recv(tmp, 1024, &from);
00715         CHECK_RET(amount > 2);
00716 
00717         std::istringstream data(std::string(tmp, amount));
00718         uint8_t prot = Utils::getVal<uint8_t>(data);
00719         uint8_t opcode = Utils::getVal<uint8_t>(data);
00720 
00721         CHECK_RET(prot == PR_ED2K);
00722 
00723         if (opcode == OP_GLOBFOUNDSOURCES) {
00724                 handleGlobSources(data, from);
00725         } else if (opcode == OP_GLOBSTATRES) {
00726                 handleGlobStatRes(data, from);
00727         } else {
00728                 logWarning(
00729                         boost::format(
00730                                 "Received unknown Server UDP data from %s: "
00731                                 "protocol=%s opcode=%s data:%s"
00732                         ) % from % Utils::hexDump(prot) % Utils::hexDump(opcode)
00733                         % Utils::hexDump(data.str().substr(3))
00734                 );
00735         }
00736 } catch (SocketError &e) {
00737         logDebug(std::string("Reading from UDP socket: ") + e.what());
00738 }
00739 
00740 void ServerList::handleGlobSources(std::istringstream &i, IPV4Address from) try{
00741         ED2KPacket::GlobFoundSources p(i);
00742         uint32_t total = p.size();
00743         uint32_t added = 0;
00744 
00745         ED2KPacket::GlobFoundSources::Iter it = p.begin();
00746         while (it != p.end()) {
00747                 added += Detail::foundSource(
00748                         p.getHash(), *it++, IPV4Address(), true
00749                 );
00750         }
00751 
00752         logDebug(
00753                 boost::format(
00754                         "GlobGetSources: Received %d sources for hash "
00755                         "%s from %s (and %d duplicates)."
00756                 ) % added % p.getHash().decode() % from
00757                 % (total - added)
00758         );
00759         logTrace(TRACE,
00760                 boost::format("GlobGetSources: tellg()=%d size=%d")
00761                 % i.tellg() % i.str().size()
00762         );
00763         if (static_cast<size_t>(i.tellg()) < i.str().size() - 1) {
00764                 uint8_t prot = Utils::getVal<uint8_t>(i);
00765                 uint8_t opcode = Utils::getVal<uint8_t>(i);
00766                 if (prot == PR_ED2K && opcode == OP_GLOBFOUNDSOURCES) {
00767                         handleGlobSources(i, from);
00768                 } else {
00769                         logDebug(
00770                                 boost::format(
00771                                         "Extra data in GlobFoundSources frame:%s"
00772                                 ) % Utils::hexDump(i.str().substr(i.tellg()))
00773                         );
00774                 }
00775         }
00776 } catch (Utils::ReadError&) {
00777         logDebug("Unexpected EOF found while parsing GlobFoundSources packet.");
00778 } catch (std::exception &e) {
00779         logDebug(
00780                 boost::format(
00781                         "Unexpected exception while handling GlobFoundSources "
00782                         "packet: %s"
00783                 ) % e.what()
00784         );
00785 }
00786 MSVC_ONLY(;)
00787 void ServerList::handleGlobStatRes(std::istringstream &i, IPV4Address from) try{
00788         ED2KPacket::GlobStatRes packet(i);
00789         from.setPort(from.getPort() - 4); // to get the TCP port
00790 
00791         AddrIter it = m_list->get<1>().find(from);
00792         if (it == m_list->get<1>().end()) {
00793                 logTrace(TRACE,
00794                         boost::format("Passivly adding server %s.")
00795                         % from
00796                 );
00797                 Server *s = new Server(from);
00798                 it = m_list->get<1>().insert(s).first;
00799         }
00800 
00801         // ignore packets with wrong challenge
00802         CHECK_RET(packet.getChallenge() == (*it)->getChallenge());
00803 
00804         // prevent's ping-time showing enormously large pings
00805         if (!(*it)->pingInProgress()) {
00806                 (*it)->setPing(0);
00807         } else {
00808                 (*it)->setPingInProgress(false);
00809                 (*it)->setPing(Utils::getTick() - (*it)->getLastUdpQuery());
00810         }
00811 
00812         (*it)->setUsers(packet.getUsers());
00813         (*it)->setFiles(packet.getFiles());
00814         (*it)->setMaxUsers(packet.getMaxUsers());
00815         (*it)->setSoftLimit(packet.getSoftLimit());
00816         (*it)->setHardLimit(packet.getHardLimit());
00817         (*it)->setUdpFlags(packet.getUdpFlags());
00818         (*it)->setLowIdUsers(packet.getLowIdUsers());
00819 
00820         (*it)->setFailedCount(0);
00821 
00822         boost::format fmt(
00823                 "[%s] Received GlobStatRes: Users=%d Files=%d "
00824                 "LowUsers=%d MaxUsers=%d SoftLimit=%d HardLimit=%d "
00825                 "UdpFlags=%s Ping: %dms"
00826         );
00827         fmt % (*it)->getAddr() % (*it)->getUsers() % (*it)->getFiles();
00828         fmt % (*it)->getLowIdUsers() % (*it)->getMaxUsers();
00829         fmt % (*it)->getSoftLimit() % (*it)->getHardLimit();
00830         fmt % Utils::hexDump((*it)->getUdpFlags()) % (*it)->getPing();
00831         logTrace(TRACE, fmt);
00832 
00833         QTIter tmp(m_list->project<3>(it));
00834         udpGetSources(tmp);
00835 } catch (Utils::ReadError&) {
00836         logDebug("Unexpected EOF while parsing GlobStatRes packet.");
00837 } catch (std::exception &e) {
00838         logDebug(
00839                 boost::format(
00840                         "Unexpected exception while handling "
00841                         "GlobStatRes packet: %s"
00842                 ) % e.what()
00843         );
00844 }
00845 MSVC_ONLY(;)
00846 
00847 void ServerList::queryNextServer() {
00848         // no servers
00849         if (!m_list->size()) {
00850                 return;
00851         }
00852 
00853         MIServerList::nth_index<3>::type &list = m_list->get<3>();
00854         QTIter it = list.lower_bound(0);
00855 
00856         while (it != list.end()) {
00857                 if (!(*it)->getAddr().getIp() || !(*it)->getAddr().getPort()) {
00858                         ++it; // null ip or port
00859                 } else {
00860                         break; // found valid one
00861                 }
00862         }
00863         CHECK_RET(it != list.end());
00864 
00865         uint64_t lastQuery = (*it)->getLastUdpQuery();
00866         uint64_t curTick = Utils::getTick();
00867 
00868         if (lastQuery + SOURCEREASKTIME > curTick) {
00869                 uint64_t timeDiff = lastQuery + SOURCEREASKTIME - curTick;
00870                 getEventTable().postEvent(this, EVT_QUERYSERVER, timeDiff);
00871                 logTrace(TRACE,
00872                         "Delaying UDPServerQuery: lastQuery was less "
00873                         "than 20 minutes ago."
00874                 );
00875                 return; // query is delayed
00876         }
00877 
00878         QTIter tmp(it);
00879         pingServer(tmp);
00880 
00881         m_list->get<3>().modify(
00882                 it, bind(&Server::setLastUdpQuery, __1, curTick)
00883         );
00884 
00885         // schedule next query
00886         it = list.lower_bound(0);
00887         uint64_t nextQuery = (*it)->getLastUdpQuery() + SOURCEREASKTIME;
00888         if (nextQuery < curTick) {
00889                 nextQuery = curTick + SOURCEREASKTIME / m_list->size();
00890         }
00891         getEventTable().postEvent(this, EVT_QUERYSERVER, nextQuery-curTick);
00892         logTrace(TRACE,
00893                 boost::format("Next GlobGetSources scheduled to %.2fs")
00894                 % ((nextQuery - curTick) / 1000.0)
00895         );
00896 }
00897 
00898 void ServerList::pingServer(QTIter &it) try {
00899         CHECK_THROW(it != m_list->get<3>().end());
00900 
00901         IPV4Address to((*it)->getAddr().getIp(), (*it)->getUdpPort());
00902         ED2KPacket::GlobStatReq packet;
00903         m_udpSocket->send(packet, to);
00904 
00905         (*it)->setPingInProgress(true);
00906         (*it)->setChallenge(packet.getChallenge());
00907 
00908         // 10sec ping timeout
00909         Server::getEventTable().postEvent(*it, EVT_PINGTIMEOUT, 10*1000);
00910 
00911         logTrace(TRACE, boost::format("[%s] Sending GlobStatReq.") % to);
00912 } catch (SocketError &e) {
00913         logTrace(TRACE,
00914                 boost::format("[%s] Fatal error sending UDP Ping: %s")
00915                 % (*it)->getAddr() % e.what()
00916         );
00917 }
00918 MSVC_ONLY(;)
00919 
00920 void ServerList::udpGetSources(QTIter &it) try {
00921         CHECK_THROW(it != m_list->get<3>().end());
00922 
00923         uint32_t limit = 1; // how many requests to do
00924         IPV4Address to((*it)->getAddr().getIp(), (*it)->getUdpPort());
00925         bool sendSize = (*it)->getUdpFlags() & Server::FL_GETSOURCES2;
00926 
00927         if ((*it)->getUdpFlags() & Server::FL_GETSOURCES) {
00928                 // Server accepts 512-byte UDP packets, so when sending size,
00929                 // max count is 25 (25*20+2), w/o size it's 31 (31*16+2)
00930                 limit = sendSize ? 25 : 31;
00931         }
00932 
00933         uint32_t cnt = 0;
00934         ED2KPacket::GlobGetSources packet(sendSize);
00935         while (cnt <= limit) {
00936                 Download *d = DownloadList::instance().getNextForUdpQuery();
00937                 if (d) {
00938                         packet.addHash(d->getHash(), d->getSize());
00939                         ++cnt;
00940                 } else {
00941                         break;
00942                 }
00943         }
00944 
00945         if (!cnt) {
00946                 return; // no files queried ...
00947         }
00948 
00949         boost::format fmt("[%s] Sending GlobGetSources %s");
00950         fmt % to;
00951         if ((*it)->getUdpFlags() & Server::FL_GETSOURCES2) {
00952                 fmt % "(NewFormat)";
00953         } else if ((*it)->getUdpFlags() & Server::FL_GETSOURCES) {
00954                 fmt % "(ManyFiles)";
00955         } else {
00956                 fmt % "";
00957         }
00958         logTrace(TRACE, fmt);
00959 
00960         m_udpSocket->send(packet, to);
00961 } catch (SocketError &) {
00962         logTrace(TRACE,
00963                 boost::format("[%s] Fatal error sending UDPGetSources: %s")
00964                 % (*it)->getAddr()
00965         );
00966 }
00967 MSVC_ONLY(;)
00968 
00969 void ServerList::onServerEvent(Server *s, int evt) {
00970         if (evt != EVT_PINGTIMEOUT) {
00971                 logWarning("Unknown event in ServerList::onServerEvent");
00972                 return;
00973         }
00974 
00975         if (m_list->find(s) == m_list->end()) {
00976                 return; // invalid, outdated pointer
00977         } else if (s == m_currentServer) {
00978                 return; // ping timeouts/failcounts not applied to curserver
00979         }
00980 
00981         if (s->pingInProgress()) {
00982                 logTrace(TRACE,
00983                         boost::format("[%s] GlobStatReq: Ping timed out.")
00984                         % s->getAddr()
00985                 );
00986                 s->setPingInProgress(false);
00987                 s->addFailedCount();
00988                 if (s->getFailedCount() > 2) {
00989                         logTrace(TRACE,
00990                                 boost::format(
00991                                         "[%s] GlobStatReq: Removing dead server"
00992                                         " (3 ping timeouts)."
00993                                 ) % s->getAddr()
00994                         );
00995                         m_list->erase(s);
00996                         delete s;
00997                 }
00998         }
00999 }
01000 
01001 IPV4Address ServerList::getCurServerAddr() const {
01002         if (m_currentServer == 0) {
01003                 throw std::runtime_error("Not connected.");
01004         }
01005         return m_currentServer->getAddr();
01006 }
01007 
01008 void ServerList::addServer(IPV4Address srv) {
01009         AddrIter it = m_list->get<1>().find(srv);
01010         if (it == m_list->get<1>().end()) {
01011                 Server *s = new Server(srv);
01012                 m_list->insert(s);
01013         }
01014 }
01015 
01016 uint8_t ServerList::getOperCount() const { return 1; }
01017 
01018 Object::Operation ServerList::getOper(uint8_t n) const {
01019         CHECK_THROW(n == 0);
01020 
01021         Operation op("connect", true);
01022         op.addArg(Operation::Argument("server", true, ODT_STRING));
01023 
01024         return op;
01025 }
01026 
01027 void ServerList::doOper(const Object::Operation &op) {
01028         CHECK_THROW(op.getName() == "connect");
01029 
01030         std::string name = op.getArg("server").getValue();
01031         std::pair<NameIter, NameIter> i = m_list->get<2>().equal_range(name);
01032         if (std::distance(i.first, i.second) > 1) {
01033                 throw std::runtime_error("Ambiguous server name.");
01034         } else if (i.first == m_list->get<2>().end()) {
01035                 throw std::runtime_error("No such server.");
01036         } else {
01037                 connect(*i.first);
01038         }
01039 }