@@ -83,24 +83,15 @@ namespace Modbus {
8383
8484 // ---------------------------------------------------------------------------
8585 int Server::poll (long timeout) {
86+ if (isRunning () || not isOpen ()){
87+ // cann not proceed if the `loop()` is running
88+ // OR connection is not established
89+ return -1 ;
90+ }
8691
87- if (!isRunning () && isOpen ()) {
88- PIMP_D (Server);
89-
90- if (!d->receiveTask .valid ()) {
91- // starts the receiving thread
92- d->receiveTask = std::async (std::launch::async, Server::Private::receive, d);
93- }
94-
95- if (d->receiveTask .wait_for (std::chrono::milliseconds (timeout)) == std::future_status::ready) {
92+ PIMP_D (Server);
93+ return d->poll (timeout);
9694
97- // message received ?
98- int rc = d->receiveTask .get ();
99- return d->task (rc);
100- }
101- return 0 ;
102- }
103- return -1 ;
10495 }
10596
10697 // ---------------------------------------------------------------------------
@@ -184,9 +175,9 @@ namespace Modbus {
184175 void Server::terminate () {
185176 PIMP_D (Server);
186177
187- if (d->sock != -1 ) {
188-
189- ::shutdown (d->sock, SHUT_RDWR );
178+ if (d->listen_sock != -1 ) {
179+ ::shutdown (d->listen_sock, SHUT_RDWR);
180+ ::close (d->listen_sock );
190181 }
191182
192183 if (isRunning ()) {
@@ -196,6 +187,7 @@ namespace Modbus {
196187 // Wait for thread to join
197188 d->daemon .join ();
198189 }
190+
199191 }
200192
201193 // ---------------------------------------------------------------------------
@@ -234,10 +226,10 @@ namespace Modbus {
234226
235227 // ---------------------------------------------------------------------------
236228 Server::Private::Private (Server * q) :
237- Device::Private (q), sock (-1 ), req (0 ) {}
229+ Device::Private (q), listen_sock (-1 ), req (0 ) {
230+ all_pollfds.reserve (MAX_CONNECTIONS +1 );
231+ }
238232
239- // ---------------------------------------------------------------------------
240- Server::Private::~Private () = default ;
241233
242234 // ---------------------------------------------------------------------------
243235 // virtual
@@ -292,14 +284,23 @@ namespace Modbus {
292284 switch (backend->net ()) {
293285
294286 case Tcp:
295- sock = modbus_tcp_pi_listen (ctx (), 1 );
296- isOk = (sock != -1 );
287+ listen_sock = modbus_tcp_pi_listen (ctx (), MAX_CONNECTIONS);
288+ if ( listen_sock != -1 ) {
289+ isOk = true ;
290+ all_pollfds.push_back (pollfd{.fd =listen_sock, .events =POLL_IN, .revents =0 });
291+ }
297292 break ;
298293
299294 case Rtu:
300- case Ascii:
301- isOk = Device::Private::open ();
302-
295+ case Ascii: {
296+ if ( Device::Private::open () ) {
297+ isOk = true ;
298+ int listen_sock = modbus_get_socket (ctx ());
299+ all_pollfds.push_back (pollfd{.fd =listen_sock, .events =POLL_IN, .revents =0 });
300+ } else {
301+ std::cout << " fd after open: " << listen_sock << " \n " ;
302+ }
303+ }
303304 default :
304305 break ;
305306 }
@@ -315,18 +316,16 @@ namespace Modbus {
315316
316317 // ---------------------------------------------------------------------------
317318 void Server::Private::close () {
318- std::lock_guard<std::mutex> lg (d_guard);
319-
319+ std::lock_guard<std::mutex> lg (d_guard);
320320 if (backend->net () == Tcp) {
321-
322- if (sock != -1 ) {
323-
321+ if (listen_sock != -1 ) {
324322#ifdef _WIN32
325- ::closesocket (sock );
323+ ::closesocket (listen_sock );
326324#else
327- ::close (sock );
325+ ::close (listen_sock );
328326#endif
329- sock = -1 ;
327+ all_pollfds.clear ();
328+ listen_sock = -1 ;
330329 }
331330 }
332331 Device::Private::close ();
@@ -393,35 +392,99 @@ namespace Modbus {
393392 rc = 0 ;
394393 }
395394 else {
396-
397395 if (messageCB) {
398-
399396 rc = messageCB (req.get (), q);
400397 }
401398 }
402399 }
403400 return rc;
404401 }
405402
406- // ---------------------------------------------------------------------------
407- // static
408- int Server::Private::receive (Private * d) {
409- std::lock_guard<std::mutex> lg (d->d_guard );
410- int rc;
411- if ( (d->backend ->net () == Tcp) && !d->isConnected ()) {
403+ int Server::Private::poll (int timeout)
404+ {
405+ int eventCount = ::poll (all_pollfds.data (), all_pollfds.size (), timeout);
412406
413- // accept blocking call !
414- if (modbus_tcp_pi_accept (d->ctx (), &d->sock ) < 0 ) {
407+ if ( eventCount == 0 ) {
408+ // there is nothing to process
409+ return 0 ;
410+ }
415411
412+ if ( eventCount < 0 ) {
413+ // handle error
416414 return -1 ;
415+ }
416+
417+ // handle events
418+ std::vector<pollfd> new_pfds{};
419+
420+ for ( pollfd& pfd : all_pollfds ) {
421+
422+ if ( pfd.revents & POLLIN ) {
423+
424+ if ( backend->net () == Net::Tcp && pfd.fd == listen_sock ) {
425+ // if there is an event on the 'listening' socket
426+ // handle incomming connection request aka `connect()`
427+ // but only Server::Private::MAX_CONNECTIONS
428+ if ( all_pollfds.size () < MAX_CONNECTIONS ) {
429+ int new_socket = modbus_tcp_accept (ctx (), &listen_sock);
430+ if ( new_socket != -1 ) {
431+ new_pfds.push_back (pollfd{
432+ .fd =new_socket,
433+ .events =POLL_IN,
434+ .revents =0 });
435+ }
436+ }
437+ } else {
438+ // handle incomming request
439+ modbus_set_socket (ctx (), pfd.fd );
440+ int rc = Server::Private::receive (this );
441+ if ( rc == -1 ) {
442+ // if receive fails after a successfull poll
443+ // probably the connection is broken
444+ ::close (pfd.fd);
445+ pfd.fd = -1 ;
446+ continue ;
447+ }
448+
449+ task (rc);
450+ }
451+ } else if ( pfd.revents & POLLHUP ) {
452+ ::close (pfd.fd);
453+ pfd.fd = -1 ;
417454 }
418455 }
456+
457+ // remove bad file descriptors from watch list
458+ auto badFds = std::remove_if (all_pollfds.begin (), all_pollfds.end (), [](const pollfd& pfd) {
459+ return (pfd.revents & (POLLERR|POLLHUP|POLLNVAL)) || (pfd.fd == -1 );
460+ });
461+ all_pollfds.erase (badFds, all_pollfds.end ());
462+
463+ if ( all_pollfds.empty () ) {
464+ return -1 ;
465+ }
466+
467+ // add new accepted file descriptors to watch list
468+ if ( not new_pfds.empty () ) {
469+ all_pollfds.insert (all_pollfds.end (), new_pfds.begin (), new_pfds.end ());
470+ }
471+
472+ return 0 ;
473+ }
474+
475+ // ---------------------------------------------------------------------------
476+ // static
477+ int Server::Private::receive (Private * d) {
478+ std::lock_guard<std::mutex> lg (d->d_guard );
479+
480+ int rc = 0 ;
481+
419482 d->req ->clear ();
420483 rc = modbus_receive (d->ctx (), d->req ->adu ());
421- if (rc > 0 ) {
422-
423- d->req ->setAduSize (rc);
484+ if ( rc > 0 ) {
485+ d->req ->setAduSize (rc);
424486 }
487+
425488 return rc;
426489 }
427490
0 commit comments