partdata.cpp

Go to the documentation of this file.
00001 /**
00002  *  Copyright (C) 2004-2005 Alo Sarv <madcat_@users.sourceforge.net>
00003  *
00004  *  This program is free software; you can redistribute it and/or modify
00005  *  it under the terms of the GNU General Public License as published by
00006  *  the Free Software Foundation; either version 2 of the License, or
00007  *  (at your option) any later version.
00008  *
00009  *  This program is distributed in the hope that it will be useful,
00010  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  *  GNU General Public License for more details.
00013  *
00014  *  You should have received a copy of the GNU General Public License
00015  *  along with this program; if not, write to the Free Software
00016  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017  */
00018 
00019 #include <hn/hnprec.h>
00020 #include <hn/partdata.h>
00021 #include <hn/log.h>
00022 #include <hn/lambda_placeholders.h>
00023 #include <hn/metadata.h>
00024 #include <hn/hasher.h>
00025 #include <hn/hash.h>
00026 #include <boost/lambda/lambda.hpp>
00027 #include <boost/lambda/if.hpp>
00028 #include <boost/lambda/bind.hpp>
00029 #include <boost/filesystem/operations.hpp>
00030 #include <boost/algorithm/string/replace.hpp>
00031 #include <boost/multi_index_container.hpp>
00032 #include <boost/multi_index/key_extractors.hpp>
00033 #include <boost/multi_index/ordered_index.hpp>
00034 #include <fstream>
00035 #include <fcntl.h>
00036 
00037 using namespace boost::lambda;
00038 using namespace boost::multi_index;
00039 using namespace CGComm;
00040 
00041 static const uint32_t BUF_SIZE_LIMIT = 512*1024; //!< 512k buffer
00042 
namespace CGComm {
	/**
	 * Opcodes used within PartData object I/O between streams.
	 * The comment on each value names the payload type stored under it.
	 */
	enum PartDataOpCodes {
		//! uint8: file version
		OP_PD_VER         = 0x01,
		//! uint8: PartData object
		OP_PARTDATA       = 0x90,
		//! uint64: downloaded data
		OP_PD_DOWNLOADED  = 0x91,
		//! string: destination location
		OP_PD_DESTINATION = 0x92,
		//! FullRangeList: completed ranges
		OP_PD_COMPLETED   = 0x93,
		//! FullRangeList<HashBase*>: hashset
		OP_PD_HASHSET     = 0x94
	};
}
00054 
00055 namespace Detail {
00056 
00057 class Chunk : public Range64 {
00058 public:
00059         /**
00060          * Construct new chunk.
00061          *
00062          * @param parent      Parent object
00063          * @param begin       Begin offset (inclusive)
00064          * @param end         End offset (inclusive)
00065          * @param size        Ideal size for this chunk
00066          * @param hash        Optional chunkhash for this data
00067          */
00068         Chunk(
00069                 PartData *parent, uint64_t begin, uint64_t end,
00070                 uint32_t size, const HashBase *hash = 0
00071         );
00072 
00073         /**
00074          * Construct Chunk from pre-existing Range object
00075          *
00076          * @param parent      Parent object
00077          * @param range       The range
00078          * @param size        Ideal size for this chunk
00079          * @param hash        Optional chunkhash for this data
00080          */
00081         Chunk(
00082                 PartData *parent, Range64 range,
00083                 uint32_t size, const HashBase *hash = 0
00084         );
00085 
00086         //! \name Getters
00087         //! @{
00088         bool     isVerified() const { return m_verified; }
00089         bool     isPartial()  const { return m_partial;  }
00090         uint32_t getAvail()   const { return m_avail;    }
00091         uint32_t getUseCnt()  const { return m_useCnt;   }
00092         uint32_t getSize()    const { return m_size;     }
00093         bool     isComplete() const;
00094         //! @}
00095 
00096         //! \name Setters
00097         //! @{
00098         void setVerified(bool v)   { m_verified = v; }
00099         void setPartial(bool p)    { m_partial = p;  }
00100         void setAvail(uint32_t a)  { m_avail = a;    }
00101         void setUseCnt(uint32_t u) { m_useCnt = u;   }
00102         //! @}
00103 
00104         //! Schedules hash job to verify this chunk
00105         void verify();
00106 
00107 public: // Needed public for Boost::multi_index
00108         PartData       *m_parent;  //!< File this chunk belongs to
00109         const HashBase *m_hash;    //!< Optional
00110         bool            m_verified;//!< If it is verified.
00111         bool            m_partial; //!< If it is partially downloaded
00112         uint32_t        m_avail;   //!< Availability count
00113         uint32_t        m_useCnt;  //!< Use count
00114         //! Ideal size, e.g. 9500kb for ed2k-chunk etc. Note that the
00115         //! real size of the chunk may be lower (in case of last chunk)
00116         uint32_t        m_size;
00117 private:
00118         Chunk(); //!< Forbidden
00119         friend class Detail::LockedRange;
00120 
00121         //! Allowed by LockedRange class
00122         void write(uint64_t begin, const std::string &data);
00123         void onHashEvent(HashWorkPtr w, HashEvent evt);
00124 };
00125 
00126 struct ChunkMapIndices : boost::multi_index::indexed_by<
00127         boost::multi_index::ordered_non_unique<
00128                 boost::multi_index::identity<Chunk>
00129         >,
00130         boost::multi_index::ordered_non_unique<
00131                 boost::multi_index::member<
00132                         Chunk, bool, &Chunk::m_verified
00133                 >
00134         >,
00135         boost::multi_index::ordered_non_unique<
00136                 boost::multi_index::member<
00137                         Chunk, bool, &Chunk::m_partial
00138                 >
00139         >,
00140         boost::multi_index::ordered_non_unique<
00141                 boost::multi_index::member<
00142                         Chunk, uint32_t, &Chunk::m_avail
00143                 >
00144         >,
00145         boost::multi_index::ordered_non_unique<
00146                 boost::multi_index::member<
00147                         Chunk, uint32_t, &Chunk::m_useCnt
00148                 >
00149         >,
00150         boost::multi_index::ordered_non_unique<
00151                 boost::multi_index::member<
00152                         Chunk, uint32_t, &Chunk::m_size
00153                 >
00154         >
00155 > {};
00156 enum { ID_Pos, ID_Verified, ID_Partial, ID_Avail, ID_UseCnt, ID_Length};
00157 struct ChunkMap : public boost::multi_index_container<Chunk,ChunkMapIndices>{};
00158 typedef ChunkMap::nth_index<Detail::ID_Avail   >::type CMAvailIndex;
00159 typedef ChunkMap::nth_index<Detail::ID_Pos     >::type CMPosIndex;
00160 typedef ChunkMap::nth_index<Detail::ID_Verified>::type CMVerIndex;
00161 typedef ChunkMap::nth_index<Detail::ID_Partial >::type CMPartIndex;
00162 typedef ChunkMap::nth_index<Detail::ID_UseCnt  >::type CMUseIndex;
00163 typedef ChunkMap::nth_index<Detail::ID_Length  >::type CMLenIndex;
00164 struct AvailIter : public CMAvailIndex::iterator {
00165         template<typename T>
00166         AvailIter(const T &t) : CMAvailIndex::iterator(t) {}
00167 };
00168 
00169 // UsedRange class
00170 // -------------------------
00171 template<typename IterType>
00172 UsedRange::UsedRange(PartData *parent, IterType it) : Range64((*it).begin(),
00173 (*it).end()), m_parent(parent),
00174 m_chunk(new AvailIter(project<ID_Avail>(*m_parent->m_chunks, it))) {
00175         CHECK_THROW(parent != 0);
00176         CHECK_THROW(*m_chunk != get<ID_Avail>(*parent->m_chunks).end());
00177 
00178         logTrace(TRACE_PARTDATA,
00179                 boost::format("%s: Using range %d..%d")
00180                 % m_parent->m_dest.leaf() % begin() % end()
00181         );
00182 
00183         get<ID_Avail>(*m_parent->m_chunks).modify(
00184                 *m_chunk, ++bind(&Chunk::m_useCnt, __1(__1))
00185         );
00186 }
00187 
00188 UsedRange::UsedRange(PartData *parent, uint64_t begin, uint64_t end) :
00189 Range64(begin, end), m_parent(parent) {
00190         m_chunk.reset(new AvailIter(get<ID_Avail>(*m_parent->m_chunks).end()));
00191 //      m_chunk = new AvailIter(m_parent->m_chunks->get<ID_Avail>().end()));
00192         logTrace(TRACE_PARTDATA,
00193                 boost::format("%s: Using range %d..%d")
00194                 % m_parent->m_dest.leaf() % this->begin() % this->end()
00195         );
00196 }
00197 
00198 UsedRange::~UsedRange() {
00199         logTrace(TRACE_PARTDATA,
00200                 boost::format("%s: Un-using range %d..%d")
00201                 % m_parent->m_dest.leaf() % begin() % end()
00202         );
00203 
00204         if (*m_chunk != m_parent->m_chunks->get<ID_Avail>().end()) {
00205                 get<ID_Avail>(*m_parent->m_chunks).modify(
00206                         *m_chunk, --bind(&Chunk::m_useCnt, __1(__1))
00207                 );
00208         }
00209 }
00210 
00211 LockedRangePtr UsedRange::getLock(uint32_t size) {
00212         return m_parent->getLock(shared_from_this(), size);
00213 }
00214 
00215 bool UsedRange::isComplete() const {
00216         return m_parent->isComplete(*this);
00217 }
00218 
00219 // LockedRange class
00220 // ---------------------------
00221 LockedRange::LockedRange(PartData *parent, Range64 r)
00222 : Range64(r), m_parent(parent),
00223 m_chunk(new AvailIter(get<ID_Avail>(*m_parent->m_chunks).end())) {
00224         logTrace(TRACE_PARTDATA,
00225                 boost::format("%s: Locking range %d..%d")
00226                 % m_parent->m_dest.leaf() % begin() % end()
00227         );
00228 
00229         m_parent->m_locked.merge(*this);
00230 }
00231 
00232 LockedRange::LockedRange(PartData *parent, Range64 r, AvailIter &it)
00233 : Range64(r), m_parent(parent), m_chunk(new AvailIter(it)) {
00234         CHECK_THROW(parent != 0);
00235 
00236         logTrace(TRACE_PARTDATA,
00237                 boost::format("%s: Locking range %d..%d")
00238                 % m_parent->m_dest.leaf() % begin() % end()
00239         );
00240 
00241         m_parent->m_locked.merge(*this);
00242 }
00243 
00244 LockedRange::~LockedRange() {
00245         logTrace(TRACE_PARTDATA,
00246                 boost::format("%s: UnLocking range %d..%d")
00247                 % m_parent->m_dest.leaf() % begin() % end()
00248         );
00249 
00250         m_parent->m_locked.erase(*this);
00251 }
00252 
00253 void LockedRange::write(uint64_t begin, const std::string &data) {
00254         if (begin > end() || begin + data.size() - 1 > end()) {
00255                 throw PartData::LockError("Writing outside lock.");
00256         }
00257         if (*m_chunk != get<ID_Avail>(*m_parent->m_chunks).end()) {
00258                 m_parent->m_chunks->get<ID_Avail>().modify(
00259                         *m_chunk, bind(&Chunk::write, __1, begin, data)
00260                 );
00261         } else {
00262                 m_parent->doWrite(begin, data);
00263         }
00264 }
00265 
00266 // Chunk class
00267 // ---------------------
00268 Chunk::Chunk(
00269         PartData *parent, uint64_t begin, uint64_t end,
00270         uint32_t size, const HashBase *hash
00271 ) : Range64(begin, end), m_parent(parent), m_hash(hash), m_verified(),
00272 m_partial(!m_parent->isComplete(*this)), m_avail(), m_useCnt(), m_size(size) {
00273         logTrace(TRACE_PARTDATA,
00274                 boost::format("Constructing chunk: %d..%d (%s)")
00275                 % begin % end % (m_partial ? "partial" : "complete")
00276         );
00277 }
00278 
00279 Chunk::Chunk(
00280         PartData *parent, Range64 range, uint32_t size, const HashBase *hash
00281 ) : Range64(range), m_parent(parent), m_hash(hash), m_verified(),
00282 m_partial(!m_parent->isComplete(*this)), m_avail(), m_useCnt(), m_size(size) {
00283         logTrace(TRACE_PARTDATA,
00284                 boost::format("Constructing chunk: %d..%d (%s)")
00285                 % begin() % end() % (m_partial ? "partial" : "complete")
00286         );
00287 }
00288 
00289 void Chunk::write(uint64_t begin, const std::string &data) {
00290         m_parent->doWrite(begin, data);
00291         if (isComplete()) {
00292                 verify();
00293         }
00294 }
00295 void Chunk::verify() {
00296         logTrace(TRACE_PARTDATA,
00297                 boost::format("%s: Completed chunk %d..%d")
00298                 % m_parent->m_dest.leaf() % this->begin() % this->end()
00299         );
00300 
00301         m_partial = false;
00302         m_verified = false;
00303 
00304         if (m_hash) {
00305                 m_parent->save();
00306                 boost::shared_ptr<HashWork> c(
00307                         new HashWork(
00308                                 m_parent->m_loc.string(), this->begin(),
00309                                 this->end(), m_hash
00310                         )
00311                 );
00312                 HashWork::getEventTable().addHandler(
00313                         c, this, &Chunk::onHashEvent
00314                 );
00315                 WorkThread::instance().postWork(c);
00316                 ++m_parent->m_pendingHashes;
00317         } else if (m_parent->isComplete()) {
00318                 m_parent->doComplete();
00319         } else {
00320                 logWarning(
00321                         boost::format(
00322                                 "%s: Chunk %s..%s has no hash - "
00323                                 "shouldn't happen..."
00324                         ) % m_parent->m_dest.leaf() %this->begin() % this->end()
00325                 );
00326         }
00327 }
00328 
00329 void Chunk::onHashEvent(HashWorkPtr c, HashEvent evt){
00330         if (evt == HASH_FAILED) {
00331                 boost::format fmt("%s: Corruption found at %d..%d");
00332                 logWarning(fmt % m_parent->m_dest.leaf() % begin() % end());
00333                 m_parent->m_complete.erase(begin(), end());
00334                 m_verified = false;
00335                 if (m_parent->m_fullJob) {
00336                         m_parent->m_fullJob->cancel();
00337                         m_parent->m_fullJob.reset();
00338                 }
00339                 m_parent->m_partStatus[m_size][begin()/m_size] = false;
00340         } else if (evt == HASH_VERIFIED) {
00341                 m_verified = true;
00342                 m_partial = false;
00343                 m_parent->m_partStatus[m_size][begin()/m_size] = true;
00344                 logTrace(TRACE_PARTDATA,
00345                         boost::format("Verified chunk #%d") % (begin() / m_size)
00346                 );
00347         } else if (evt == HASH_FATAL_ERROR) {
00348                 logError(
00349                         boost::format("Fatal error hashing file `%s'")
00350                         % c->getFileName().native_file_string()
00351                 );
00352         }
00353         if (!--m_parent->m_pendingHashes && m_parent->isComplete()) {
00354                 m_parent->doComplete();
00355         }
00356 }
00357 bool Chunk::isComplete() const {
00358         return m_parent->isComplete(*this);
00359 }
00360 
00361 } // namespace Detail
00362 
00363 using namespace Detail;
00364 
00365 // PartData exception classes
00366 // --------------------------
00367 PartData::LockError::LockError(const std::string &msg):std::runtime_error(msg){}
00368 PartData::RangeError::RangeError(const std::string &mg):std::runtime_error(mg){}
00369 
00370 // PartData class
00371 // --------------
00372 IMPLEMENT_EVENT_TABLE(PartData, PartData*, int);
00373 
00374 PartData::PartData(
00375         uint64_t size,
00376         const boost::filesystem::path &loc,
00377         const boost::filesystem::path &dest
00378 ) : m_chunks(new ChunkMap), m_size(size), m_loc(loc), m_dest(dest), m_toFlush(),
00379 m_md(), m_pendingHashes(), m_sourceCnt(), m_fullSourceCnt() {
00380         std::ofstream o(loc.string().c_str(), std::ios::binary);
00381         o.flush();
00382         getEventTable().postEvent(this, PD_ADDED);
00383 }
00384 
00385 PartData::PartData(const boost::filesystem::path &p) try :
00386 m_chunks(new ChunkMap), m_size(), m_toFlush(), m_md(), m_pendingHashes(),
00387 m_sourceCnt(), m_fullSourceCnt() {
00388         logTrace(TRACE_PARTDATA,
00389                 boost::format("Loading temp file: %s") % p.string()
00390         );
00391         using namespace Utils;
00392 
00393         std::string tmp(p.string());
00394         boost::algorithm::replace_last(tmp, ".tmp.dat", ".tmp");
00395         m_loc = boost::filesystem::path(tmp, boost::filesystem::native);
00396 
00397         std::ifstream ifs(p.string().c_str(), std::ios::binary);
00398         CHECK_THROW(ifs);
00399         CHECK_THROW(Utils::getVal<uint8_t>(ifs) == OP_PARTDATA);
00400 
00401         Utils::getVal<uint16_t>(ifs);
00402         if (Utils::getVal<uint8_t>(ifs) != OP_PD_VER) {
00403                 logWarning("Unknown partdata version.");
00404         }
00405         m_size = Utils::getVal<uint64_t>(ifs);
00406         uint16_t tagc = Utils::getVal<uint16_t>(ifs);
00407 
00408         while (tagc-- && ifs) {
00409                 uint8_t   oc = getVal<uint8_t>(ifs);
00410                 uint16_t len = getVal<uint16_t>(ifs);
00411                 switch (oc) {
00412                         case OP_PD_DESTINATION:
00413                                 m_dest = getVal<std::string>(ifs);
00414                                 break;
00415                         case OP_PD_COMPLETED:
00416                                 if (Utils::getVal<uint8_t>(ifs)!=OP_RANGELIST) {
00417                                         logWarning("Invalid tag.");
00418                                         ifs.seekg(len, std::ios::cur);
00419                                 }
00420                                 Utils::getVal<uint16_t>(ifs);
00421                                 m_complete = RangeList64(ifs);
00422                                 break;
00423                         default:
00424                                 logWarning("Unhandled tag in PartData.");
00425                                 ifs.seekg(len, std::ios::cur);
00426                                 break;
00427                 }
00428         }
00429 
00430         if (ifs && Utils::getVal<uint8_t>(ifs) == OP_METADATA) {
00431                 Utils::getVal<uint16_t>(ifs);
00432                 m_md = new MetaData(ifs);
00433                 for (uint32_t i = 0; i < m_md->getHashSetCount(); ++i) {
00434                         HashSetBase *hs = m_md->getHashSet(i);
00435                         if (hs->getChunkSize() && hs->getChunkCnt()) {
00436                                 addHashSet(hs);
00437                         }
00438                 }
00439         }
00440 
00441         printCompleted();
00442 
00443         // It's possible we loaded a complete file - if so, re-try completing.
00444         if (isComplete()) {
00445                 doComplete();
00446         } else {
00447                 uint32_t modDate = Utils::getModDate(m_loc);
00448                 if (m_md && modDate != m_md->getModDate()) {
00449                         logMsg(
00450                                 boost::format(
00451                                         "%s: Modification date changed, "
00452                                         "rehashing completed parts."
00453                                 ) % m_dest.leaf()
00454                         );
00455                         rehashCompleted();
00456                 }
00457         }
00458 
00459         getEventTable().postEvent(this, PD_ADDED);
00460 } catch (std::runtime_error &) {
00461         delete m_md;
00462 }
00463 MSVC_ONLY(;)
00464 
00465 PartData::~PartData() {}
00466 
00467 void PartData::rehashCompleted() {
00468         CMPosIndex &idx = m_chunks->get<ID_Pos>();
00469         for (CMPosIndex::iterator i = idx.begin(); i != idx.end(); ++i) {
00470                 if ((*i).isComplete()) {
00471                         idx.modify(i, bind(&Chunk::verify, __1));
00472                 }
00473         }
00474 }
00475 
00476 void PartData::addSourceMask(
00477         uint32_t chunkSize, const std::vector<bool> &chunks
00478 ) {
00479         typedef CMLenIndex::iterator Iter;
00480 
00481         ++m_sourceCnt;
00482         if (chunks.empty()) {
00483                 addFullSource(chunkSize);
00484                 return;
00485         }
00486         CHECK_THROW(chunks.size() == getChunkCount(chunkSize));
00487         checkAddChunkMap(chunkSize);
00488         int i = 0;
00489         CMLenIndex& pi = m_chunks->get<ID_Length>();
00490         std::pair<Iter, Iter> ret = pi.equal_range(chunkSize);
00491         for (Iter j = ret.first; j != ret.second; ++j) {
00492                 pi.modify(j, bind(&Chunk::m_avail, __1(__1)) += chunks[i++]);
00493         }
00494 }
00495 
00496 void PartData::addFullSource(uint32_t chunkSize) {
00497         typedef CMLenIndex::iterator Iter;
00498 
00499         checkAddChunkMap(chunkSize);
00500         CMLenIndex& pi = m_chunks->get<ID_Length>();
00501         std::pair<Iter, Iter> ret = pi.equal_range(chunkSize);
00502         for (Iter i = ret.first; i != ret.second; ++i) {
00503                 pi.modify(i, ++bind(&Chunk::m_avail, __1(__1)));
00504         }
00505         ++m_fullSourceCnt;
00506 }
00507 void PartData::delSourceMask(
00508         uint32_t chunkSize, const std::vector<bool> &chunks
00509 ) {
00510         typedef CMLenIndex::iterator Iter;
00511 
00512         CHECK_THROW(m_sourceCnt);
00513         --m_sourceCnt;
00514         if (chunks.empty()) {
00515                 delFullSource(chunkSize);
00516                 return;
00517         }
00518         CHECK_THROW(chunks.size() == getChunkCount(chunkSize));
00519         int i = 0;
00520         CMLenIndex& pi = m_chunks->get<ID_Length>();
00521         std::pair<Iter, Iter> ret = pi.equal_range(chunkSize);
00522         for (Iter j = ret.first; j != ret.second; ++j) {
00523                 if(chunks[i]) { // ensures we don't integer underflow
00524                         CHECK_THROW((*j).m_avail);
00525                 }
00526                 pi.modify(j, bind(&Chunk::m_avail, __1(__1)) -= chunks[i++]);
00527         }
00528 }
00529 void PartData::delFullSource(uint32_t chunkSize) {
00530         typedef CMLenIndex::iterator Iter;
00531 
00532         CHECK_THROW(m_fullSourceCnt);
00533         --m_fullSourceCnt;
00534         CMLenIndex& pi = m_chunks->get<ID_Length>();
00535         std::pair<Iter, Iter> ret = pi.equal_range(chunkSize);
00536         for (Iter i = ret.first; i != ret.second; ++i) {
00537                 CHECK_THROW((*i).m_avail);
00538                 pi.modify(i, --bind(&Chunk::m_avail, __1(__1)));
00539         }
00540 }
00541 
00542 /**
00543  * Unary function object for usage with doGetRange() method
00544  *
00545  * @param r  Ignored
00546  * @returns Always true
00547  */
00548 struct TruePred { bool operator()(const Range64 &) { return true; } } truepred;
00549 
00550 /**
00551  * Unary function object for usage with doGetRange() method, in order to check
00552  * if the generated range is contained within the rangelist passed within this
00553  * function object.
00554  *
00555  * @param r    Range candidate to be checked
00556  * @return     True if @param r is contained within m_rl, false otherwise
00557  */
00558 struct CheckPred {
00559         bool operator()(const Range64 &r) { return m_rl.containsFull(r); }
00560         RangeList64 m_rl;
00561 } checkPred;
00562 
00563 
00564 UsedRangePtr PartData::getRange(uint32_t size) {
00565         return doGetRange(size, truepred);
00566 }
00567 
00568 UsedRangePtr PartData::getRange(
00569         uint32_t size, const std::vector<bool> &chunks
00570 ) {
00571         if (chunks.empty()) {
00572                 return doGetRange(size, truepred);
00573         }
00574         checkPred.m_rl.clear();
00575         for (uint32_t i = 0, j = 0; i < chunks.size(); ++i, j += size) {
00576                 if (chunks[i]) {
00577                         Range64 r(j, j + size - 1);
00578                         if (r.end() > m_size - 1) {
00579                                 r.end(m_size - 1);
00580                         }
00581                         checkPred.m_rl.push(r);
00582                 }
00583         }
00584         return doGetRange(size, checkPred);
00585 }
00586 
00587 template<typename Predicate>
00588 UsedRangePtr PartData::doGetRange(uint64_t size, Predicate &pred) {
00589         UsedRangePtr ret;
00590         if (m_chunks->empty()) {
00591                 if (m_complete.empty()) {
00592                         ret = UsedRangePtr(new UsedRange(this, 0, size));
00593                         if (ret->end() > m_size - 1) {
00594                                 ret->end(m_size - 1);
00595                         }
00596                 } else if (m_complete.front().begin() > 0) {
00597                         uint64_t end = m_complete.front().begin() - 1;
00598                         ret = UsedRangePtr(new UsedRange(this, 0, end));
00599                 } else if (m_complete.front().end() < m_size) {
00600                         uint64_t beg = m_complete.front().end() + 1;
00601                         uint64_t end = beg + size > m_size ? m_size : beg +size;
00602                         ret = UsedRangePtr(new UsedRange(this, beg, end));
00603                 }
00604         }
00605         if (ret) try { ret->getLock(1); } catch (LockError&) { ret.reset(); }
00606         if (!ret) { // Round 1: Incomplete chunks
00607                 typedef CMPartIndex::iterator PIter;
00608                 CMPartIndex &pi = get<ID_Partial>(*m_chunks);
00609                 std::pair<PIter, PIter> r = pi.range(__1 == true, unbounded);
00610                 PIter i = pi.end();
00611                 for (PIter j = r.first; j != r.second; ++j) {
00612                         if (!(*j).m_useCnt && pred(*j)) {
00613                                 i = j;
00614                         }
00615                 }
00616                 // partial chunk with useCnt == 0
00617                 if (i != pi.end()) {
00618                         ret = UsedRangePtr(new UsedRange(this, i));
00619                 }
00620         }
00621         if (ret) try { ret->getLock(1); } catch (LockError&) { ret.reset(); }
00622         if (!ret) { // Round 2: Least available unused chunk
00623                 typedef CMAvailIndex::iterator AIter;
00624                 CMAvailIndex &ai = get<ID_Avail>(*m_chunks);
00625                 AIter i = ai.end();
00626                 for (AIter r = ai.upper_bound(0); r != ai.end(); ++r) {
00627                         if ((*r).isComplete() || (*r).isPartial() || !pred(*r)){
00628                                 continue;
00629                         }
00630                         if (!(*r).getUseCnt()) {
00631                                 i = r;
00632                                 break;
00633                         }
00634                 }
00635                 if (i != ai.end()) {
00636                         ret = UsedRangePtr(new UsedRange(this, i));
00637                 }
00638         }
00639         if (ret) try { ret->getLock(1); } catch (LockError&) { ret.reset(); }
00640         if (!ret) { // Round 3: Least used chunk
00641                 typedef CMUseIndex::iterator UIter;
00642                 CMUseIndex &ui = get<ID_UseCnt>(*m_chunks);
00643                 UIter r = ui.upper_bound(0);
00644                 while (r != ui.end() && !pred(*r)) {
00645                         r == ui.begin() ? r = ui.end() : --r;
00646                 }
00647                 if (r != ui.end()) {
00648                         ret = UsedRangePtr(new UsedRange(this, r));
00649                 }
00650         }
00651         if (ret) try { ret->getLock(1); } catch (LockError&) { ret.reset(); }
00652         if (!ret) {
00653                 throw RangeError("Failed to generate chunk request.");
00654         } else if (!pred(*ret)) {
00655                 std::string msg(
00656                         "Internal PartData error "
00657                         "while generating chunk request."
00658                 );
00659                 logDebug(msg);
00660                 throw RangeError(msg);
00661         }
00662         return ret;
00663 }
00664 
00665 uint32_t PartData::getChunkCount(uint32_t chunkSize) const {
00666         return m_size / chunkSize + (m_size % chunkSize ? 1 : 0);
00667 }
00668 
00669 void PartData::checkAddChunkMap(uint32_t cs) {
00670         typedef CMLenIndex::iterator LIter;
00671         if (m_partStatus.find(cs) == m_partStatus.end()) {
00672                 m_partStatus[cs] = std::vector<bool>(getChunkCount(cs));
00673         }
00674         std::pair<LIter, LIter> ret = m_chunks->get<ID_Length>().equal_range(cs);
00675         if (ret.first == m_chunks->get<ID_Length>().end()) {
00676                 for (uint32_t i = 0; i < getChunkCount(cs); ++i) {
00677                         uint64_t beg = i * cs;
00678                         uint64_t end = (i + 1) * cs - 1;
00679                         if (end > m_size - 1) {
00680                                 end = m_size - 1;
00681                         }
00682                         Chunk c(this, beg, end, cs);
00683                         m_chunks->insert(c);
00684                         m_partStatus[cs][i] = isComplete(beg, end);
00685                 }
00686         }
00687 }
00688 void PartData::addHashSet(const HashSetBase *hs) {
00689         CHECK_THROW(hs->getChunkSize() > 0);
00690         typedef CMLenIndex::iterator LIter;
00691         uint32_t cs = hs->getChunkSize();
00692         std::pair<LIter, LIter> ret =m_chunks->get<ID_Length>().equal_range(cs);
00693         if (ret.first == m_chunks->get<ID_Length>().end()) {
00694                 checkAddChunkMap(cs);
00695                 ret = m_chunks->get<ID_Length>().equal_range(cs);
00696         }
00697         int cc = hs->getChunkCnt();
00698         CHECK_THROW(std::distance(ret.first, ret.second) == cc);
00699         uint32_t j = 0;
00700         for (LIter i = ret.first; i != ret.second; ++i) {
00701                 m_chunks->get<ID_Length>().modify(
00702                         i, bind(&Chunk::m_hash, __1(__1)) = &(*hs)[j++]
00703                 );
00704         }
00705 }
00706 
00707 bool PartData::isComplete() const {
00708         return m_complete.size() == 1 &&
00709                 !(*m_complete.begin()).begin() &&
00710                 (*m_complete.begin()).end() == m_size - 1;
00711 }
00712 bool PartData::isComplete(const Range64 &r) const {
00713         return m_complete.containsFull(r);
00714 }
00715 bool PartData::isComplete(uint64_t begin, uint64_t end) const {
00716         return m_complete.containsFull(begin, end);
00717 }
00718 LockedRangePtr PartData::getLock(UsedRangePtr used, uint32_t size) {
00719         Range64 cand(used->begin(), used->begin());
00720         typedef RangeList64::CIter CIter;
00721         CIter i = m_complete.getContains(cand);
00722         CIter j = m_locked.getContains(cand);
00723         do {
00724                 i = m_complete.getContains(cand);
00725                 j = m_locked.getContains(cand);
00726                 if (i != m_complete.end()) {
00727                         cand = Range64((*i).end() + 1, (*i).end() + 1);
00728                 } else if (j != m_locked.end()) {
00729                         cand = Range64((*j).end() + 1, (*j).end() + 1);
00730                 } else {
00731                         break;
00732                 }
00733         } while (used->contains(cand));
00734         if (!used->contains(cand)) {
00735                 throw LockError("Unable to aquire lock within this UsedRange.");
00736         }
00737         cand.end(cand.begin() + size - 1);
00738         if (cand.end() > used->end()) {
00739                 cand.end(used->end());
00740         }
00741         i = m_complete.getContains(cand);
00742         if (i != m_complete.end()) {
00743                 cand.end((*i).begin() - 1);
00744         }
00745         i = m_locked.getContains(cand);
00746         if (i != m_locked.end()) {
00747                 cand.end((*i).begin() - 1);
00748         }
00749         CHECK_THROW(cand.length() <= size);
00750         return LockedRangePtr(new LockedRange(this, cand, *used->m_chunk));
00751 }
00752 
00753 void PartData::write(uint64_t begin, const std::string &data) {
00754         logTrace(TRACE_PARTDATA,
00755                 boost::format("Safe-writing at offset %d.") % begin
00756         );
00757         CHECK_THROW(!m_locked.contains(begin, begin + data.size() - 1));
00758         CHECK_THROW(!m_complete.contains(begin, begin + data.size() - 1));
00759         doWrite(begin, data);
00760         typedef CMPosIndex::iterator PIter;
00761         CMPosIndex &pi = m_chunks->get<ID_Pos>();
00762         Range64 tmp(begin, begin + data.size() - 1);
00763         PIter i = pi.lower_bound(Chunk(this, tmp, 0));
00764         if (i != pi.end() && (*i).contains(tmp)) {
00765                 pi.modify(i, bind(&Chunk::m_partial, __1(__1)) = true);
00766         } else if (i != pi.begin() && (*--i).contains(tmp)) {
00767                 pi.modify(i, bind(&Chunk::m_partial, __1(__1)) = true);
00768         } else if (++i != pi.end() && (*i).contains(tmp)) {
00769                 pi.modify(i, bind(&Chunk::m_partial, __1(__1)) = true);
00770         }
00771         if (isComplete() && !m_fullJob && !m_pendingHashes) {
00772                 doComplete();
00773         }
00774 }
00775 void PartData::doWrite(uint64_t begin, const std::string &data) {
00776         logTrace(TRACE_PARTDATA,
00777                 boost::format("Writing at offset %d, datasize is %d")
00778                 % begin % data.size()
00779         );
00780         CHECK_THROW(!m_complete.contains(begin, begin + data.size() - 1));
00781         m_buffer[begin] = data;
00782         m_complete.merge(begin, begin + data.size() - 1);
00783         m_toFlush += data.size();
00784         getEventTable().postEvent(this, PD_DATA_ADDED);
00785         if (m_toFlush >= BUF_SIZE_LIMIT) {
00786                 save();
00787         }
00788 }
00789 
00790 void PartData::flushBuffer() {
00791         logTrace(TRACE_PARTDATA,
00792                 boost::format("Flushing buffers: %s") % m_dest.leaf()
00793         );
00794 
00795         int fd = open(m_loc.native_file_string().c_str(), O_RDWR|O_BINARY);
00796         CHECK_THROW(fd > 0);
00797         for (BIter i = m_buffer.begin(); i != m_buffer.end(); ++i) try {
00798                 uint64_t ret = lseek64(fd, (*i).first, SEEK_SET);
00799                 CHECK_THROW(ret == static_cast<uint64_t>((*i).first));
00800                 int c = ::write(fd, (*i).second.data(), (*i).second.size());
00801                 CHECK_THROW(c == static_cast<int>((*i).second.size()));
00802         } catch (...) {
00803                 close(fd);
00804                 throw;
00805         }
00806         close(fd);
00807 
00808         m_buffer.clear();
00809         m_toFlush = 0;
00810         if (m_md) {
00811                 m_md->setModDate(Utils::getModDate(m_loc));
00812         }
00813         getEventTable().postEvent(this, PD_DATA_FLUSHED);
00814         printCompleted();
00815 }
00816 
00817 //! Sorts container of HashSetBase* objects based on chunkhashcount
00818 struct ChunkCountPred {
00819         bool operator()(const HashSetBase *x, const HashSetBase *y) {
00820                 return x->getChunkCnt() < y->getChunkCnt();
00821         }
00822 };
00823 
00824 void PartData::onHashEvent(HashWorkPtr p, HashEvent evt) {
00825         using boost::logic::tribool;
00826         if (evt == HASH_FATAL_ERROR) {
00827                 logError("Fatal error performing final rehash on PartData.");
00828         } else if (evt != HASH_COMPLETE) {
00829                 logDebug(boost::format(
00830                         "PartData received unknown event %d.") % evt
00831                 );
00832                 return;
00833         }
00834         CHECK_THROW(p->getMetaData());
00835         MetaData *ref = p->getMetaData();
00836         logTrace(TRACE_PARTDATA,
00837                 boost::format("Full metadata generated for file %s (%s)")
00838                 % ref->getFileName(0) % m_dest.leaf()
00839         );
00840         logTrace(TRACE_PARTDATA,
00841                 boost::format("Generated MetaData indicates FileSize=%d")
00842                 % ref->getFileSize()
00843         );
00844 
00845         //! Generated hashsets, sorted on chunk-count
00846         std::multiset<const HashSetBase*, ChunkCountPred> generated;
00847         typedef std::multiset<const HashSetBase*,ChunkCountPred>::iterator Iter;
00848 
00849         for (uint32_t i = 0; i < ref->getHashSetCount(); ++i) {
00850                 generated.insert(ref->getHashSet(i));
00851         }
00852 
00853         uint16_t ok = 0; uint16_t failed = 0; uint16_t notfound = 0;
00854         for (Iter i = generated.begin(); i != generated.end(); ++i) {
00855                 const HashSetBase *hs = *i;
00856                 boost::format fmt("Generated hashset: %s %s %s: %s");
00857                 fmt % hs->getFileHashType();
00858                 fmt % (hs->getChunkSize() ? hs->getChunkHashType() : "");
00859                 if (hs->getChunkSize()) {
00860                         fmt % hs->getChunkSize();
00861                 } else {
00862                         fmt % "";
00863                 }
00864                 fmt % hs->getFileHash().decode();
00865                 logTrace(TRACE_PARTDATA, fmt.str());
00866                 for (uint32_t j = 0; j < hs->getChunkCnt(); ++j) {
00867                         boost::format fmt("[%2d] %s");
00868                         logTrace(TRACE_PARTDATA, fmt % j % (*hs)[j].decode());
00869                 }
00870                 boost::logic::tribool ret = verifyHashSet(hs);
00871                 if (ret) {
00872                         ++ok;
00873                 } else if (!ret) {
00874                         ++failed;
00875                 } else if (boost::indeterminate(ret)) {
00876                         ++notfound;
00877                 }
00878         }
00879 
00880         logTrace(TRACE_PARTDATA,
00881                 boost::format("Final rehash: %d ok, %d failed, %d not found")
00882                 % ok % failed % notfound
00883         );
00884         if (isComplete()) {
00885                 getEventTable().postEvent(this, PD_COMPLETE);
00886         }
00887 }
00888 boost::logic::tribool PartData::verifyHashSet(const HashSetBase *ref) {
00889         CHECK_THROW(m_md);
00890         CHECK_THROW(ref);
00891         for (uint32_t j = 0; j < m_md->getHashSetCount(); ++j) {
00892                 const HashSetBase *orig = m_md->getHashSet(j);
00893                 if (ref->getFileHashTypeId() != orig->getFileHashTypeId()) {
00894                         continue;
00895                 }
00896                 if (ref->getChunkHashTypeId() != orig->getChunkHashTypeId()) {
00897                         continue;
00898                 }
00899                 if (ref->getChunkCnt() != orig->getChunkCnt()) {
00900                         return boost::indeterminate;
00901                 }
00902                 if (ref->getChunkCnt() == orig->getChunkCnt() == 0) {
00903                         if (ref->getFileHash() == orig->getFileHash()) {
00904                                 m_corrupt.erase(0, m_size - 1);
00905                                 m_complete.merge(0, m_size - 1);
00906                                 return true;
00907                         } else {
00908                                 m_complete.erase(0, m_size - 1);
00909                                 m_corrupt.merge(0, m_size - 1);
00910                                 return false;
00911                         }
00912                 }
00913                 bool failed = false;
00914                 for (uint32_t i = 0; i < ref->getChunkCnt(); ++i) {
00915                         uint64_t beg = ref->getChunkSize() * i;
00916                         uint64_t end = beg + ref->getChunkSize();
00917                         if (end > m_size - 1) {
00918                                 end = m_size - 1;
00919                         }
00920                         if ((*ref)[i] != ((*orig)[i])) {
00921                                 boost::format fmt(
00922                                         "Final rehash, chunkhash %d..%d failed:"
00923                                         " Generated hash %s != real hash %s"
00924                                 );
00925                                 fmt % beg % end % (*ref)[i].decode();
00926                                 fmt % (*orig)[i].decode();
00927                                 logError(fmt);
00928                                 m_complete.erase(beg, end);
00929                                 m_corrupt.merge(beg, end);
00930                                 failed = true;
00931                         } else {
00932                                 m_complete.merge(beg, end);
00933                                 m_corrupt.erase(beg, end);
00934                         }
00935                 }
00936                 return failed;
00937         }
00938         return boost::indeterminate;
00939 }
00940 void PartData::setMetaData(MetaData *md) {
00941         CHECK_THROW(md);
00942         m_md = md;
00943         for (uint32_t i = 0; i < m_md->getHashSetCount(); ++i) {
00944                 HashSetBase *hs = m_md->getHashSet(i);
00945                 if (hs->getChunkSize() && hs->getChunkCnt()) {
00946                         addHashSet(m_md->getHashSet(i));
00947                 }
00948         }
00949 }
00950 void PartData::doComplete() {
00951         CHECK_THROW(isComplete());
00952         save();
00953         HashWorkPtr p(new HashWork(m_loc.string()));
00954         HashWork::getEventTable().addHandler(p, this, &PartData::onHashEvent);
00955         WorkThread::instance().postWork(p);
00956         m_fullJob = p;
00957 }
00958 void PartData::save() try {
00959         flushBuffer();
00960         boost::filesystem::path p(m_loc.string() + ".dat");
00961         if (exists(p)) {
00962                 if (boost::filesystem::exists(p.string() + ".bak")) {
00963                         boost::filesystem::remove(p.string() + ".bak");
00964                 }
00965                 boost::filesystem::rename(p, p.string() + ".bak");
00966         }
00967         std::ofstream ofs(p.string().c_str(), std::ios::binary);
00968         CHECK_THROW(ofs);
00969         ofs << *this;
00970         if (m_md) {
00971                 ofs << *m_md;
00972         }
00973 } catch (std::exception &e) {
00974         logError(
00975                 boost::format("Error saving temp file: %s") % e.what()
00976         );
00977 }
00978 MSVC_ONLY(;)
00979 
00980 std::ostream& operator<<(std::ostream &o, const PartData &p) {
00981         Utils::putVal<uint8_t>(o, OP_PARTDATA);
00982         std::ostringstream tmp;
00983         Utils::putVal<uint8_t>(tmp, OP_PD_VER);
00984         Utils::putVal<uint64_t>(tmp, p.m_size);
00985         Utils::putVal<uint16_t>(tmp, 2); // tagcount
00986         Utils::putVal<uint8_t>(tmp, OP_PD_DESTINATION);
00987         Utils::putVal<uint16_t>(tmp, p.m_dest.string().size() + 2);
00988         Utils::putVal(tmp, p.m_dest.string());
00989         Utils::putVal<uint8_t>(tmp, OP_PD_COMPLETED);
00990         std::ostringstream tmp2;
00991         tmp2 << p.m_complete;
00992         Utils::putVal(tmp, tmp2.str());
00993         Utils::putVal(o, tmp.str());
00994         return o;
00995 }
00996 uint64_t PartData::getCompleted() const {
00997         uint64_t tmp = 0;
00998         for_each(
00999                 m_complete.begin(), m_complete.end(),
01000                 tmp += bind(&Range64::length, __1)
01001         );
01002         return tmp;
01003 }
01004 
01005 // for debugging purposes only
01006 void PartData::printCompleted() {
01007         float perc = getCompleted() * 100.0 / getSize();
01008         static const uint32_t width = 74;
01009         logTrace(TRACE_PARTDATA,
01010                 boost::format("/%s\\") % std::string(width - 2, '-')
01011         );
01012         std::string buf("[          ]");
01013         for (uint32_t i = 0; i < 10; ++i) {
01014                 if (perc > i * 10 && perc < (i + 1) * 10) {
01015                         uint8_t c = static_cast<uint8_t>(perc) % 10;
01016                         if (c > 0 && c < 4) {
01017                                 buf[i + 1] = '.';
01018                         } else if (c >= 4 && c < 7) {
01019                                 buf[i + 1] = '-';
01020                         } else if (c >= 7 && c < 10) {
01021                                 buf[i + 1] = '|';
01022                         }
01023                 } else if (perc && perc >= i * 10) {
01024                         buf[i + 1] = '#';
01025                 }
01026         }
01027 
01028         boost::format fmtname("| Name: " COL_GREEN "%s" COL_NONE "%|73t||");
01029         std::string filename = getDestination().leaf();
01030         if (filename.size() > width - 10u) {
01031                 std::string beg = filename.substr(0, width - 20);
01032                 std::string end = filename.substr(filename.size() - 4);
01033                 filename = beg + "[...]" + end;
01034         }
01035         fmtname % filename;
01036         logTrace(TRACE_PARTDATA, fmtname);
01037         boost::format fmt("| Complete: %s %5.2f%% Size: %s / %d bytes%|73t||");
01038         fmt % buf % perc % Utils::bytesToString(getSize()) % getSize();
01039         logTrace(TRACE_PARTDATA, fmt);
01040         for (RangeList64::CIter i = m_complete.begin();i!=m_complete.end();++i){
01041                 boost::format fmt("| Complete range: %d -> %d %|73t||");
01042                 logTrace(TRACE_PARTDATA, fmt % (*i).begin() % (*i).end());
01043         }
01044         logTrace(TRACE_PARTDATA,
01045                 boost::format("\\%s/") % std::string(width - 2, '-')
01046         );
01047 }
01048 
01049 void PartData::destroy() {
01050         getEventTable().safeDelete(this, PD_DESTROY);
01051 }
01052 
01053 void PartData::cancelDownload() {
01054         deleteFiles();
01055         destroy();
01056 }
01057 
01058 void PartData::deleteFiles() {
01059         using namespace boost::filesystem;
01060 
01061         if (!m_loc.empty()) {
01062                 if (exists(m_loc)) try {
01063                         remove(m_loc);
01064                 } catch (std::exception &e) {
01065                         logDebug(
01066                                 boost::format("Deleting file %s: %s") % e.what()
01067                         );
01068                 }
01069 
01070                 path tmp(m_loc.string() + ".dat", native);
01071                 if (exists(tmp)) try {
01072                         remove(tmp);
01073                 } catch (std::exception &e) {
01074                         logDebug(
01075                                 boost::format("Deleting file %s: %s") % e.what()
01076                         );
01077                 }
01078 
01079                 tmp = path(tmp.string() + ".bak", native);
01080                 if (exists(tmp)) try {
01081                         remove(tmp);
01082                 } catch (std::exception &e) {
01083                         logDebug(
01084                                 boost::format("Deleting file %s: %s") % e.what()
01085                         );
01086                 }
01087         }
01088 }