aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/sibs/BootstrapNode.hpp28
-rw-r--r--include/sibs/DirectConnection.hpp22
-rw-r--r--include/sibs/IpAddress.hpp35
-rw-r--r--src/BootstrapNode.cpp65
-rw-r--r--src/DirectConnection.cpp49
-rw-r--r--src/IpAddress.cpp40
6 files changed, 185 insertions, 54 deletions
diff --git a/include/sibs/BootstrapNode.hpp b/include/sibs/BootstrapNode.hpp
new file mode 100644
index 0000000..eaf48eb
--- /dev/null
+++ b/include/sibs/BootstrapNode.hpp
@@ -0,0 +1,28 @@
+#pragma once
+
+#include "DirectConnection.hpp"
+#include "IpAddress.hpp"
+
+namespace sibs
+{
+ class BootstrapException : public std::runtime_error
+ {
+ public:
+ BootstrapException(const std::string &errMsg) : std::runtime_error(errMsg) {}
+ };
+
+ class BootstrapNode
+ {
+ DISABLE_COPY(BootstrapNode)
+ public:
+ // Throws BootstrapException on error
+ BootstrapNode(const Ipv4 &address);
+ ~BootstrapNode();
+ private:
+ void acceptConnections();
+ private:
+ DirectConnections connections;
+ int socket;
+ std::thread acceptConnectionsThread;
+ };
+}
diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp
index 8e92137..c4431a5 100644
--- a/include/sibs/DirectConnection.hpp
+++ b/include/sibs/DirectConnection.hpp
@@ -7,19 +7,12 @@
#include <memory>
#include <thread>
#include <mutex>
+#include "IpAddress.hpp"
#include "../types.hpp"
#include "../utils.hpp"
-struct addrinfo;
-
namespace sibs
{
- class InvalidAddressException : public std::runtime_error
- {
- public:
- InvalidAddressException(const std::string &errMsg) : std::runtime_error(errMsg) {}
- };
-
class ConnectionException : public std::runtime_error
{
public:
@@ -32,16 +25,6 @@ namespace sibs
SendException(const std::string &errMsg) : std::runtime_error(errMsg) {}
};
- class Ipv4
- {
- DISABLE_COPY(Ipv4)
- public:
- // Throws InvalidAddressException on error
- Ipv4(const char *ip, u16 port);
- ~Ipv4();
-
- struct addrinfo *address;
- };
enum class PubSubConnectResult
{
@@ -61,6 +44,7 @@ namespace sibs
class DirectConnections
{
DISABLE_COPY(DirectConnections)
+ friend class BootstrapNode;
public:
DirectConnections(u16 port = 27137);
~DirectConnections();
@@ -69,6 +53,8 @@ namespace sibs
std::shared_ptr<DirectConnectionPeer> connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc);
// Throws SendException on error
void send(const std::shared_ptr<DirectConnectionPeer> &peer, const void *data, const usize size);
+ protected:
+ int createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr);
private:
void receiveData();
bool receiveDataFromPeer(const int socket, char *output);
diff --git a/include/sibs/IpAddress.hpp b/include/sibs/IpAddress.hpp
new file mode 100644
index 0000000..a0fad75
--- /dev/null
+++ b/include/sibs/IpAddress.hpp
@@ -0,0 +1,35 @@
+#pragma once
+
+#include "../utils.hpp"
+#include <stdexcept>
+#include <string>
+#ifndef WIN32
+ #include <arpa/inet.h>
+ #include <netdb.h>
+#else
+ #include <winsock2.h>
+ #include <ws2tcpip.h>
+#endif
+
+namespace sibs
+{
+ class InvalidAddressException : public std::runtime_error
+ {
+ public:
+ InvalidAddressException(const std::string &errMsg) : std::runtime_error(errMsg) {}
+ };
+
+ class Ipv4
+ {
+ DISABLE_COPY(Ipv4)
+ public:
+ // If @ip is nullptr, then bind to all available sockets (typical for servers)
+ // Throws InvalidAddressException on error.
+ Ipv4(const char *ip, unsigned short port);
+
+ std::string getAddress() const;
+ unsigned short getPort() const;
+
+ struct sockaddr_in address;
+ };
+}
diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp
new file mode 100644
index 0000000..e26b9ea
--- /dev/null
+++ b/src/BootstrapNode.cpp
@@ -0,0 +1,65 @@
+#include "../include/sibs/BootstrapNode.hpp"
+#include "../include/Log.hpp"
+#ifndef WIN32
+ #include <arpa/inet.h>
+ #include <netdb.h>
+#else
+ #include <winsock2.h>
+ #include <ws2tcpip.h>
+#endif
+#include <udt/udt.h>
+
+namespace sibs
+{
+ BootstrapNode::BootstrapNode(const Ipv4 &address) :
+ socket(connections.createSocket(address, false, true))
+ {
+ if(UDT::listen(socket, 10) == UDT::ERROR)
+ {
+ std::string errMsg = "UDT: Failed to listen, error: ";
+ errMsg += UDT::getlasterror_desc();
+ throw BootstrapException(errMsg);
+ }
+ acceptConnectionsThread = std::thread(&BootstrapNode::acceptConnections, this);
+ }
+
+ BootstrapNode::~BootstrapNode()
+ {
+ connections.alive = false;
+ UDT::close(socket);
+ acceptConnectionsThread.join();
+ }
+
+ void BootstrapNode::acceptConnections()
+ {
+ sockaddr_storage clientAddr;
+ int addrLen = sizeof(clientAddr);
+
+ while(connections.alive)
+ {
+ UDTSOCKET clientSocket = UDT::accept(socket, (sockaddr*)&clientAddr, &addrLen);
+ if(clientSocket == UDT::INVALID_SOCK)
+ {
+ // Connection was killed because bootstrap node was taken down
+ if(!connections.alive)
+ return;
+ Log::error("UDT: Failed to accept connection, error: %s", UDT::getlasterror_desc());
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ continue;
+ }
+
+ char clientHost[NI_MAXHOST];
+ char clientService[NI_MAXSERV];
+ getnameinfo((sockaddr *)&clientAddr, addrLen, clientHost, sizeof(clientHost), clientService, sizeof(clientService), NI_NUMERICHOST | NI_NUMERICSERV);
+ Log::debug("UDT: New connection: %s:%s", clientHost, clientService);
+
+ UDT::epoll_add_usock(connections.eid, clientSocket);
+ std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
+ peer->socket = clientSocket;
+ peer->receiveDataCallbackFunc = nullptr;
+ connections.peersMutex.lock();
+ connections.peers[clientSocket] = peer;
+ connections.peersMutex.unlock();
+ }
+ }
+}
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index 6e3ddcc..86b1aa3 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -17,30 +17,6 @@ namespace sibs
// Max received data size allowed when receiving regular data, receive data as file to receive more data
const int MAX_RECEIVED_DATA_SIZE = 1024 * 1024 * 1; // 1Mb
- Ipv4::Ipv4(const char *ip, u16 port)
- {
- struct addrinfo hints = {};
- hints.ai_flags = AI_PASSIVE;
- hints.ai_family = AF_INET;
- hints.ai_socktype = SOCK_STREAM;
-
- char portStr[6];
- sprintf(portStr, "%d", port);
- int addrInfoResult = getaddrinfo(ip, portStr, &hints, &address);
- if(addrInfoResult != 0)
- {
- std::string errMsg = "Ip ";
- errMsg += ip;
- errMsg += " is not a valid ip";
- throw InvalidAddressException(errMsg);
- }
- }
-
- Ipv4::~Ipv4()
- {
- freeaddrinfo(address);
- }
-
DirectConnections::DirectConnections(u16 _port) :
port(_port),
alive(true)
@@ -63,9 +39,10 @@ namespace sibs
UDT::cleanup();
}
- std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc)
+ int DirectConnections::createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr)
{
- UDTSOCKET socket = UDT::socket(AI_PASSIVE, AF_INET, SOCK_STREAM);
+ Log::debug("UDT: Creating socket for ipv4 address %s, port: %d", addressToBind.getAddress().c_str(), addressToBind.getPort());
+ UDTSOCKET socket = UDT::socket(AF_INET, SOCK_STREAM, IPPROTO_UDP);
if(socket == UDT::INVALID_SOCK)
{
std::string errMsg = "UDT: Failed to create socket, error: ";
@@ -73,9 +50,7 @@ namespace sibs
throw ConnectionException(errMsg);
}
- bool rendezvous = true;
UDT::setsockopt(socket, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool));
- bool reuseAddr = true;
UDT::setsockopt(socket, 0, UDT_REUSEADDR, &reuseAddr, sizeof(bool));
// Windows UDP issue
@@ -84,22 +59,24 @@ namespace sibs
int mss = 1052;
UDT::setsockopt(socket, 0, UDT_MSS, &mss, sizeof(mss));
#endif
-
- sockaddr_in myAddr = {};
- myAddr.sin_family = AF_INET;
- myAddr.sin_port = htons(port);
- myAddr.sin_addr.s_addr = INADDR_ANY;
- memset(&myAddr.sin_zero, '\0', 8);
- if(UDT::bind(socket, (sockaddr*)&myAddr, sizeof(myAddr)) == UDT::ERROR)
+ if(UDT::bind(socket, (sockaddr*)&addressToBind.address, sizeof(addressToBind.address)) == UDT::ERROR)
{
std::string errMsg = "UDT: Failed to bind, error: ";
errMsg += UDT::getlasterror_desc();
throw ConnectionException(errMsg);
}
- if(UDT::connect(socket, address.address->ai_addr, address.address->ai_addrlen) == UDT::ERROR)
+ return socket;
+ }
+
+ std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc)
+ {
+ UDTSOCKET socket = createSocket(Ipv4(nullptr, port), true, true);
+
+ if(UDT::connect(socket, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR)
{
+ UDT::close(socket);
std::string errMsg = "UDT: Failed to connect, error: ";
errMsg += UDT::getlasterror_desc();
throw ConnectionException(errMsg);
diff --git a/src/IpAddress.cpp b/src/IpAddress.cpp
new file mode 100644
index 0000000..22e81e5
--- /dev/null
+++ b/src/IpAddress.cpp
@@ -0,0 +1,40 @@
+#include "../include/sibs/IpAddress.hpp"
+#include <cstring>
+
+namespace sibs
+{
+ Ipv4::Ipv4(const char *ip, unsigned short port)
+ {
+ address.sin_family = AF_INET;
+ address.sin_port = htons(port);
+ if(ip)
+ {
+ if(strlen(ip) > 15)
+ throw InvalidAddressException("Ip address is too long");
+
+ if(inet_pton(AF_INET, ip, &address.sin_addr.s_addr) != 1)
+ {
+ std::string errMsg = "Ip ";
+ errMsg += ip;
+ errMsg += " is not a valid ip";
+ throw InvalidAddressException(errMsg);
+ }
+ }
+ else
+ address.sin_addr.s_addr = INADDR_ANY;
+ memset(address.sin_zero, 0, sizeof(address.sin_zero));
+ }
+
+ std::string Ipv4::getAddress() const
+ {
+ std::string result;
+ result.resize(INET_ADDRSTRLEN);
+ inet_ntop(AF_INET, &address.sin_addr, &result[0], INET_ADDRSTRLEN);
+ return result;
+ }
+
+ unsigned short Ipv4::getPort() const
+ {
+ return ntohs(address.sin_port);
+ }
+}