non-blocking client

This commit is contained in:
ackimixs
2024-05-23 16:17:35 +02:00
parent 7fa6740893
commit e7861f6339
2 changed files with 49 additions and 14 deletions

View File

@@ -7,11 +7,14 @@
#include <unistd.h>
#include <cstring>
#include <thread>
#include <fcntl.h>
#include <atomic>
class TCPClient {
private:
int clientSocket;
sockaddr_in serverAddress{};
std::thread receiveThread;
protected:
bool running;

View File

@@ -21,6 +21,9 @@ TCPClient::TCPClient(const char* serverIP, int port) : running(false), _stoped(f
std::cerr << "Connection failed" << std::endl;
exit(EXIT_FAILURE);
}
int flags = fcntl(clientSocket, F_GETFL, 0);
fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK);
}
void TCPClient::sendMessage(const char* message) const {
@@ -43,20 +46,47 @@ void TCPClient::sendMessage(const std::string& message) const {
void TCPClient::receiveMessages() {
char buffer[1024] = {0};
fd_set readfds;
while (running) {
ssize_t valread = recv(clientSocket, buffer, sizeof(buffer), 0);
if (valread > 0) {
std::vector<std::string> messages = Modelec::split(buffer, "\n");
for (const std::string& message : messages) {
handleMessage(message);
if (clientSocket == -1) break;
FD_ZERO(&readfds);
FD_SET(clientSocket, &readfds);
struct timeval timeout;
timeout.tv_sec = 1; // Set timeout to 1 second
timeout.tv_usec = 0;
int activity = select(clientSocket + 1, &readfds, nullptr, nullptr, &timeout);
if (activity < 0) {
if (errno != EINTR) {
std::cerr << "Select error" << std::endl;
break;
}
} else if (activity == 0) {
// Timeout occurred, check if we should stop
if (shouldStop()) {
break;
}
memset(buffer, 0, sizeof(buffer)); // Clear buffer
} else if (valread == 0) {
std::cerr << "Connection closed by server" << std::endl;
break;
} else {
std::cerr << "Error in receiving message" << std::endl;
break;
if (FD_ISSET(clientSocket, &readfds)) {
ssize_t valread = recv(clientSocket, buffer, sizeof(buffer), 0);
if (valread > 0) {
std::vector<std::string> messages = Modelec::split(buffer, "\n");
for (const std::string& message : messages) {
handleMessage(message);
}
memset(buffer, 0, sizeof(buffer)); // Clear buffer
} else if (valread == 0) {
std::cerr << "Connection closed by server" << std::endl;
break;
} else {
std::cerr << "Error in receiving message" << std::endl;
break;
}
}
}
}
running = false;
@@ -72,15 +102,17 @@ TCPClient::~TCPClient() {
void TCPClient::start() {
running = true;
std::thread receiveThread(&TCPClient::receiveMessages, this);
receiveThread = std::thread(&TCPClient::receiveMessages, this);
receiveThread.detach();
}
void TCPClient::stop() {
if (!_stoped) {
running = false;
close(clientSocket);
_stoped = true;
running = false;
close(clientSocket);
clientSocket = -1;
}
}