clientext.cpp

Go to the documentation of this file.
00001 /**
00002  *  Copyright (C) 2004-2005 Alo Sarv <madcat_@users.sourceforge.net>
00003  *
00004  *  This program is free software; you can redistribute it and/or modify
00005  *  it under the terms of the GNU General Public License as published by
00006  *  the Free Software Foundation; either version 2 of the License, or
00007  *  (at your option) any later version.
00008  *
00009  *  This program is distributed in the hope that it will be useful,
00010  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  *  GNU General Public License for more details.
00013  *
00014  *  You should have received a copy of the GNU General Public License
00015  *  along with this program; if not, write to the Free Software
00016  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017  */
00018 
00019 /** \file clientext.cpp Implementation of Client class extensions */
00020 
00021 #include <hn/hnprec.h>
00022 #include "clientext.h"
00023 #include <hn/partdata.h>
00024 #include <hn/sharedfile.h>
00025 #include <hn/metadb.h>
00026 #include <hn/metadata.h>
00027 #include "clients.h"
00028 #include "zutils.h"
00029 
00030 namespace Detail {
00031 const std::string TRACE_CLIENT = "Ed2kClient";
00032 
00033 /**
00034  * How much data to send to a single client during an upload session.
00035  * Current ED2K clients seem to use ED2K_PARTSIZE here, which means
00036  * 9.28mb, so that's what we should use.
00037  *
00038  * If you want faster Queue rotation, lowering this value would give the
00039  * desired result. It is recommended to keep this value at least in
00040  * n*ED2K_CHUNKSIZE, to promote full chunks sending.
00041  */
00042 const uint32_t SEND_TO_ONE_CLIENT = ED2K_PARTSIZE;
00043 
00044 //! \name Object counters
00045 //!@{
00046 size_t s_queueInfoCnt = 0;
00047 size_t s_uploadInfoCnt = 0;
00048 size_t s_downloadInfoCnt = 0;
00049 size_t s_sourceInfoCnt = 0;
00050 //!@}
00051 
00052 // QueueInfo class
00053 // ---------------
00054 QueueInfo::QueueInfo(Client *parent, SharedFile *req, const Hash<ED2KHash> &h)
00055 : ClientExtBase(parent), m_queueRanking(), m_reqFile(req), m_reqHash(h),
00056 m_waitStartTime(Utils::getTick()), m_lastQueueReask() {
00057         ++s_queueInfoCnt;
00058 }
00059 
00060 QueueInfo::QueueInfo(Client *parent, UploadInfoPtr nfo) : ClientExtBase(parent),
00061 m_queueRanking(), m_reqFile(nfo->getReqFile()), m_reqHash(nfo->getReqHash()),
00062 m_waitStartTime(Utils::getTick()), m_lastQueueReask() {
00063         ++s_queueInfoCnt;
00064 }
00065 
00066 QueueInfo::~QueueInfo() {
00067         --s_queueInfoCnt;
00068 }
00069 
00070 size_t QueueInfo::count() {
00071         return s_queueInfoCnt;
00072 }
00073 
00074 // UploadInfo class
00075 // ------------------
00076 UploadInfo::UploadInfo(Client *parent, QueueInfoPtr nfo)
00077 : ClientExtBase(parent), m_reqFile(nfo->getReqFile()),
00078 m_reqHash(nfo->getReqHash()), m_curPos(), m_endPos(), m_compressed(false),
00079 m_sent() {
00080         CHECK_THROW(m_reqFile);
00081         CHECK_THROW(m_reqHash);
00082         CHECK_THROW(MetaDb::instance().findSharedFile(m_reqHash) == m_reqFile);
00083         ++s_uploadInfoCnt;
00084 }
00085 
00086 // Dummy destructor
00087 UploadInfo::~UploadInfo() {
00088         logTrace(TRACE_CLIENT,
00089                 boost::format("[%s] UploadSessionEnd: Total sent: %d bytes")
00090                 % m_parent->getIpPort() % m_sent
00091         );
00092         --s_uploadInfoCnt;
00093 }
00094 
00095 size_t UploadInfo::count() {
00096         return s_uploadInfoCnt;
00097 }
00098 
00099 // buffer first data chunk into memory
00100 void UploadInfo::bufferData() {
00101         CHECK_THROW(m_reqFile);
00102         CHECK_THROW(m_reqChunks.size());
00103 
00104         m_curPos = m_reqChunks.front().begin();
00105         m_endPos = m_reqChunks.front().end();
00106         m_buffer = m_reqFile->read(m_curPos, m_endPos);
00107 
00108         logTrace(TRACE_CLIENT,
00109                 boost::format("[%s] Buffering upload data (%d..%d, size=%d)") %
00110                 m_parent->getIpPort() % m_curPos % m_endPos % m_buffer.size()
00111         );
00112         CHECK_THROW_MSG(m_buffer.size(), "UploadBuffering failed!");
00113 }
00114 
00115 // Compress the upload buffer
00116 bool UploadInfo::compress() {
00117         CHECK_THROW(m_buffer.size());
00118 
00119         Utils::StopWatch s;
00120         std::string tmp = Zlib::compress(m_buffer);
00121         logTrace(TRACE_CLIENT, boost::format("Data compression took %dms.") %s);
00122 
00123         if (tmp.size() < m_buffer.size()) {
00124                 m_buffer = tmp;
00125                 m_compressed = true;
00126                 m_endPos = m_buffer.size();
00127                 return true;
00128         } else {
00129                 m_compressed = false;
00130                 return false;
00131         }
00132 }
00133 
00134 // retrieve next chunks
00135 boost::tuple<uint32_t, uint32_t, std::string>
00136 UploadInfo::getNext(uint32_t amount) {
00137         if (m_buffer.size() < amount) {
00138                 amount = m_buffer.size();
00139         }
00140 
00141         boost::tuple<uint32_t, uint32_t, std::string> ret;
00142         ret.get<0>() = m_curPos;
00143         if (m_compressed) {
00144                 ret.get<1>() = m_reqChunks.front().length();
00145         } else {
00146                 ret.get<1>() = m_curPos + amount;
00147         }
00148         ret.get<2>() = m_buffer.substr(0, amount);
00149 
00150         m_buffer.erase(0, amount);
00151         m_curPos += amount;
00152 
00153         if (m_buffer.size() == 0 && m_reqChunks.size()) {
00154                 m_reqChunks.pop_front();
00155         }
00156 
00157         m_sent += amount; // update sent count
00158 
00159         return ret;
00160 }
00161 
00162 // Add a new requested chunk (but only if we don't have this in our list
00163 // already).
00164 //
00165 // Since we only want to send 9.28mb to each client at a time, requests for
00166 // chunks that would exceed that amount will be denied. The result is that
00167 // in reality, we, in worst-case scenario, 9710001 bytes to a client instead of
00168 // full 9728000 chunk.
00169 void UploadInfo::addReqChunk(Range32 r) {
00170         std::list<Range32>::iterator i = m_reqChunks.begin();
00171 
00172         uint32_t requested = 0;
00173         while (i != m_reqChunks.end()) {
00174                 if ((*i) == r) {
00175                         return; // already here
00176                 }
00177                 requested += (*i++).length();
00178         }
00179 
00180         if (m_sent + requested + r.length() <= SEND_TO_ONE_CLIENT) {
00181                 logTrace(TRACE_CLIENT,
00182                         boost::format("Adding chunk request %d..%d")
00183                         % r.begin() % r.end()
00184                 );
00185                 m_reqChunks.push_back(r);
00186         }
00187 }
00188 
00189 // SourceInfo class
00190 // ----------------
00191 SourceInfo::SourceInfo(Client *parent, Download *file) :
00192 ClientExtBase(parent), m_reqFile(file), m_qr(), m_needParts(), m_lastSrcExch() {
00193         addOffered(file);
00194         ++s_sourceInfoCnt;
00195 }
00196 
00197 SourceInfo::~SourceInfo() {
00198         typedef std::set<Download*>::iterator Iter;
00199         for (Iter i = m_offered.begin(); i != m_offered.end(); ++i) {
00200                 if (!DownloadList::instance().valid(*i)) {
00201                         continue;
00202                 }
00203                 PartData *pd = (*i)->getPartData();
00204                 if (pd == m_reqFile->getPartData() && m_partMap) {
00205                         try {
00206                                 pd->delSourceMask(ED2K_PARTSIZE, *m_partMap);
00207                         } catch (std::exception &e) {
00208                                 logDebug(
00209                                         boost::format("[%s] ~SourceInfo: %s")
00210                                         % m_parent->getIpPort() % e.what()
00211                                 );
00212                         }
00213                 }
00214                 (*i)->delSource(m_parent);
00215         }
00216         --s_sourceInfoCnt;
00217 }
00218 
00219 size_t SourceInfo::count() {
00220         return s_sourceInfoCnt;
00221 }
00222 
00223 void SourceInfo::setPartMap(const std::vector<bool> &partMap) {
00224         if (m_partMap) {
00225                 m_reqFile->getPartData()->delSourceMask(
00226                         ED2K_PARTSIZE, *m_partMap
00227                 );
00228                 *m_partMap = partMap;
00229         } else {
00230                 m_partMap.reset(new std::vector<bool>(partMap));
00231         }
00232         m_reqFile->getPartData()->addSourceMask(ED2K_PARTSIZE, partMap);
00233         checkNeedParts();
00234 }
00235 
00236 void SourceInfo::setReqFile(Download *file) {
00237         m_reqFile = file;
00238         checkNeedParts();
00239 }
00240 
00241 void SourceInfo::addOffered(Download *file) {
00242         bool ret = m_offered.insert(file).second;
00243         if (m_offered.size() == 1 && !m_reqFile) {
00244                 setReqFile(file);
00245         }
00246         if (ret) {
00247                 file->addSource(m_parent);
00248         }
00249 }
00250 
00251 void SourceInfo::remOffered(Download *file, bool cleanUp) {
00252         CHECK_THROW(file);
00253 
00254         typedef std::set<Download*>::iterator Iter;
00255         Iter it = m_offered.find(file);
00256         if (it == m_offered.end()) {
00257                 return;
00258         }
00259 
00260         m_offered.erase(it);
00261 
00262         file->delSource(m_parent);
00263 
00264         if (file == m_reqFile) {
00265                 if (m_partMap && cleanUp) {
00266                         m_reqFile->getPartData()->delSourceMask(
00267                                 ED2K_PARTSIZE, *m_partMap
00268                         );
00269                         m_partMap.reset();
00270                 }
00271 
00272                 if (m_offered.size()) {
00273                         logTrace(TRACE_CLIENT,
00274                                 boost::format(
00275                                         "[%s] Switching source to other file."
00276                                 ) % m_parent->getIpPort()
00277                         );
00278                         setReqFile(*m_offered.begin());
00279                 } else {
00280                         m_reqFile = 0;
00281                         m_partMap.reset();
00282                         m_needParts = false;
00283                 }
00284         }
00285 }
00286 
00287 void SourceInfo::checkNeedParts() {
00288         if (m_partMap) {
00289                 m_needParts = false;
00290                 if (m_partMap->empty()) {
00291                         m_needParts = true;
00292                 } else try {
00293                         m_reqFile->getPartData()->getRange(
00294                                 ED2K_PARTSIZE, *m_partMap
00295                         );
00296                         m_needParts = true;
00297                 } catch (...) {}
00298         } else {
00299                 // set true for now - we'll get an update once we get partmap
00300                 m_needParts = true;
00301         }
00302 }
00303 
00304 // DownloadInfo class
00305 // ------------------
00306 DownloadInfo::DownloadInfo(Client *parent, PartData *pd, PartMapPtr partMap)
00307 : ClientExtBase(parent), m_reqPD(pd), m_partMap(partMap), m_packedBegin(),
00308 m_received() {
00309         CHECK_THROW(pd);
00310         CHECK_THROW(partMap);
00311         ++s_downloadInfoCnt;
00312 }
00313 
00314 DownloadInfo::~DownloadInfo() {
00315         if (m_packedBuffer.size() && m_reqPD && m_reqChunks.size()) try {
00316                 logTrace(TRACE_CLIENT,
00317                         boost::format(
00318                                 "[%s] DownloadInfo: LastMinute packed "
00319                                 "data flushing."
00320                         ) % m_parent->getIpPort()
00321                 );
00322 
00323                 std::string tmp(Zlib::decompress(m_packedBuffer));
00324                 uint32_t end = m_packedBegin + tmp.size() - 1;
00325                 write(Range32(m_packedBegin, end), tmp);
00326         } catch (std::exception &) {
00327                 logTrace(TRACE_CLIENT,
00328                         boost::format(
00329                                 "[%s] DownloadInfo: LastMinute flushing failed."
00330                         ) % m_parent->getIpPort()
00331                 );
00332         }
00333         logTrace(TRACE_CLIENT,
00334                 boost::format(
00335                         "[%s] DownloadSessionEnd: Total received: %d bytes"
00336                 ) % m_parent->getIpPort() % m_received
00337         );
00338         --s_downloadInfoCnt;
00339 }
00340 
00341 size_t DownloadInfo::count() {
00342         return s_downloadInfoCnt;
00343 }
00344 
00345 std::list<Range32> DownloadInfo::getChunkReqs() try {
00346         CHECK_THROW(m_reqPD);
00347 
00348         if (!m_curPart) {
00349                 m_curPart = m_reqPD->getRange(ED2K_PARTSIZE, *m_partMap);
00350         }
00351         while (m_reqChunks.size() < 3) try {
00352                 m_reqChunks.push_back(m_curPart->getLock(ED2K_CHUNKSIZE));
00353         } catch (PartData::LockError&) {
00354                 m_curPart.reset();
00355                 return getChunkReqs();
00356         }
00357         std::list<Range32> tmp;
00358         for (Iter i = m_reqChunks.begin(); i != m_reqChunks.end(); ++i) {
00359                 tmp.push_back(*(*i));
00360         }
00361         return tmp;
00362 } catch (std::exception&) {
00363         if (m_reqChunks.size()) {
00364                 std::list<Range32> tmp;
00365                 for (Iter i = m_reqChunks.begin(); i != m_reqChunks.end(); ++i) {
00366                         tmp.push_back(*(*i));
00367                 }
00368                 return tmp;
00369         } else {
00370                 throw;
00371         }
00372 }
00373 MSVC_ONLY(;)
00374 
00375 bool DownloadInfo::write(Range32 r, const std::string &data) try {
00376         CHECK_THROW(m_reqPD);
00377         CHECK_THROW(m_reqChunks.size());
00378         CHECK(data.size());
00379 
00380         if (data.size() == 0) {
00381                 return false; // nothing to do
00382         }
00383 
00384         Iter i = m_reqChunks.begin();
00385         for (; i != m_reqChunks.end() && !(*i)->contains(r); ++i);
00386         CHECK_THROW(i != m_reqChunks.end());
00387 
00388         (*i)->write(r.begin(), data);
00389         m_received += data.size();
00390 
00391         if ((*i)->isComplete()) {
00392                 logTrace(TRACE_CLIENT,
00393                         boost::format("[%s] Completed chunk %d..%d")
00394                         % m_parent->getIpPort() % (*i)->begin() % (*i)->end()
00395                 );
00396                 m_reqChunks.erase(i);
00397                 return true;
00398         }
00399 
00400         return false;
00401 } catch (std::exception &e) {
00402         logDebug(
00403                 boost::format("[%s] Writing data: %s")
00404                 % m_parent->getIpPort() % e.what()
00405         );
00406         return false;
00407 }
00408 MSVC_ONLY(;)
00409 
00410 bool DownloadInfo::writePacked(
00411         uint32_t begin, uint32_t size, const std::string &data
00412 ) {
00413         CHECK_THROW(m_reqPD);
00414         CHECK_THROW(m_reqChunks.size());
00415 
00416         m_packedBegin = begin;
00417         m_packedBuffer.append(data);
00418 
00419         if (m_packedBuffer.size() == size) {
00420                 std::string tmp(Zlib::decompress(m_packedBuffer));
00421                 logTrace(TRACE_CLIENT,
00422                         boost::format("[%s] Completed packed chunk %d..%d")
00423                         % m_parent->getIpPort() % begin
00424                         % (begin + tmp.size() - 1)
00425                 );
00426 
00427                 if (tmp.size() > m_packedBuffer.size()) {
00428                         logTrace(TRACE_CLIENT, boost::format(
00429                                 "[%s] Saved %d bytes by download compression."
00430                         ) % m_parent->getIpPort() %
00431                         (tmp.size() - m_packedBuffer.size()));
00432                 }
00433 
00434                 return write(Range32(begin, begin + tmp.size() - 1), tmp);
00435         }
00436 
00437         return false;
00438 }
00439 
00440 } // namespace detail