sockets.cpp

Go to the documentation of this file.
00001 /**
00002  *  Copyright (C) 2004-2005 Alo Sarv <madcat_@users.sourceforge.net>
00003  *
00004  *  This program is free software; you can redistribute it and/or modify
00005  *  it under the terms of the GNU General Public License as published by
00006  *  the Free Software Foundation; either version 2 of the License, or
00007  *  (at your option) any later version.
00008  *
00009  *  This program is distributed in the hope that it will be useful,
00010  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  *  GNU General Public License for more details.
00013  *
00014  *  You should have received a copy of the GNU General Public License
00015  *  along with this program; if not, write to the Free Software
00016  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00017  */
00018 
00019 /** \file sockets.cpp Implementation of HydraNode Sockets API */
00020 
00021 #include <hn/hnprec.h>
00022 #include <hn/sockets.h>
00023 #include <hn/log.h>
00024 #include <stdexcept>
00025 
00026 #ifdef WIN32
00027         #include <winsock2.h>
00028         typedef SOCKADDR sockaddr;
00029         typedef int socklen_t;
00030 #else
00031         #include <sys/socket.h>
00032         #include <sys/un.h>
00033         #include <fcntl.h>
00034         #include <netinet/in.h>
00035         #include <arpa/inet.h>
00036         #include <errno.h>
00037 #endif
00038 
00039 #ifndef MSG_NOSIGNAL
00040         #define MSG_NOSIGNAL 0
00041 #endif
00042 
/**
 * Internal socket error codes.
 *
 * Every SOCK_* constant is defined in terms of the platform's native error
 * number, so the rest of this file can compare against getLastError() without
 * platform #ifdefs. Codes with no native counterpart on a platform are given
 * an arbitrary negative placeholder.
 */
#ifdef WIN32
enum Socket_Errors {
	SOCK_EUNKNOWN = 0,
	SOCK_EINETDOWN = WSAENETDOWN,
	SOCK_ECONNREFUSED = WSAECONNREFUSED,
	SOCK_EWOULDBLOCK = WSAEWOULDBLOCK,
	SOCK_EWINSOCK = -1,                  // placeholder (WSANOTINITIALIZED)
	SOCK_EINVALIDSOCKET = WSAENOTSOCK,
	SOCK_ECONNRESET = WSAECONNRESET,
	SOCK_ECONNFAILED = WSAECONNREFUSED,
	SOCK_EACCESS = WSAEACCES,
	SOCK_EPERM = -2,                     // placeholder
	SOCK_EADDRINUSE = WSAEADDRINUSE,
	SOCK_EADDRNOTAVAIL = WSAEADDRNOTAVAIL,
	SOCK_EINVALIDPARAM = WSAEFAULT,
	SOCK_EINPROGRESS = WSAEINPROGRESS,
	SOCK_EALREADYBOUND = WSAEINVAL,
	SOCK_ETOOMANYCONN = WSAENOBUFS,
	SOCK_ENOTSOCK = WSAENOTSOCK,
	SOCK_EAGAIN = WSAEWOULDBLOCK
};
#else
enum Socket_Errors {
	SOCK_EUNKNOWN = 0,
	SOCK_EINETDOWN = -1,                 // placeholder
	SOCK_ECONNREFUSED = ECONNREFUSED,
	SOCK_EWOULDBLOCK = EWOULDBLOCK,
	SOCK_EWINSOCK = -2,                  // placeholder
	SOCK_EINVALIDSOCKET = ENOTSOCK,
	SOCK_ECONNRESET = ECONNRESET,
	SOCK_ECONNFAILED = ECONNREFUSED,
	SOCK_EACCESS = EACCES,
	SOCK_EPERM = EPERM,
	SOCK_EADDRINUSE = EADDRINUSE,
	SOCK_EADDRNOTAVAIL = EADDRNOTAVAIL,
	SOCK_EINVALIDPARAM = EINVAL,
	SOCK_EINPROGRESS = EINPROGRESS,
	SOCK_EALREADYBOUND = EADDRINUSE,     // same value as SOCK_EADDRINUSE
	SOCK_ETOOMANYCONN = EMFILE,
	SOCK_ENOTSOCK = ENOTSOCK,
	SOCK_EAGAIN = EAGAIN
};
#endif
00092 
00093 // Exception class constructor
00094 SocketError::SocketError(const std::string &err) : std::runtime_error(err) {
00095 }
00096 // Exception class destructor
00097 SocketError::~SocketError() throw() {
00098 }
00099 
/**
 * Perform platform-specific initialization of the sockets API.
 *
 * On WIN32 this starts WinSock 2.2 the first time it is called and is a no-op
 * on later calls; on other platforms it does nothing at all.
 *
 * \throws SocketError if WSAStartup() fails.
 */
static void initSockets() {
#ifdef WIN32
	static bool initialized = false;
	if (initialized) {
		return;
	}
	WSADATA wsaData;
	if (WSAStartup(MAKEWORD(2,2), &wsaData) != NO_ERROR) {
		throw SocketError("WinSock Init failed.");
	}
	// Remember the successful start-up; previously the flag was never set,
	// so WSAStartup() was repeated for every socket constructed.
	initialized = true;
#endif
}
00111 
00112 static void setNonBlocking(SOCKET s) {
00113 #ifdef WIN32
00114         int iMode = 1;
00115         ioctlsocket(s, FIONBIO, (u_long FAR*) &iMode);
00116 #else
00117         fcntl(s, F_SETFL, O_NONBLOCK);
00118 #endif
00119 }
00120 
00121 static void setReUsable(SOCKET s) {
00122 #ifndef WIN32
00123         // Allow reusing addresses to avoid ::bind causing ADDR_IN_USE errors
00124         int yes = 1;
00125         int ret = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
00126         if (ret == -1) {
00127                 throw SocketError("SocketServer::SocketServer: setsockopt()");
00128         }
00129 #endif
00130 }
00131 
/**
 * Translate a socket error code into a human-readable message.
 *
 * On WIN32 a fixed table of messages for the SOCK_* codes is used; any code
 * not in the table yields "Unknown error". On other platforms the text comes
 * from strerror().
 *
 * \param err  Error code, as returned by getLastError().
 * \return     Description of the error.
 */
static std::string getErrorStr(int err) {
#ifdef WIN32
	switch (err) {
		case SOCK_EWINSOCK:      return "WinSock error";
		case SOCK_EINETDOWN:     return "Network connection is down";
		case SOCK_EACCESS:       return "Access denied";
		case SOCK_EADDRINUSE:    return "Address already in use";
		case SOCK_EADDRNOTAVAIL: return "Address not available";
		case SOCK_EINVALIDPARAM: return "Invalid parameter";
		case SOCK_EINPROGRESS:   return "Operation in progress";
		case SOCK_EALREADYBOUND: return "Already bound";
		case SOCK_ETOOMANYCONN:  return "Too many connections";
		case SOCK_ENOTSOCK:      return "Not a socket";
		case SOCK_ECONNREFUSED:  return "Connection refused";
		// Previously missing, so connection resets were reported as
		// "Unknown error".
		case SOCK_ECONNRESET:    return "Connection reset by peer";
		case SOCK_EWOULDBLOCK:   return "Operation would block";
		case SOCK_EUNKNOWN:
		default:
			return "Unknown error";
	}
#else
	return strerror(err);
#endif
}
00183 
// Fetch the most recent socket error code for the calling context:
// errno on POSIX systems, WSAGetLastError() on WIN32.
static int getLastError() {
#ifndef WIN32
	return errno;
#else
	return WSAGetLastError();
#endif
}
00191 
00192 static std::string socketError(const std::string &msg) {
00193         logDebug(boost::format("%s%s") % msg % getErrorStr(getLastError()));
00194         return getErrorStr(getLastError());
00195 }
00196 
// Outside WIN32 there is no closesocket(); map it onto the global ::close()
// so the code below can use one spelling everywhere.
#ifndef WIN32
	#define closesocket(fd) ::close(fd)
#endif
00200 
00201 // SocketBase class
00202 // ----------------
00203 SocketBase::SocketBase() : m_socket(0), m_toDelete(false), m_priority(PR_NORMAL)
00204 {}
00205 SocketBase::SocketBase(SOCKET s) : m_socket(s), m_toDelete(false),
00206 m_priority(PR_NORMAL) {}
00207 SocketBase::~SocketBase() {}
00208 
00209 // SocketClient class
00210 // ------------------
00211 IMPLEMENT_EVENT_TABLE(SocketClient, SocketClient*, SocketEvent);
#ifndef WIN32
// Definitions of the WinSock-style result constants for non-WIN32 builds:
// both the "invalid socket" and the "call failed" values are -1 here.
static const int INVALID_SOCKET = -1;
static const int SOCKET_ERROR = -1;
#endif
00216 
00217 // Construct and initialize
00218 SocketClient::SocketClient(SCEventHandler ehandler /* = 0 */)
00219 : m_connected(false), m_hasData(false), m_connecting(false), m_erronous(false),
00220 m_writable(false), m_timeout() {
00221         initSockets();
00222 
00223         logTrace(TRACE_SOCKET, "Creating SocketClient");
00224 
00225         if (ehandler) {
00226                 getEventTable().addHandler(this, ehandler);
00227         }
00228 }
00229 
00230 SocketClient::SocketClient(SOCKET s) : SocketBase(s), m_connected(false),
00231 m_hasData(false), m_connecting(false), m_erronous(false), m_writable(false),
00232 m_timeout() {}
00233 
00234 // Destructor. Do only cleanup here - remove event handlers (if there are any),
00235 // try to disconnect (but make sure we handle the errors too if that fails).
00236 SocketClient::~SocketClient() {
00237         getEventTable().delHandler(this);
00238 }
00239 
00240 void SocketClient::destroy() {
00241         close();
00242         m_toDelete = true;
00243 }
00244 
00245 void SocketClient::connect(IPV4Address addr, uint32_t timeout) {
00246         if (m_connected) {
00247                 throw SocketError("connect(): Already connected.");
00248         } else if (m_erronous) {
00249                 throw SocketError("connect(): Socket is erronous.");
00250         } else if (m_connecting) {
00251                 throw SocketError("connect(): Already connecting.");
00252         } else if (addr.getPort() == 0) {
00253                 throw SocketError("connect(): Cannot connect to port 0.");
00254         }
00255 
00256         logTrace(TRACE_SOCKET, boost::format("SocketClient::connect(%s)") % addr);
00257 
00258         if (m_socket) {
00259                 close();
00260         }
00261 
00262         m_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
00263         if (m_socket == INVALID_SOCKET) {
00264                 throw SocketError(
00265                         "SocketClient::SocketClient Error: Invalid socket."
00266                 );
00267         }
00268 
00269         setNonBlocking(m_socket);
00270 
00271         sockaddr_in sin;
00272         sin.sin_family = AF_INET;
00273         sin.sin_addr.s_addr = addr.getAddr();
00274         sin.sin_port = htons(addr.getPort());
00275         int ret = ::connect(m_socket, (sockaddr*)&sin, sizeof(sin));
00276 
00277         // Filter out non-fatal errors (e.g. related to nonblocking sockets
00278         // usage)
00279         if (
00280                 ret == SOCKET_ERROR
00281                 && getLastError() != SOCK_EINPROGRESS // not fatal
00282                 && getLastError() != SOCK_EWOULDBLOCK // not fatal
00283         ) {
00284                 throw SocketError(socketError("connect(): "));
00285         }
00286         m_connecting = true;
00287         SocketWatcher::addSocket(this);
00288 
00289         if (timeout) {
00290                 setTimeout(timeout);
00291         }
00292 
00293         m_peer = addr;
00294 }
00295 
00296 void SocketClient::close() {
00297         if (m_connecting || m_connected) {
00298                 logTrace(TRACE_SOCKET,
00299                         boost::format("Closing socket %s") % m_socket
00300                 );
00301                 if (closesocket(m_socket) != 0) {
00302                         logWarning(socketError("Error closing socket: "));
00303                 }
00304                 SocketWatcher::removeSocket(this);
00305         }
00306         m_connecting = false;
00307         m_connected = false;
00308         m_timeout = 0;
00309 }
00310 
00311 // Disconnect
00312 void SocketClient::disconnect() {
00313         logTrace(TRACE_SOCKET, "Disconnecting SocketClient.");
00314         close();
00315         m_peer = IPV4Address();     //!< Reset peer
00316 }
00317 
00318 // Read data from socket
00319 uint32_t SocketClient::read(void *buffer, uint32_t length) {
00320         if (!m_connected) {
00321                 throw SocketError(
00322                         "Attempt to read from a disconnected socket."
00323                 );
00324         } else if (m_connecting) {
00325                 throw SocketError("Attempt to read from a connecting socket.");
00326         } else if (m_erronous) {
00327                 throw SocketError("Attempt to read from an erronous socket.");
00328         }
00329         int ret = ::recv(
00330                 m_socket, reinterpret_cast<char*>(buffer), length, MSG_NOSIGNAL
00331         );
00332         m_hasData = false;
00333         if (ret == SOCKET_ERROR) {
00334                 ret = 0;
00335                 if (getLastError() != SOCK_EAGAIN) {
00336                         close();
00337                         m_erronous = true;
00338                         m_connected = false;
00339                         m_connecting = false;
00340                         getEventTable().postEvent(this, SOCK_LOST);
00341                 }
00342         }
00343         return ret;
00344 }
00345 
00346 // Write data to socket
00347 uint32_t SocketClient::write(const char *buffer, uint32_t length) {
00348         if (!m_connected) {
00349                 throw SocketError("Attempt to write to a disconnected socket.");
00350         } else if (m_connecting) {
00351                 throw SocketError("Attempt to write to a connecting socket.");
00352         } else if (m_erronous) {
00353                 throw SocketError("Attempt to write to an erronous socket.");
00354         }
00355         int ret = ::send(m_socket, buffer, length, MSG_NOSIGNAL);
00356         m_writable = false;
00357         if (ret == SOCKET_ERROR) {
00358                 ret = 0;
00359                 if (getLastError() != SOCK_EAGAIN) {
00360                         close();
00361                         m_erronous = true;
00362                         m_connected = false;
00363                         m_connecting = false;
00364                         getEventTable().postEvent(this, SOCK_LOST);
00365                 }
00366         }
00367         return ret;
00368 }
00369 
00370 IPV4Address SocketClient::getAddr() const {
00371         sockaddr_in name;
00372         socklen_t sz = sizeof(name);
00373 
00374         int rv = getsockname(m_socket, reinterpret_cast<sockaddr*>(&name), &sz);
00375 
00376         if (rv != SOCKET_ERROR) {
00377                 return IPV4Address(name.sin_addr.s_addr, name.sin_port);
00378         } else {
00379                 return IPV4Address();
00380         }
00381 }
00382 
00383 // SocketServer class
00384 // ------------------
00385 IMPLEMENT_EVENT_TABLE(SocketServer, SocketServer*, SocketEvent);
00386 
00387 // Constructor
00388 SocketServer::SocketServer(SSEventHandler ehandler) : m_incoming(false),
00389 m_listening(false), m_erronous(false) {
00390         initSockets();
00391 
00392         logTrace(TRACE_SOCKET, "Constructing SocketServer");
00393         m_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
00394         if (m_socket == INVALID_SOCKET) {
00395                 throw SocketError(
00396                         "SocketServer::SocketServer Error: Invalid socket."
00397                 );
00398         }
00399 
00400         setNonBlocking(m_socket);
00401         setReUsable(m_socket);
00402         if (ehandler) {
00403                 getEventTable().addHandler(this, ehandler);
00404         }
00405 }
00406 
00407 // Destructor
00408 SocketServer::~SocketServer() {
00409         logTrace(TRACE_SOCKET, "Destroying SocketServer");
00410         getEventTable().delHandler(this);
00411 }
00412 
00413 void SocketServer::destroy() {
00414         close();
00415         m_toDelete = true;
00416 }
00417 
00418 void SocketServer::close() {
00419         if (m_listening) {
00420                 logTrace(TRACE_SOCKET,
00421                         boost::format("Closing socket %s") % m_socket
00422                 );
00423                 if (closesocket(m_socket) != 0) {
00424                         logWarning(socketError("Error closing socket: "));
00425                 }
00426                 SocketWatcher::removeSocket(this);
00427         }
00428         m_listening = false;
00429 }
00430 
00431 // Set socket to listening state.
00432 void SocketServer::listen(IPV4Address addr) {
00433         if (m_listening) {
00434                 throw SocketError("listen(): Already listening.");
00435         } else if (m_erronous) {
00436                 throw SocketError("listen(): Socket is erronous.");
00437         } else if (addr.getPort() == 0) {
00438                 throw SocketError("listen(): Cannot listen on port 0.");
00439         }
00440         logTrace(TRACE_SOCKET,
00441                 boost::format("Starting listener on port %d") % addr.getPort()
00442         );
00443 
00444         sockaddr_in sin;
00445 
00446         memset(&sin, 0, sizeof(sin));
00447 
00448         sin.sin_family = AF_INET;
00449         if (addr.getAddr()) {
00450                 sin.sin_addr.s_addr = addr.getAddr();
00451         } else {
00452                 sin.sin_addr.s_addr = INADDR_ANY;
00453         }
00454         sin.sin_port = htons(addr.getPort());
00455         int retval = ::bind(
00456                 m_socket, reinterpret_cast<sockaddr*>(&sin), sizeof(sin)
00457         );
00458         if (retval == SOCKET_ERROR) {
00459                 throw SocketError(socketError("bind(): "));
00460         }
00461 
00462         if (::listen(m_socket, SOMAXCONN) == SOCKET_ERROR) {
00463                 throw SocketError(socketError("listen(): "));
00464         }
00465         m_listening = true;
00466         SocketWatcher::addSocket(this);
00467 
00468         // Set m_addr
00469         m_addr = addr;
00470         if (m_addr.getAddr() == 0) {
00471                 // This means we are listening on any available networks.
00472                 struct sockaddr name;
00473                 socklen_t sz = sizeof(name);
00474                 if (getsockname(m_socket, &name, &sz) == SOCKET_ERROR) {
00475                         // Ok, not fatal, just log an error
00476                         logDebug("getsockname() failed!");
00477                 } else {
00478                         m_addr.setAddr(name.sa_data);
00479                 }
00480         }
00481 }
00482 
00483 // accept an incoming connection, using handler to assign to new connections
00484 // events
00485 SocketClient* SocketServer::accept(SCEventHandler ehandler) {
00486         if (!m_listening) {
00487                 throw SocketError("accept(): Not listening.");
00488         } else if (!m_incoming) {
00489                 throw SocketError("accept(): No incoming connections.");
00490         } else if (m_erronous) {
00491                 throw SocketError("accept(): Socket is erronous.");
00492         }
00493 
00494         struct sockaddr_in addr;
00495         socklen_t sz = sizeof(addr);
00496         SOCKET sock = ::accept(
00497                 m_socket, reinterpret_cast<sockaddr*>(&addr), &sz
00498         );
00499 
00500         if (sock == INVALID_SOCKET) {
00501                 throw SocketError(socketError("accept(): "));
00502         }
00503 
00504         SocketClient *client = new SocketClient(sock);
00505         client->m_peer = IPV4Address(addr.sin_addr.s_addr, 0);
00506         client->m_connected = true;
00507         setNonBlocking(client->m_socket);
00508         SocketWatcher::addSocket(client);
00509 
00510         if (ehandler) {
00511                 SocketClient::getEventTable().addHandler(client, ehandler);
00512         }
00513 
00514         m_incoming = false;
00515 
00516         return client;
00517 }
00518 
00519 // UDPSocket class
00520 // ---------------
00521 IMPLEMENT_EVENT_TABLE(UDPSocket, UDPSocket*, SocketEvent);
00522 
00523 UDPSocket::UDPSocket(UDPSEventHandler handler) : m_listening(false),
00524 m_hasData(false), m_erronous(false) {
00525         logTrace(TRACE_SOCKET, "Constructing UDPSocket.");
00526 
00527         initSockets();
00528 
00529         m_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00530         if (m_socket == INVALID_SOCKET) {
00531                 throw SocketError("UDPSocket::UDPSocket: Invalid socket.");
00532         }
00533 
00534         setNonBlocking(m_socket);
00535         setReUsable(m_socket);
00536 
00537         if (handler) {
00538                 getEventTable().addHandler(this, handler);
00539         }
00540 }
00541 
00542 UDPSocket::~UDPSocket() {
00543         getEventTable().delHandler(this);
00544 }
00545 
00546 void UDPSocket::destroy() {
00547         if (m_listening) {
00548                 if (closesocket(m_socket) != 0) {
00549                         logDebug(socketError("Error closing socket: "));
00550                 }
00551                 SocketWatcher::removeSocket(this);
00552         }
00553         m_listening = false;
00554         m_toDelete = true;
00555 }
00556 
00557 void UDPSocket::listen(IPV4Address addr) {
00558         CHECK_THROW_MSG(addr.getPort(), "Listening port may not be NULL");
00559         logTrace(TRACE_SOCKET,
00560                 boost::format("Starting UDP listener at port %d")
00561                 % addr.getPort()
00562         );
00563 
00564         sockaddr_in sin;
00565         sin.sin_family = AF_INET;
00566         if (addr.getAddr()) {
00567                 sin.sin_addr.s_addr = addr.getAddr();
00568         } else {
00569                 sin.sin_addr.s_addr = INADDR_ANY;
00570         }
00571         sin.sin_port = htons(addr.getPort());
00572 
00573         int retval = ::bind(m_socket, (sockaddr*)&sin, sizeof(sin));
00574 
00575         // Set m_addr
00576         m_addr = addr;
00577         if (m_addr.getAddr() == 0) {
00578                 // This means we are listening on any available networks.
00579                 struct sockaddr name;
00580                 socklen_t sz = sizeof(name);
00581                 if (getsockname(m_socket, &name, &sz) == SOCKET_ERROR) {
00582                         // Ok, not fatal, just log an error
00583                         logDebug("getsockname() failed!");
00584                 } else {
00585                         m_addr.setAddr(name.sa_data);
00586                 }
00587         }
00588 
00589         if (retval == SOCKET_ERROR) {
00590                 m_erronous = true;
00591                 throw SocketError(socketError("bind(): "));
00592         }
00593         m_listening = true;
00594         SocketWatcher::addSocket(this);
00595 }
00596 
00597 uint32_t UDPSocket::recv(char *buf, uint32_t len, IPV4Address *from) {
00598         CHECK_THROW_MSG(m_listening, "Socket must listen before receiving.");
00599         CHECK_THROW_MSG(m_hasData, "Socket must have data before receiving.");
00600         CHECK_THROW_MSG(buf, "Cowardly refusing to recv into NULL buffer.");
00601         CHECK_THROW_MSG(len, "Cowardly refusing to receive NULL bytes.");
00602         CHECK_THROW_MSG(from,"Cowardly refusing to receive into NULL address.");
00603 
00604         sockaddr_in sin;
00605         socklen_t fromlen = sizeof(sin);
00606 
00607         memset(&sin, 0, sizeof(sin));
00608 
00609         int ret = recvfrom(
00610                 m_socket, buf, len, MSG_NOSIGNAL,
00611                 reinterpret_cast<sockaddr*>(&sin), &fromlen
00612         );
00613 
00614         if (ret == SOCKET_ERROR) {
00615                 if (getLastError() != SOCK_EAGAIN) {
00616                         throw SocketError(socketError("recvfrom(): "));
00617                 } else {
00618                         return 0;
00619                 }
00620         }
00621 
00622         from->setPort(ntohs(sin.sin_port));
00623         from->setAddr(sin.sin_addr.s_addr);
00624 
00625         CHECK(ret >= 0);
00626 
00627         m_hasData = false;
00628 
00629         return ret;
00630 }
00631 
00632 uint32_t UDPSocket::send(const std::string &data, IPV4Address to) {
00633         CHECK_THROW_MSG(to.getPort(), "Can't send to port NULL!");
00634         CHECK_THROW_MSG(to.getAddr(), "Can't send to NULL!");
00635         CHECK_THROW_MSG(data.size(), "Cowardly refusing to send NULL buffer.");
00636 
00637         sockaddr_in sin;
00638         memset(&sin, 0, sizeof(sin));
00639 
00640         sin.sin_family = AF_INET;
00641         sin.sin_addr.s_addr = to.getAddr();
00642         sin.sin_port = htons(to.getPort());
00643 
00644         logTrace(TRACE_SOCKET,
00645                 boost::format("UDPSocket: to.Addr=%s sin.addr=%d sin.port=%d")
00646                 % to % sin.sin_addr.s_addr % sin.sin_port
00647         );
00648 
00649         int ret = sendto(
00650                 m_socket, data.data(), data.size(), MSG_NOSIGNAL,
00651                 reinterpret_cast<sockaddr*>(&sin), sizeof(sin)
00652         );
00653 
00654         if (ret == SOCKET_ERROR) {
00655                 switch (getLastError()) {
00656                         case SOCK_EINVALIDPARAM:
00657                         case SOCK_EACCESS:
00658                         case SOCK_EPERM:
00659                                 logDebug(
00660                                         boost::format(
00661                                                 "UDPSocket(%s): sendto(%s): %s"
00662                                         )% getAddr() % to % socketError("")
00663                                 );
00664                                 break;
00665                         default:
00666                                 throw SocketError(socketError("sendto(): "));
00667                                 break;
00668                 }
00669         }
00670 
00671         return ret;
00672 }
00673 
00674 // SocketWatcher class - Performs sockets polling and events dispatching.
00675 // -------------------
00676 // constructors/destructors
00677 SocketWatcher::SocketWatcher() {
00678 //      Log::instance().enableTraceMask(TRACE_SOCKET, "socket");
00679 }
00680 SocketWatcher::~SocketWatcher() {}
00681 
00682 // Initialize socket API in platform-dependant way
00683 // This function is called during each socket construction. The only platforms
00684 // that are known to require explicit socket API initialization is win32.
00685 bool SocketWatcher::initialize() {
00686 #ifdef WIN32
00687         static bool initialized = false;
00688         if (initialized) {
00689                 return true;
00690         }
00691         WSADATA wsaData;
00692         if (WSAStartup(MAKEWORD(2, 2), &wsaData) != NO_ERROR) {
00693                 return false;
00694         }
00695         return initialized = true;
00696 #else
00697         return true;
00698 #endif
00699 }
00700 
00701 SocketWatcher& SocketWatcher::instance() {
00702         static SocketWatcher s;
00703         return s;
00704 }
00705 
00706 // Add a socket for polling
00707 void SocketWatcher::doAddSocket(SocketServer *socket) {
00708         CHECK_THROW(socket != 0);
00709         logTrace(
00710                 TRACE_SOCKET, boost::format("SocketWatcher: Adding socket %d")
00711                 % socket->getSocket()
00712         );
00713         m_servers[socket->getSocket()] = socket;
00714         m_sockets.insert(socket->getSocket());
00715 }
00716 
00717 // Add a socket for polling
00718 void SocketWatcher::doAddSocket(SocketClient *socket) {
00719         CHECK_THROW(socket != 0);
00720         logTrace(
00721                 TRACE_SOCKET, boost::format("SocketWatcher: Adding socket %d")
00722                 % socket->getSocket()
00723         );
00724         m_clients[socket->getSocket()] = socket;
00725         m_sockets.insert(socket->getSocket());
00726 }
00727 
00728 void SocketWatcher::doAddSocket(UDPSocket *socket) {
00729         CHECK_THROW(socket != 0);
00730         logTrace(
00731                 TRACE_SOCKET,
00732                 boost::format("SocketWatcher: Adding UDP socket %d")
00733                 % socket->getSocket()
00734         );
00735         m_udpSockets[socket->getSocket()] = socket;
00736         m_sockets.insert(socket->getSocket());
00737 }
00738 
00739 // Remove a socket from polled sockets list
00740 void SocketWatcher::doRemoveSocket(SocketServer *socket) {
00741         CHECK_THROW(socket);
00742         m_serversToRemove.push_back(socket);
00743 }
00744 void SocketWatcher::doRemoveSocket(SocketClient *socket) {
00745         CHECK_THROW(socket);
00746         m_clientsToRemove.push_back(socket);
00747 }
00748 void SocketWatcher::doRemoveSocket(UDPSocket *socket) {
00749         CHECK_THROW(socket);
00750         m_udpToRemove.push_back(socket);
00751 }
00752 void SocketWatcher::cleanupSockets() {
00753         uint32_t serversRemoved = 0;
00754         uint32_t clientsRemoved = 0;
00755         uint32_t udpRemoved = 0;
00756         while (m_serversToRemove.size()) {
00757                 SocketServer *toRemove = m_serversToRemove.back();
00758                 SSIter i = m_servers.find(toRemove->getSocket());
00759                 if (i != m_servers.end() && (*i).second == toRemove) {
00760                         m_servers.erase(i);
00761                         m_sockets.erase(toRemove->getSocket());
00762                 }
00763                 if (toRemove->toDelete()) {
00764                         delete toRemove;
00765                 }
00766                 m_serversToRemove.pop_back();
00767                 ++serversRemoved;
00768         }
00769         while (m_clientsToRemove.size()) {
00770                 SocketClient *toRemove = m_clientsToRemove.back();
00771                 SCIter i = m_clients.find(toRemove->getSocket());
00772                 if (i != m_clients.end() && (*i).second == toRemove) {
00773                         m_clients.erase(i);
00774                         m_sockets.erase(toRemove->getSocket());
00775                 }
00776                 if (toRemove->toDelete()) {
00777                         delete toRemove;
00778                 }
00779                 m_clientsToRemove.pop_back();
00780                 ++clientsRemoved;
00781         }
00782         while (m_udpToRemove.size()) {
00783                 UDPSocket *toRemove = m_udpToRemove.back();
00784                 SUIter i = m_udpSockets.find(toRemove->getSocket());
00785                 if (i != m_udpSockets.end() && (*i).second == toRemove) {
00786                         m_udpSockets.erase(i);
00787                         m_sockets.erase(toRemove->getSocket());
00788                 }
00789                 if (toRemove->toDelete()) {
00790                         delete toRemove;
00791                 }
00792                 m_udpToRemove.pop_back();
00793                 ++udpRemoved;
00794         }
00795         if (serversRemoved || clientsRemoved || udpRemoved) {
00796                 logTrace(TRACE_SOCKET, boost::format(
00797                         "SocketWatcher: Cleaning sockets. Removed %d TCP "
00798                         "Clients, %d TCP Servers and %d UDP sockets."
00799                 ) % clientsRemoved % serversRemoved % udpRemoved);
00800         }
00801 }
00802 
00803 // Poll listed sockets
00804 //
00805 // We only add sockets to the sets for which we are certain that the select()
00806 // operation wouldn't return immediately, e.g. if a socket already has incoming
00807 // data, we don't add it to the sets, since that would cause select() to return
00808 // instantly. The main reason for this is safety - if client doesn't read the
00809 // data out of a readable socket, or accept an incoming connection, we would
00810 // detect the socket readable again in next select(), re-post the event etc.
00811 // Uff. Better safe than sorry - once the data has been read out, the
00812 // SocketClient/SocketServer re-enable the flags so we'll start polling it
00813 // again.
00814 void SocketWatcher::doPoll() {
00815         fd_set rfds;
00816         fd_set wfds;
00817         fd_set efds;
00818         FD_ZERO(&rfds);
00819         FD_ZERO(&wfds);
00820         FD_ZERO(&efds);
00821         uint64_t curTick = Utils::getTick();
00822 
00823         cleanupSockets();
00824 
00825         // Temporary variable to keep track of highest-numbered socket,
00826         // needed later for passing to select().
00827         SOCKET highest = 0;
00828 
00829         // Add all servers. Note: Servers can't become writable.
00830         for (SSIter i = m_servers.begin(); i != m_servers.end(); ++i) {
00831                 // Only add the ones which don't have any incoming pending
00832                 if (!(*i).second->m_incoming) {
00833                         FD_SET((*i).first, &rfds);
00834                 }
00835                 // Only add those which are not erronous
00836                 if (!(*i).second->m_erronous) {
00837                         FD_SET((*i).first, &efds);
00838                 }
00839                 if ((*i).first > highest) {
00840                         highest = (*i).first;
00841                 }
00842         }
00843         // Add all clients
00844         for (SCIter i = m_clients.begin(); i != m_clients.end(); ++i) {
00845                 SocketClient *c = (*i).second;
00846                 if (c->m_timeout && c->m_timeout < curTick) {
00847                         // Timeout is over
00848                         SocketClient::getEventTable().postEvent(c,SOCK_TIMEOUT);
00849                         c->close();
00850                         c->m_timeout = 0;
00851                         continue;
00852                 }
00853 
00854                 // Only add the ones which don't have incoming data and are
00855                 // connected
00856                 if (!c->m_hasData && c->m_connected) {
00857                         FD_SET((*i).first, &rfds);
00858                 }
00859                 // Only add the ones which are in connecting/connected
00860                 // state
00861                 if ((c->m_connecting || c->m_connected) && !c->m_writable) {
00862                         FD_SET((*i).first, &wfds);
00863                 }
00864                 // Only add the ones that aren't erronous already
00865                 if (!c->m_erronous && c->m_connected) {
00866                         FD_SET((*i).first, &efds);
00867                 }
00868                 if ((*i).first > highest) {
00869                         highest = (*i).first;
00870                 }
00871         }
00872 
00873         // Add all UDP sockets
00874         for (SUIter i = m_udpSockets.begin(); i != m_udpSockets.end(); ++i) {
00875                 if (!(*i).second->m_hasData) {
00876                         FD_SET((*i).first, &rfds);
00877                 }
00878                 if (!(*i).second->m_erronous) {
00879                         FD_SET((*i).first, &efds);
00880                 }
00881         }
00882 
00883         cleanupSockets();
00884 
00885         timeval tv;
00886         tv.tv_sec = 0;
00887         tv.tv_usec = 50000;
00888 
00889         int ret = select(highest+1, &rfds, &wfds, &efds, &tv);
00890         if (ret > 0) {
00891 #ifdef WIN32
00892                 for (unsigned int i = 0; i < rfds.fd_count; i++) {
00893                         handleReadableSocket(rfds.fd_array[i]);
00894                 }
00895                 for (unsigned int i = 0; i < wfds.fd_count; i++) {
00896                         handleWritableSocket(wfds.fd_array[i]);
00897                 }
00898                 for (unsigned int i = 0; i < efds.fd_count; i++) {
00899                         handleErronousSocket(efds.fd_array[i]);
00900                 }
00901 #else
00902                 for (SIter i = m_sockets.begin(); i != m_sockets.end(); i++) {
00903                         if (FD_ISSET(*i, &rfds)) {
00904                                 handleReadableSocket(*i);
00905                         } else if (FD_ISSET(*i, &wfds)) {
00906                                 handleWritableSocket(*i);
00907                         } else if (FD_ISSET(*i, &efds)) {
00908                                 handleErronousSocket(*i);
00909                         }
00910                 }
00911 #endif
00912         }
00913         cleanupSockets();
00914 }
00915 
00916 // Socket being readable may mean a number of things:
00917 // * If its a server, there might be an incoming connection
00918 // * If its a connected stream socket, there may be incoming data
00919 // * If its a connected stream socket, the connection may have been closed
00920 void SocketWatcher::handleReadableSocket(SOCKET sock) {
00921         logTrace(TRACE_SOCKET, boost::format("Socket %d is readable.") % sock);
00922 
00923         // First determine who governs the socket. Need two map lookups here.
00924         SCIter i = m_clients.find(sock);
00925         if (i == m_clients.end()) {
00926                 SSIter j = m_servers.find(sock);
00927                 if (j == m_servers.end()) {
00928                         SUIter k = m_udpSockets.find(sock);
00929                         if (k == m_udpSockets.end()) {
00930                                 // This should never happen - we have a SOCKET
00931                                 // on record w/o governing SocketBase object!
00932                                 logDebug("Ungoverned SOCKET in SocketWatcher!");
00933                                 return;
00934                         }
00935                         UDPSocket *s = (*k).second;
00936                         s->m_hasData = true;
00937                         logTrace(TRACE_SOCKET, "UDP socket is readable.");
00938                         UDPSocket::getEventTable().postEvent(
00939                                 k->second, SOCK_READ
00940                         );
00941                         return;
00942                 }
00943                 // Here we have determined that its a server and we have
00944                 // the pointer to the SocketServer object available
00945                 SocketServer *server = j->second;
00946                 server->m_incoming = true;
00947                 logTrace(TRACE_SOCKET, "TCP Server is readable.");
00948                 SocketServer::getEventTable().postEvent(server, SOCK_ACCEPT);
00949                 return;
00950         }
00951         // We get here if the SOCKET was found in clients list - thus we
00952         // have pointer to the SocketClient object governing the SOCKET
00953         SocketClient *client = i->second;
00954         client->m_timeout = 0;
00955 
00956         // Peek at the data to determine if its simply readable or the
00957         // connection has been closed
00958         char buf[1];
00959         int retval = ::recv(sock, buf, 1, MSG_PEEK|MSG_NOSIGNAL);
00960         if (retval == 0) {
00961                 // Connection has been closed
00962                 client->close();
00963                 client->m_connected = false;
00964                 client->m_connecting = false;
00965                 logTrace(TRACE_SOCKET,
00966                         boost::format("%p: TCP Client is lost.") % client
00967                 );
00968                 SocketClient::getEventTable().postEvent(client, SOCK_LOST);
00969         } else if (
00970                 retval == SOCKET_ERROR
00971                 && getLastError() != SOCK_EINPROGRESS // not fatal
00972                 && getLastError() != SOCK_EWOULDBLOCK // not fatal
00973         ) {
00974                 // Socket error has occoured
00975                 client->m_erronous = true;
00976                 client->close();
00977                 logTrace(TRACE_SOCKET,
00978                         boost::format("%p: TCP Client is erroneous: %s")
00979                         % client % getErrorStr(getLastError())
00980                 );
00981                 SocketClient::getEventTable().postEvent(client, SOCK_ERR);
00982         } else {
00983                 // There's data available for reading
00984                 client->m_hasData = true;
00985                 logTrace(TRACE_SOCKET,
00986                         boost::format("%p: TCP Client has data.") % client
00987                 );
00988                 SocketClient::getEventTable().postEvent(client, SOCK_READ);
00989         }
00990 }
00991 
00992 // Socket becoming writable means a connect() call has completed or a socket
00993 // with queued data became (again) writable
00994 void SocketWatcher::handleWritableSocket(SOCKET sock) {
00995         logTrace(TRACE_SOCKET, boost::format("Socket %d is writable.") % sock);
00996 
00997         SCIter i = m_clients.find(sock);
00998         if (i == m_clients.end()) {
00999                 // This should never happen - we have ungoverned SOCKET
01000                 // listed.
01001                 logDebug("Ungoverned SOCKET in SocketWatcher!");
01002                 return;
01003         }
01004         SocketClient *client = (*i).second;
01005         client->m_timeout = 0;
01006         if (client->m_connecting && !client->m_connected) {
01007 #ifndef WIN32
01008                 // Check if the connection attempt succeeded or failed. On win32
01009                 // this notificaiton is done via exceptfds, however on posix
01010                 // we must check the flags ourselves.
01011                 int val = 0;
01012                 socklen_t sz = sizeof(int);
01013                 int ret = getsockopt(sock, SOL_SOCKET, SO_ERROR, &val, &sz);
01014                 if (ret == SOCKET_ERROR) {
01015                         // Doh ?
01016                         perror("getsockopt()");
01017                 }
01018                 if (val == 0) {
01019                         client->m_connecting = false;
01020                         client->m_connected = true;
01021                         SocketClient::getEventTable().postEvent(
01022                                 client, SOCK_CONNECTED
01023                         );
01024                 } else {
01025                         logTrace(TRACE_SOCKET,
01026                                 boost::format("Socket error: %d") %strerror(val)
01027                         );
01028                         client->close();
01029                         client->m_connecting = false;
01030                         client->m_connected = false;
01031                         SocketClient::getEventTable().postEvent(
01032                                 client, SOCK_CONNFAILED
01033                         );
01034                 }
01035 #else
01036                 client->m_connecting = false;
01037                 client->m_connected = true;
01038                 SocketClient::getEventTable().postEvent(client, SOCK_CONNECTED);
01039 #endif
01040         } else if (client->m_connected && !client->m_writable) {
01041                 client->m_writable = true;
01042                 SocketClient::getEventTable().postEvent(client, SOCK_WRITE);
01043         }
01044         return;
01045 }
01046 
01047 // Actually, these are not (as the rumors say), erronous sockets. These are
01048 // sockets which have OOB data in them. refer to `man 2 select_tut` for more
01049 // information.
01050 void SocketWatcher::handleErronousSocket(SOCKET sock) {
01051         logTrace(TRACE_SOCKET, boost::format("Socket %d is erronous.") % sock);
01052 
01053         SCIter i = m_clients.find(sock);
01054         if (i == m_clients.end()) {
01055                 SSIter j = m_servers.find(sock);
01056                 if (j == m_servers.end()) {
01057                         // This is a serious error - see similar places in
01058                         // previous functions
01059                         logDebug("Ungoverned SOCKET in SocketWatcher!");
01060                         return;
01061                 }
01062                 // Server has become erronous
01063                 SocketServer *server = j->second;
01064                 server->close();
01065                 server->m_listening = false;
01066                 server->m_erronous = true;
01067                 SocketServer::getEventTable().postEvent(server, SOCK_LOST);
01068         } else {
01069                 // Client has become erronous
01070                 SocketClient *client = i->second;
01071                 logTrace(TRACE_SOCKET,
01072                         boost::format("%p: TCP socket became erronous") % client
01073                 );
01074                 client->m_timeout = 0;
01075                 client->close();
01076                 client->m_connected = false;
01077                 client->m_connecting = false;
01078                 client->m_erronous = true;
01079                 SocketClient::getEventTable().postEvent(client, SOCK_LOST);
01080         }
01081 }
01082 
01083 namespace Socket {
01084         //! Implement EOL symbol
01085         _Endl Endl;
01086 }