Skip to content

Commit e4e748f

Browse files
author
Karl Herbig
committed
enable tcp server to handle multiple client connections
1 parent 933de75 commit e4e748f

File tree

3 files changed

+117
-52
lines changed

3 files changed

+117
-52
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
7272
set(THREADS_PREFER_PTHREAD_FLAG TRUE)
7373
find_package(Threads REQUIRED)
7474

75-
set(LIBMODBUS_MIN_VERSION "3.1.8")
75+
set(LIBMODBUS_MIN_VERSION "3.1.10")
7676
find_package(PkgConfig REQUIRED)
7777

7878
pkg_check_modules (LIBMODBUS REQUIRED libmodbusepsi>=${LIBMODBUS_MIN_VERSION})

src/server.cpp

Lines changed: 111 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -83,24 +83,15 @@ namespace Modbus {
8383

8484
// ---------------------------------------------------------------------------
8585
int Server::poll (long timeout) {
86+
if(isRunning() || not isOpen()){
87+
// cann not proceed if the `loop()` is running
88+
// OR connection is not established
89+
return -1;
90+
}
8691

87-
if (!isRunning() && isOpen()) {
88-
PIMP_D (Server);
89-
90-
if (!d->receiveTask.valid()) {
91-
// starts the receiving thread
92-
d->receiveTask = std::async (std::launch::async, Server::Private::receive, d);
93-
}
94-
95-
if (d->receiveTask.wait_for (std::chrono::milliseconds (timeout)) == std::future_status::ready) {
92+
PIMP_D (Server);
93+
return d->poll(timeout);
9694

97-
// message received ?
98-
int rc = d->receiveTask.get();
99-
return d->task (rc);
100-
}
101-
return 0;
102-
}
103-
return -1;
10495
}
10596

10697
// ---------------------------------------------------------------------------
@@ -184,9 +175,9 @@ namespace Modbus {
184175
void Server::terminate () {
185176
PIMP_D (Server);
186177

187-
if (d->sock != -1) {
188-
189-
::shutdown (d->sock, SHUT_RDWR);
178+
if (d->listen_sock != -1) {
179+
::shutdown (d->listen_sock, SHUT_RDWR);
180+
::close(d->listen_sock);
190181
}
191182

192183
if (isRunning()) {
@@ -196,6 +187,7 @@ namespace Modbus {
196187
// Wait for thread to join
197188
d->daemon.join();
198189
}
190+
199191
}
200192

201193
// ---------------------------------------------------------------------------
@@ -234,10 +226,10 @@ namespace Modbus {
234226

235227
// ---------------------------------------------------------------------------
236228
Server::Private::Private (Server * q) :
237-
Device::Private (q), sock (-1), req (0) {}
229+
Device::Private (q), listen_sock (-1), req (0) {
230+
all_pollfds.reserve(MAX_CONNECTIONS +1);
231+
}
238232

239-
// ---------------------------------------------------------------------------
240-
Server::Private::~Private() = default;
241233

242234
// ---------------------------------------------------------------------------
243235
// virtual
@@ -292,14 +284,23 @@ namespace Modbus {
292284
switch (backend->net()) {
293285

294286
case Tcp:
295-
sock = modbus_tcp_pi_listen (ctx(), 1);
296-
isOk = (sock != -1);
287+
listen_sock = modbus_tcp_pi_listen (ctx(), MAX_CONNECTIONS);
288+
if ( listen_sock != -1 ) {
289+
isOk = true;
290+
all_pollfds.push_back(pollfd{.fd=listen_sock, .events=POLL_IN, .revents=0});
291+
}
297292
break;
298293

299294
case Rtu:
300-
case Ascii:
301-
isOk = Device::Private::open();
302-
295+
case Ascii: {
296+
if ( Device::Private::open() ) {
297+
isOk = true;
298+
int listen_sock = modbus_get_socket(ctx());
299+
all_pollfds.push_back(pollfd{.fd=listen_sock, .events=POLL_IN, .revents=0});
300+
} else {
301+
std::cout << "fd after open: " << listen_sock << "\n";
302+
}
303+
}
303304
default:
304305
break;
305306
}
@@ -315,18 +316,16 @@ namespace Modbus {
315316

316317
// ---------------------------------------------------------------------------
317318
void Server::Private::close() {
318-
std::lock_guard<std::mutex> lg (d_guard);
319-
319+
std::lock_guard<std::mutex> lg (d_guard);
320320
if (backend->net() == Tcp) {
321-
322-
if (sock != -1) {
323-
321+
if (listen_sock != -1) {
324322
#ifdef _WIN32
325-
::closesocket (sock);
323+
::closesocket (listen_sock);
326324
#else
327-
::close (sock);
325+
::close (listen_sock);
328326
#endif
329-
sock = -1;
327+
all_pollfds.clear();
328+
listen_sock = -1;
330329
}
331330
}
332331
Device::Private::close();
@@ -393,35 +392,99 @@ namespace Modbus {
393392
rc = 0;
394393
}
395394
else {
396-
397395
if (messageCB) {
398-
399396
rc = messageCB (req.get(), q);
400397
}
401398
}
402399
}
403400
return rc;
404401
}
405402

406-
// ---------------------------------------------------------------------------
407-
// static
408-
int Server::Private::receive (Private * d) {
409-
std::lock_guard<std::mutex> lg (d->d_guard);
410-
int rc;
411-
if ( (d->backend->net() == Tcp) && !d->isConnected()) {
403+
int Server::Private::poll(int timeout)
404+
{
405+
int eventCount = ::poll(all_pollfds.data(), all_pollfds.size(), timeout);
412406

413-
// accept blocking call !
414-
if (modbus_tcp_pi_accept (d->ctx(), &d->sock) < 0) {
407+
if ( eventCount == 0) {
408+
// there is nothing to process
409+
return 0;
410+
}
415411

412+
if ( eventCount < 0 ) {
413+
// handle error
416414
return -1;
415+
}
416+
417+
// handle events
418+
std::vector<pollfd> new_pfds{};
419+
420+
for ( pollfd& pfd : all_pollfds ) {
421+
422+
if ( pfd.revents & POLLIN ) {
423+
424+
if ( backend->net() == Net::Tcp && pfd.fd == listen_sock ) {
425+
// if there is an event on the 'listening' socket
426+
// handle incomming connection request aka `connect()`
427+
// but only Server::Private::MAX_CONNECTIONS
428+
if ( all_pollfds.size() < MAX_CONNECTIONS ) {
429+
int new_socket = modbus_tcp_accept(ctx(), &listen_sock);
430+
if ( new_socket != -1 ) {
431+
new_pfds.push_back(pollfd{
432+
.fd=new_socket,
433+
.events=POLL_IN,
434+
.revents=0});
435+
}
436+
}
437+
} else {
438+
// handle incomming request
439+
modbus_set_socket(ctx(), pfd.fd);
440+
int rc = Server::Private::receive(this);
441+
if ( rc == -1 ) {
442+
// if receive fails after a successfull poll
443+
// probably the connection is broken
444+
::close(pfd.fd);
445+
pfd.fd = -1;
446+
continue;
447+
}
448+
449+
task(rc);
450+
}
451+
} else if ( pfd.revents & POLLHUP ) {
452+
::close(pfd.fd);
453+
pfd.fd = -1;
417454
}
418455
}
456+
457+
// remove bad file descriptors from watch list
458+
auto badFds = std::remove_if(all_pollfds.begin(), all_pollfds.end(), [](const pollfd& pfd) {
459+
return (pfd.revents & (POLLERR|POLLHUP|POLLNVAL)) || (pfd.fd == -1);
460+
});
461+
all_pollfds.erase(badFds, all_pollfds.end());
462+
463+
if ( all_pollfds.empty() ) {
464+
return -1;
465+
}
466+
467+
// add new accepted file descriptors to watch list
468+
if ( not new_pfds.empty() ) {
469+
all_pollfds.insert(all_pollfds.end(), new_pfds.begin(), new_pfds.end());
470+
}
471+
472+
return 0;
473+
}
474+
475+
// ---------------------------------------------------------------------------
476+
// static
477+
int Server::Private::receive (Private * d) {
478+
std::lock_guard<std::mutex> lg (d->d_guard);
479+
480+
int rc = 0;
481+
419482
d->req->clear();
420483
rc = modbus_receive (d->ctx(), d->req->adu());
421-
if (rc > 0) {
422-
423-
d->req->setAduSize (rc);
484+
if ( rc > 0 ) {
485+
d->req->setAduSize(rc);
424486
}
487+
425488
return rc;
426489
}
427490

src/server_p.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
#pragma once
1818

19+
#include <poll.h>
1920
#include <map>
2021
#include <future>
2122
#include <thread>
@@ -26,26 +27,27 @@ namespace Modbus {
2627

2728
class Server::Private : public Device::Private {
2829

30+
static const int MAX_CONNECTIONS = 16;
2931
public:
3032
Private (Server * q);
31-
virtual ~Private();
3233
virtual void setBackend (Net net, const std::string & connection,
3334
const std::string & settings);
3435
virtual void setConfig (const nlohmann::json & config);
3536

3637
virtual bool open();
3738
virtual void close();
3839
int task (int rc);
40+
int poll (int timeout);
3941

4042
BufferedSlave * addSlave (int slaveAddr, Device * master);
4143

4244
static void * loop (std::future<void> run, Private * d);
4345
static int receive (Private * d);
4446

45-
int sock;
47+
int listen_sock = -1;
48+
std::vector<pollfd> all_pollfds {};
4649
std::shared_ptr<Request> req;
4750
std::map <int, std::shared_ptr<BufferedSlave>> slave;
48-
std::future<int> receiveTask;
4951
std::thread daemon;
5052
std::promise<void> stopDaemon;
5153
Message::Callback messageCB;

0 commit comments

Comments
 (0)