From e7861f633954ae4a9238dac6795de0efa7d7472a Mon Sep 17 00:00:00 2001 From: ackimixs Date: Thu, 23 May 2024 16:17:35 +0200 Subject: [PATCH] non-blocking client --- .../TCPClient/include/Modelec/TCPClient.h | 3 + components/TCPClient/src/TCPClient.cpp | 60 ++++++++++++++----- 2 files changed, 49 insertions(+), 14 deletions(-) diff --git a/components/TCPClient/include/Modelec/TCPClient.h b/components/TCPClient/include/Modelec/TCPClient.h index c49f862..49d327d 100644 --- a/components/TCPClient/include/Modelec/TCPClient.h +++ b/components/TCPClient/include/Modelec/TCPClient.h @@ -7,11 +7,14 @@ #include #include #include +#include +#include class TCPClient { private: int clientSocket; sockaddr_in serverAddress{}; + std::thread receiveThread; protected: bool running; diff --git a/components/TCPClient/src/TCPClient.cpp b/components/TCPClient/src/TCPClient.cpp index 82074b2..1816d5e 100644 --- a/components/TCPClient/src/TCPClient.cpp +++ b/components/TCPClient/src/TCPClient.cpp @@ -21,6 +21,9 @@ TCPClient::TCPClient(const char* serverIP, int port) : running(false), _stoped(f std::cerr << "Connection failed" << std::endl; exit(EXIT_FAILURE); } + + int flags = fcntl(clientSocket, F_GETFL, 0); + fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK); } void TCPClient::sendMessage(const char* message) const { @@ -43,20 +46,47 @@ void TCPClient::sendMessage(const std::string& message) const { void TCPClient::receiveMessages() { char buffer[1024] = {0}; + fd_set readfds; + while (running) { - ssize_t valread = recv(clientSocket, buffer, sizeof(buffer), 0); - if (valread > 0) { - std::vector messages = Modelec::split(buffer, "\n"); - for (const std::string& message : messages) { - handleMessage(message); + if (clientSocket == -1) break; + + FD_ZERO(&readfds); + FD_SET(clientSocket, &readfds); + + struct timeval timeout; + timeout.tv_sec = 1; // Set timeout to 1 second + timeout.tv_usec = 0; + + int activity = select(clientSocket + 1, &readfds, nullptr, nullptr, &timeout); + + if (activity < 0) { + if (errno != EINTR) { + std::cerr << "Select error" << std::endl; + break; + } + } else if (activity == 0) { + // Timeout occurred, check if we should stop + if (shouldStop()) { + break; } - memset(buffer, 0, sizeof(buffer)); // Clear buffer - } else if (valread == 0) { - std::cerr << "Connection closed by server" << std::endl; - break; } else { - std::cerr << "Error in receiving message" << std::endl; - break; + if (FD_ISSET(clientSocket, &readfds)) { + ssize_t valread = recv(clientSocket, buffer, sizeof(buffer), 0); + if (valread > 0) { + std::vector messages = Modelec::split(buffer, "\n"); + for (const std::string& message : messages) { + handleMessage(message); + } + memset(buffer, 0, sizeof(buffer)); // Clear buffer + } else if (valread == 0) { + std::cerr << "Connection closed by server" << std::endl; + break; + } else { + std::cerr << "Error in receiving message" << std::endl; + break; + } + } } } running = false; @@ -72,15 +102,17 @@ TCPClient::~TCPClient() { void TCPClient::start() { running = true; - std::thread receiveThread(&TCPClient::receiveMessages, this); + receiveThread = std::thread(&TCPClient::receiveMessages, this); receiveThread.detach(); } void TCPClient::stop() { if (!_stoped) { - running = false; - close(clientSocket); _stoped = true; + running = false; + + close(clientSocket); + clientSocket = -1; } }