diff options
-rw-r--r-- | include/sibs/BootstrapNode.hpp | 28 | ||||
-rw-r--r-- | include/sibs/DirectConnection.hpp | 22 | ||||
-rw-r--r-- | include/sibs/IpAddress.hpp | 35 | ||||
-rw-r--r-- | src/BootstrapNode.cpp | 65 | ||||
-rw-r--r-- | src/DirectConnection.cpp | 49 | ||||
-rw-r--r-- | src/IpAddress.cpp | 40 |
6 files changed, 185 insertions, 54 deletions
diff --git a/include/sibs/BootstrapNode.hpp b/include/sibs/BootstrapNode.hpp new file mode 100644 index 0000000..eaf48eb --- /dev/null +++ b/include/sibs/BootstrapNode.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include "DirectConnection.hpp" +#include "IpAddress.hpp" + +namespace sibs +{ + class BootstrapException : public std::runtime_error + { + public: + BootstrapException(const std::string &errMsg) : std::runtime_error(errMsg) {} + }; + + class BootstrapNode + { + DISABLE_COPY(BootstrapNode) + public: + // Throws BootstrapException on error + BootstrapNode(const Ipv4 &address); + ~BootstrapNode(); + private: + void acceptConnections(); + private: + DirectConnections connections; + int socket; + std::thread acceptConnectionsThread; + }; +} diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp index 8e92137..c4431a5 100644 --- a/include/sibs/DirectConnection.hpp +++ b/include/sibs/DirectConnection.hpp @@ -7,19 +7,12 @@ #include <memory> #include <thread> #include <mutex> +#include "IpAddress.hpp" #include "../types.hpp" #include "../utils.hpp" -struct addrinfo; - namespace sibs { - class InvalidAddressException : public std::runtime_error - { - public: - InvalidAddressException(const std::string &errMsg) : std::runtime_error(errMsg) {} - }; - class ConnectionException : public std::runtime_error { public: @@ -32,16 +25,6 @@ namespace sibs SendException(const std::string &errMsg) : std::runtime_error(errMsg) {} }; - class Ipv4 - { - DISABLE_COPY(Ipv4) - public: - // Throws InvalidAddressException on error - Ipv4(const char *ip, u16 port); - ~Ipv4(); - - struct addrinfo *address; - }; enum class PubSubConnectResult { @@ -61,6 +44,7 @@ namespace sibs class DirectConnections { DISABLE_COPY(DirectConnections) + friend class BootstrapNode; public: DirectConnections(u16 port = 27137); ~DirectConnections(); @@ -69,6 +53,8 @@ namespace sibs std::shared_ptr<DirectConnectionPeer> connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc); // Throws SendException on error void send(const std::shared_ptr<DirectConnectionPeer> &peer, const void *data, const usize size); + protected: + int createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr); private: void receiveData(); bool receiveDataFromPeer(const int socket, char *output); diff --git a/include/sibs/IpAddress.hpp b/include/sibs/IpAddress.hpp new file mode 100644 index 0000000..a0fad75 --- /dev/null +++ b/include/sibs/IpAddress.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include "../utils.hpp" +#include <stdexcept> +#include <string> +#ifndef WIN32 + #include <arpa/inet.h> + #include <netdb.h> +#else + #include <winsock2.h> + #include <ws2tcpip.h> +#endif + +namespace sibs +{ + class InvalidAddressException : public std::runtime_error + { + public: + InvalidAddressException(const std::string &errMsg) : std::runtime_error(errMsg) {} + }; + + class Ipv4 + { + DISABLE_COPY(Ipv4) + public: + // If @ip is nullptr, then bind to all available sockets (typical for servers) + // Throws InvalidAddressException on error. + Ipv4(const char *ip, unsigned short port); + + std::string getAddress() const; + unsigned short getPort() const; + + struct sockaddr_in address; + }; +} diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp new file mode 100644 index 0000000..e26b9ea --- /dev/null +++ b/src/BootstrapNode.cpp @@ -0,0 +1,65 @@ +#include "../include/sibs/BootstrapNode.hpp" +#include "../include/Log.hpp" +#ifndef WIN32 + #include <arpa/inet.h> + #include <netdb.h> +#else + #include <winsock2.h> + #include <ws2tcpip.h> +#endif +#include <udt/udt.h> + +namespace sibs +{ + BootstrapNode::BootstrapNode(const Ipv4 &address) : + socket(connections.createSocket(address, false, true)) + { + if(UDT::listen(socket, 10) == UDT::ERROR) + { + std::string errMsg = "UDT: Failed to listen, error: "; + errMsg += UDT::getlasterror_desc(); + throw BootstrapException(errMsg); + } + acceptConnectionsThread = std::thread(&BootstrapNode::acceptConnections, this); + } + + BootstrapNode::~BootstrapNode() + { + connections.alive = false; + UDT::close(socket); + acceptConnectionsThread.join(); + } + + void BootstrapNode::acceptConnections() + { + sockaddr_storage clientAddr; + int addrLen = sizeof(clientAddr); + + while(connections.alive) + { + UDTSOCKET clientSocket = UDT::accept(socket, (sockaddr*)&clientAddr, &addrLen); + if(clientSocket == UDT::INVALID_SOCK) + { + // Connection was killed because bootstrap node was taken down + if(!connections.alive) + return; + Log::error("UDT: Failed to accept connection, error: %s", UDT::getlasterror_desc()); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + + char clientHost[NI_MAXHOST]; + char clientService[NI_MAXSERV]; + getnameinfo((sockaddr *)&clientAddr, addrLen, clientHost, sizeof(clientHost), clientService, sizeof(clientService), NI_NUMERICHOST | NI_NUMERICSERV); + Log::debug("UDT: New connection: %s:%s", clientHost, clientService); + + UDT::epoll_add_usock(connections.eid, clientSocket); + std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>(); + peer->socket = clientSocket; + peer->receiveDataCallbackFunc = nullptr; + connections.peersMutex.lock(); + connections.peers[clientSocket] = peer; + connections.peersMutex.unlock(); + } + } +} diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index 6e3ddcc..86b1aa3 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -17,30 +17,6 @@ namespace sibs // Max received data size allowed when receiving regular data, receive data as file to receive more data const int MAX_RECEIVED_DATA_SIZE = 1024 * 1024 * 1; // 1Mb - Ipv4::Ipv4(const char *ip, u16 port) - { - struct addrinfo hints = {}; - hints.ai_flags = AI_PASSIVE; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - - char portStr[6]; - sprintf(portStr, "%d", port); - int addrInfoResult = getaddrinfo(ip, portStr, &hints, &address); - if(addrInfoResult != 0) - { - std::string errMsg = "Ip "; - errMsg += ip; - errMsg += " is not a valid ip"; - throw InvalidAddressException(errMsg); - } - } - - Ipv4::~Ipv4() - { - freeaddrinfo(address); - } - DirectConnections::DirectConnections(u16 _port) : port(_port), alive(true) @@ -63,9 +39,10 @@ namespace sibs UDT::cleanup(); } - std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc) + int DirectConnections::createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr) { - UDTSOCKET socket = UDT::socket(AI_PASSIVE, AF_INET, SOCK_STREAM); + Log::debug("UDT: Creating socket for ipv4 address %s, port: %d", addressToBind.getAddress().c_str(), addressToBind.getPort()); + UDTSOCKET socket = UDT::socket(AF_INET, SOCK_STREAM, IPPROTO_UDP); if(socket == UDT::INVALID_SOCK) { std::string errMsg = "UDT: Failed to create socket, error: "; @@ -73,9 +50,7 @@ namespace sibs throw ConnectionException(errMsg); } - bool rendezvous = true; UDT::setsockopt(socket, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool)); - bool reuseAddr = true; UDT::setsockopt(socket, 0, UDT_REUSEADDR, &reuseAddr, sizeof(bool)); // Windows UDP issue @@ -84,22 +59,24 @@ namespace sibs int mss = 1052; UDT::setsockopt(socket, 0, UDT_MSS, &mss, sizeof(mss)); #endif - - sockaddr_in myAddr = {}; - myAddr.sin_family = AF_INET; - myAddr.sin_port = htons(port); - myAddr.sin_addr.s_addr = INADDR_ANY; - memset(&myAddr.sin_zero, '\0', 8); - if(UDT::bind(socket, (sockaddr*)&myAddr, sizeof(myAddr)) == UDT::ERROR) + if(UDT::bind(socket, (sockaddr*)&addressToBind.address, sizeof(addressToBind.address)) == UDT::ERROR) { std::string errMsg = "UDT: Failed to bind, error: "; errMsg += UDT::getlasterror_desc(); throw ConnectionException(errMsg); } - if(UDT::connect(socket, address.address->ai_addr, address.address->ai_addrlen) == UDT::ERROR) + return socket; + } + + std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc) + { + UDTSOCKET socket = createSocket(Ipv4(nullptr, port), true, true); + + if(UDT::connect(socket, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR) { + UDT::close(socket); std::string errMsg = "UDT: Failed to connect, error: "; errMsg += UDT::getlasterror_desc(); throw ConnectionException(errMsg); diff --git a/src/IpAddress.cpp b/src/IpAddress.cpp new file mode 100644 index 0000000..22e81e5 --- /dev/null +++ b/src/IpAddress.cpp @@ -0,0 +1,40 @@ +#include "../include/sibs/IpAddress.hpp" +#include <cstring> + +namespace sibs +{ + Ipv4::Ipv4(const char *ip, unsigned short port) + { + address.sin_family = AF_INET; + address.sin_port = htons(port); + if(ip) + { + if(strlen(ip) > 15) + throw InvalidAddressException("Ip address is too long"); + + if(inet_pton(AF_INET, ip, &address.sin_addr.s_addr) != 1) + { + std::string errMsg = "Ip "; + errMsg += ip; + errMsg += " is not a valid ip"; + throw InvalidAddressException(errMsg); + } + } + else + address.sin_addr.s_addr = INADDR_ANY; + memset(address.sin_zero, 0, sizeof(address.sin_zero)); + } + + std::string Ipv4::getAddress() const + { + std::string result; + result.resize(INET_ADDRSTRLEN); + inet_ntop(AF_INET, &address.sin_addr, &result[0], INET_ADDRSTRLEN); + return result; + } + + unsigned short Ipv4::getPort() const + { + return ntohs(address.sin_port); + } +} |