schedbase.cpp

Go to the documentation of this file.
00001 /**
00002  *  Copyright (C) 2004-2005 Alo Sarv <madcat_@users.sourceforge.net>
00003  *
00004  *  This program is free software; you can redistribute it and/or modify
00005  *  it under the terms of the GNU General Public License as published by
00006  *  the Free Software Foundation; either version 2 of the License, or
00007  *  (at your option) any later version.
00008  *
00009  *  This program is distributed in the hope that it will be useful,
00010  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  *  GNU General Public License for more details.
00013  *
00014  *  You should have received a copy of the GNU General Public License
00015  *  along with this program; if not, write to the Free Software
00016  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017  */
00018 
00019 #include <hn/hnprec.h>
00020 #include <hn/schedbase.h>
00021 #include <hn/gettickcount.h>
00022 #include <hn/utils.h>               // for bytesToString
00023 #include <hn/lambda_placeholders.h>
00024 #include <hn/prefs.h>
00025 #include <hn/ipv4addr.h>
00026 #include <boost/multi_index_container.hpp>
00027 #include <boost/multi_index/ordered_index.hpp>
00028 #include <boost/multi_index/key_extractors.hpp>
00029 
00030 namespace Detail {
00031         //! Unary function object for extracting a requests score
00032         template<typename T>
00033         struct ScoreExtractor {
00034                 typedef float result_type;
00035                 float operator()(const T &x) {
00036                         return x->getScore();
00037                 }
00038         };
00039 
00040         //! Indexes for request maps
00041         template<typename T>
00042         struct RequestIndex : boost::multi_index::indexed_by<
00043                 boost::multi_index::ordered_unique<
00044                         boost::multi_index::identity<T>
00045                 >,
00046                 boost::multi_index::ordered_non_unique<
00047                         ScoreExtractor<T>
00048                 >
00049         > {};
00050         struct UploadReqMap : boost::multi_index_container<
00051                 SchedBase::UploadReqBase*,
00052                 RequestIndex<SchedBase::UploadReqBase*>
00053         > {};
00054         struct DownloadReqMap : boost::multi_index_container<
00055                 SchedBase::DownloadReqBase*,
00056                 RequestIndex<SchedBase::DownloadReqBase*>
00057         > {};
00058         struct ConnReqMap : boost::multi_index_container<
00059                 SchedBase::ConnReqBase*,
00060                 RequestIndex<SchedBase::ConnReqBase*>
00061         > {};
00062 }
00063 using namespace Detail;
00064 
00065 SchedBase::ReqBase::ReqBase(float score) : m_score(score), m_valid(true) {}
00066 SchedBase::ReqBase::~ReqBase() {}
00067 SchedBase::UploadReqBase::UploadReqBase(float score) : ReqBase(score) {}
00068 SchedBase::UploadReqBase::~UploadReqBase() {}
00069 SchedBase::DownloadReqBase::DownloadReqBase(float score) : ReqBase(score) {}
00070 SchedBase::DownloadReqBase::~DownloadReqBase() {}
00071 SchedBase::ConnReqBase::ConnReqBase(float score) : ReqBase(score) {}
00072 SchedBase::ConnReqBase::~ConnReqBase() {}
00073 
00074 
00075 /**
00076  * Utility function, which performs sets clearing and notifications sending.
00077  *
00078  * @param toRemove       Set containing requests to be removed/deleted
00079  * @param toNotify       Set containing requests to be notified
00080  * @param cont           Main container to remove the toRemove elements from
00081  */
00082 template<typename T, typename D>
00083 void clearAndNotify(T &toRemove, T &toNotify, D &cont) {
00084         typedef typename T::iterator Iter;
00085         for (Iter i = toRemove.begin(); i != toRemove.end(); ++i) {
00086                 if (toNotify.find(*i) != toNotify.end()) {
00087                         (*i)->notify();
00088                         toNotify.erase(*i);
00089                 }
00090                 if (!(*i)->isValid()) {
00091                         cont.erase(*i);
00092                         delete *i;
00093                 }
00094         }
00095         for (Iter i = toNotify.begin(); i != toNotify.end(); ++i) {
00096                 CHECK((*i)->isValid());
00097                 (*i)->notify();
00098         }
00099 }
00100 
00101 // SchedBase class
00102 // ---------------
00103 SchedBase::SchedBase() : m_uploadReqs(new UploadReqMap),
00104 m_downloadReqs(new DownloadReqMap), m_connReqs(new ConnReqMap),
00105 m_upLimit(25*1024), m_downLimit(std::numeric_limits<uint32_t>::max()),
00106 m_connLimit(300), m_connCnt(), m_totalUp(), m_totalDown(), m_lastDownReset(),
00107 m_lastUpReset(), m_recentDown(), m_recentUp() {
00108         Prefs::instance().setPath("/");
00109         m_upLimit = Prefs::instance().read<uint32_t>("UpSpeedLimit", 25*1024);
00110         m_downLimit = Prefs::instance().read<uint32_t>("DownSpeedLimit", 0);
00111         m_connLimit = Prefs::instance().read<uint32_t>("ConnectionLimit", 300);
00112 
00113         logMsg(
00114                 boost::format(
00115                         "Networking scheduler started: UpSpeedLimit: "
00116                         "%s/s DownSpeedLimit: %s/s"
00117                 ) % Utils::bytesToString(m_upLimit)
00118                 % Utils::bytesToString(m_downLimit)
00119         );
00120         if (m_downLimit == 0) {
00121                 m_downLimit = std::numeric_limits<uint32_t>::max();
00122         }
00123         if (m_upLimit == 0) {
00124                 m_upLimit = std::numeric_limits<uint32_t>::max();
00125         }
00126         if (m_connLimit == 0) {
00127                 m_connLimit = 300;
00128         }
00129 
00130         // no-limit ip addresses;
00131         //! \todo Move NoLimit IP address list to some config file
00132         m_noSpeedLimit.push(Range32(Socket::makeAddr("127.0.0.1")));
00133         uint32_t ipLow = Socket::makeAddr("192.168.0.0");
00134         uint32_t ipHigh = Socket::makeAddr("192.168.0.255");
00135         m_noSpeedLimit.push(SWAP32_ON_LE(ipLow), SWAP32_ON_LE(ipHigh));
00136 }
00137 
00138 SchedBase::~SchedBase() {}
00139 SchedBase& SchedBase::instance() {
00140         static SchedBase sched;
00141         return sched;
00142 }
00143 
00144 void SchedBase::exit() {
00145         if (m_upLimit == std::numeric_limits<uint32_t>::max()) {
00146                 m_upLimit = 0;
00147         }
00148         if (m_downLimit == std::numeric_limits<uint32_t>::max()) {
00149                 m_downLimit = 0;
00150         }
00151 
00152         Prefs::instance().setPath("/");
00153         Prefs::instance().write<uint32_t>("UpSpeedLimit", m_upLimit);
00154         Prefs::instance().write<uint32_t>("DownSpeedLimit", m_downLimit);
00155         Prefs::instance().write<uint32_t>("ConnectionLimit", m_connLimit);
00156 
00157         logDebug(
00158                 boost::format("Scheduler: UpLimit: %s DownLimit: %s")
00159                 % m_upLimit % m_downLimit
00160         );
00161         logMsg(
00162                 boost::format("Total uploaded: %s Total downloaded: %s")
00163                 % Utils::bytesToString(getTotalUpstream())
00164                 % Utils::bytesToString(getTotalDownstream())
00165         );
00166 }
00167 
00168 // Iterate on download requests
00169 void SchedBase::handleDownloads() {
00170         typedef DownloadReqMap::nth_index<1>::type ScoreIndex;
00171         typedef ScoreIndex::iterator DIter;
00172 
00173         std::set<DownloadReqBase*> toRemove;
00174         std::set<DownloadReqBase*> toNotify;
00175 
00176         uint32_t pendingReqs = m_downloadReqs->size();
00177         ScoreIndex &scoreIndex = m_downloadReqs->get<1>();
00178 
00179         for (DIter i = scoreIndex.begin(); i != scoreIndex.end(); ++i) {
00180                 if (!(*i)->isValid()) {
00181                         toRemove.insert(*i);
00182                         --pendingReqs;
00183                         continue;
00184                 }
00185                 if (!pendingReqs) {
00186                         logDebug(
00187                                 "Scheduler::handleDownloads(): pendingReqs == 0!"
00188                                 " This shouldn't happen."
00189                         );
00190                         break;
00191                 }
00192 
00193                 uint32_t amount = getFreeDown() / pendingReqs;
00194                 if (!amount) { // don't attempt to recv 0 bytes
00195                         break;
00196                 }
00197 
00198                 uint32_t ret = 0;
00199                 try {
00200                         ret = (*i)->doRecv(amount);
00201                 } catch (std::exception &e) {
00202                         error(boost::format("doRecv(%d)") % amount, e.what());
00203                 }
00204                 addDownStream(ret);
00205 
00206                 if (ret < amount) {
00207                         toRemove.insert(*i);
00208                         (*i)->invalidate();
00209                 }
00210                 toNotify.insert(*i);
00211                 --pendingReqs;
00212         }
00213 
00214         clearAndNotify(toRemove, toNotify, *m_downloadReqs);
00215 }
00216 
00217 // Iterate on upload requets
00218 void SchedBase::handleUploads() {
00219         typedef UploadReqMap::nth_index<1>::type ScoreIndex;
00220         typedef ScoreIndex::iterator UIter;
00221 
00222         std::set<UploadReqBase*> toRemove;
00223         std::set<UploadReqBase*> toNotify;
00224 
00225         uint32_t pendingReqs = m_uploadReqs->size();
00226         ScoreIndex &scoreIndex = m_uploadReqs->get<1>();
00227 
00228         for (UIter i = scoreIndex.begin(); i != scoreIndex.end(); ++i) {
00229                 if (!(*i)->isValid()) {
00230                         toRemove.insert(*i);
00231                         --pendingReqs;
00232                         continue;
00233                 }
00234                 if (!pendingReqs) {
00235                         logDebug(
00236                                 "Scheduler::handleUploads(): pendingReqs == 0!"
00237                                 " This shouldn't happen."
00238                         );
00239                         break;
00240                 }
00241 
00242                 uint32_t amount = getFreeUp() / pendingReqs;
00243                 if (!amount) {
00244                         break;
00245                 } else if (amount > 100*1024) {
00246                         amount = 100*1024;
00247                 }
00248                 uint32_t ret = 0;
00249                 try {
00250                         ret = (*i)->doSend(amount);
00251                 } catch (std::exception &e) {
00252                         error(
00253                                 boost::format("doSend(%d)") % getFreeUp(),
00254                                 e.what()
00255                         );
00256                 }
00257                 addUpStream(ret);
00258 
00259                 if ((*i)->getPending() == 0) {
00260                         toRemove.insert(*i);
00261                         (*i)->invalidate();
00262                 }
00263 
00264                 --pendingReqs;
00265         }
00266 
00267         clearAndNotify(toRemove, toNotify, *m_uploadReqs);
00268 }
00269 
00270 // grant connections up to the connection limit
00271 void SchedBase::handleConnections() {
00272         typedef ConnReqMap::nth_index<1>::type ScoreIndex;
00273         typedef ScoreIndex::iterator CIter;
00274 
00275         std::set<ConnReqBase*> toRemove;
00276         std::set<ConnReqBase*> toNotify;
00277 
00278         ConnReqMap::nth_index<1>::type &scoreIndex = m_connReqs->get<1>();
00279 
00280         for (CIter i = scoreIndex.begin(); i != scoreIndex.end(); ++i) {
00281                 if (!(*i)->isValid()) {
00282                         toRemove.insert(*i);
00283                         logTrace(TRACE_SCHED, "Discarding invalid request.");
00284                         continue;
00285                 }
00286                 if (getConnection()) try {
00287                         int ret = (*i)->doConn();
00288                         if (ret & ConnReqBase::NOTIFY) {
00289                                 toNotify.insert(*i);
00290                         }
00291                         if (ret & ConnReqBase::REMOVE) {
00292                                 toRemove.insert(*i);
00293                                 (*i)->invalidate();
00294                         }
00295                         if (ret & ConnReqBase::ADDCONN) {
00296                                 ++m_connCnt;
00297                         }
00298                 } catch (std::exception &e) {
00299                         error(boost::format("doConn()"), e.what());
00300                 } else {
00301                         break;
00302                 }
00303         }
00304 
00305         clearAndNotify(toRemove, toNotify, *m_connReqs);
00306 }
00307 
00308 // main scheduler loop, called from global event table
00309 void SchedBase::handleEvents() {
00310         m_curTick = Utils::getTick();
00311         handleDownloads();
00312         handleUploads();
00313         handleConnections();
00314         boost::format fmt3(
00315                 "Upload: " COL_CYAN "%10s/s" COL_NONE
00316                 " | Download: " COL_GREEN "%10s/s" COL_NONE
00317                 " | Connections:" COL_YELLOW "%4s" COL_NONE
00318         );
00319         fmt3 % Utils::bytesToString(getUpSpeed());
00320         fmt3 % Utils::bytesToString(getDownSpeed()) % m_connCnt;
00321         std::cerr << '\r' << fmt3.str();
00322 }
00323 
00324 // get free downstream bandwidth by summing up last 100ms downstream data and
00325 // comparing it to downstream limit
00326 uint32_t SchedBase::getFreeDown() {
00327         return getDownLimit() - getDownSpeed();
00328 }
00329 
00330 // get free upstream bandwidth by summing up the last 100ms upstream data and
00331 // comparing it to upstream limit
00332 uint32_t SchedBase::getFreeUp() {
00333         return getUpLimit() - getUpSpeed();
00334 }
00335 
00336 bool SchedBase::getConnection() {
00337         return m_connCnt < m_connLimit;
00338 }
00339 
00340 /**
00341  * Adds amount to downstream data counter.
00342  *
00343  * Gaps longer than 900ms will result in a range of zeroes, so to avoid needless
00344  * list manipulation, we can simply clear the list if the gap becomes larger
00345  * than 900ms, which gives the same result, i.e 0.
00346  *
00347  * If the gap is more than a single 100ms interval, then the first 100ms
00348  * interval will contain the last "recent" value, whereas the next intervals
00349  * will be zero, as no adds occoured for them.
00350  *
00351  * 900ms of entries are kept, which together with the amount stored in
00352  * m_recentDown gives a timespan of 100ms to 1s. Is this to be desired? It will
00353  * give a a fluxuation in the rate calculation, which could perhaps cause
00354  * problems.
00355  */
00356 void SchedBase::addDownStream(uint32_t amount) {
00357         if (m_lastDownReset + 900 < m_curTick) {
00358                 m_downList.clear();
00359                 m_recentDown = amount;
00360                 m_lastDownReset += 100 * ((m_curTick - m_lastDownReset) / 100);
00361                 m_curDownSpeed = amount;
00362         } else if (m_lastDownReset + 100 < m_curTick) {
00363                 do {
00364                         m_downList.push_back(m_recentDown);
00365                         m_lastDownReset += 100;
00366                         m_recentDown = 0;
00367                 } while (m_lastDownReset + 100 < m_curTick);
00368                 m_recentDown = amount;
00369                 m_curDownSpeed += amount;
00370                 while (m_downList.size() > 9) {
00371                         m_curDownSpeed -= m_downList.front();
00372                         m_downList.pop_front();
00373                 }
00374         } else {
00375                 m_recentDown += amount;
00376                 m_curDownSpeed += amount;
00377         }
00378         m_totalDown += amount;
00379 }
00380 
00381 //! Refer to addDownStream for docs
00382 void SchedBase::addUpStream(uint32_t amount) {
00383         if (m_lastUpReset + 900 < m_curTick) {
00384                 m_upList.clear();
00385                 m_recentUp = amount;
00386                 m_lastUpReset += 100 * ((m_curTick - m_lastUpReset) / 100);
00387                 m_curUpSpeed = amount;
00388         } else if (m_lastUpReset + 100 < m_curTick) {
00389                 do {
00390                         m_upList.push_back(m_recentUp);
00391                         m_lastUpReset += 100;
00392                         m_recentUp = 0;
00393                 } while (m_lastUpReset + 100 < m_curTick);
00394                 m_recentUp = amount;
00395                 m_curUpSpeed += amount;
00396                 while (m_upList.size() > 9) {
00397                         m_curUpSpeed -= m_upList.front();
00398                         m_upList.pop_front();
00399                 }
00400         } else {
00401                 m_recentUp += amount;
00402                 m_curUpSpeed += amount;
00403         }
00404         m_totalUp += amount;
00405 }
00406 
00407 uint32_t SchedBase::getDownSpeed() {
00408         addDownStream(0); // forces speed update
00409         return m_curDownSpeed;
00410 }
00411 uint32_t SchedBase::getUpSpeed() {
00412         addUpStream(0); // forces speed update
00413         return m_curUpSpeed;
00414 }
00415 
00416 void SchedBase::error(const boost::format &pre, const std::string &what) {
00417         logDebug(boost::format("%s: %s") % pre % what);
00418 }
00419 
00420 size_t SchedBase::getConnReqCount()     const { return m_connReqs->size();     }
00421 size_t SchedBase::getUploadReqCount()   const { return m_uploadReqs->size();   }
00422 size_t SchedBase::getDownloadReqCount() const { return m_downloadReqs->size(); }
00423 void SchedBase::addUploadReq(UploadReqBase *r)  { m_uploadReqs->insert(r);   }
00424 void SchedBase::addDloadReq(DownloadReqBase *r) { m_downloadReqs->insert(r); }
00425 void SchedBase::addConnReq(ConnReqBase *r)      { m_connReqs->insert(r);     }