Skip to content

Commit 6ac0675

Browse files
authored
feat(tcp): add backlog parameter and batch-drain accept loop (#617)
1 parent 56a5443 commit 6ac0675

File tree

6 files changed

+102
-25
lines changed

6 files changed

+102
-25
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
### features
66

7-
- feat(dict,vec): add filter_nonnull_by and map_nonnull - [#576](https://github.com/azjezz/psl/pull/576) by @Dima-369
7+
- feat(dict, vec): add filter_nonnull_by and map_nonnull - [#576](https://github.com/azjezz/psl/pull/576) by @Dima-369
8+
* feat(tcp): add `backlog` parameter to `TCP\listen()` for configuring the pending connection queue size - [#617](https://github.com/azjezz/psl/pull/617) - by @azjezz
9+
* feat(tcp): listener now drains the accept backlog in a loop for higher throughput - [#617](https://github.com/azjezz/psl/pull/617) - by @azjezz
810

911
### other
1012

docs/content/networking/tcp.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ All TCP connectors implement `ConnectorInterface`, making them interchangeable a
2222

2323
@example('networking/tcp-socket-pool.php')
2424

25+
### Backlog
26+
27+
`TCP\listen()` accepts a `backlog` parameter (default 512) to configure the OS-level queue of pending connections. A larger backlog helps high-connection-rate servers avoid dropped connections.
28+
29+
@example('networking/tcp-backlog.php')
30+
2531
### Low-Level Socket
2632

2733
`Socket` gives you fine-grained control over socket options before connecting or listening. Create a socket, configure it, then consume it:
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
require_once __DIR__ . '/../../../vendor/autoload.php';
6+
7+
use Psl\TCP;
8+
9+
// Default backlog of 512
10+
$listener = TCP\listen('127.0.0.1', 8080);
11+
12+
// High-throughput server with larger backlog
13+
$listener = TCP\listen('127.0.0.1', 8080, backlog: 4096);

src/Psl/TCP/Internal/Listener.php

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
use Psl\TCP;
1111
use Revolt\EventLoop;
1212

13+
use function error_clear_last;
1314
use function error_get_last;
1415
use function fclose;
1516
use function is_resource;
17+
use function str_contains;
1618
use function stream_socket_accept;
1719

1820
/**
@@ -51,38 +53,42 @@ public function __construct(mixed $impl, int $idleConnections = self::DEFAULT_ID
5153
[$receiver, $sender] = Channel\bounded($idleConnections);
5254

5355
$this->receiver = $receiver;
54-
$this->watcher = EventLoop::onReadable(
55-
$impl,
56-
/**
57-
* @param resource $resource
58-
*/
59-
static function (string $watcher, mixed $resource) use ($sender): void {
60-
try {
56+
$this->watcher = EventLoop::onReadable($impl, static function (string $watcher, mixed $resource) use (
57+
$sender,
58+
): void {
59+
try {
60+
while (true) {
61+
error_clear_last();
6162
$sock = @stream_socket_accept($resource, timeout: 0.0);
62-
if (false !== $sock) {
63+
if ($sock !== false) {
6364
$sender->send([true, new Stream($sock)]);
64-
65-
return;
65+
continue;
6666
}
6767

6868
// @codeCoverageIgnoreStart
69-
/** @var array{file: string, line: int, message: string, type: int} $err */
7069
$err = error_get_last();
71-
$sender->send([
72-
false,
73-
new Network\Exception\RuntimeException(
74-
'Failed to accept incoming connection: ' . $err['message'],
75-
$err['type'],
76-
),
77-
]);
78-
// @codeCoverageIgnoreEnd
79-
} catch (Channel\Exception\ClosedChannelException) {
80-
EventLoop::cancel($watcher);
70+
if ($err !== null && !str_contains($err['message'], 'Accept failed')) {
71+
// OS error (e.g., EMFILE, ENFILE, ENOBUFS)
72+
$sender->send([
73+
false,
74+
new Network\Exception\RuntimeException(
75+
'Failed to accept incoming connection: ' . $err['message'],
76+
$err['type'],
77+
),
78+
]);
8179

82-
return;
80+
return;
81+
}
82+
83+
// No more pending connections (EAGAIN / timeout with no backlog).
84+
break;
85+
// @codeCoverageIgnoreEnd
8386
}
84-
},
85-
);
87+
} catch (Channel\Exception\ClosedChannelException) {
88+
EventLoop::cancel($watcher);
89+
return;
90+
}
91+
});
8692
}
8793

8894
#[Override]

src/Psl/TCP/listen.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* @param non-empty-string $host
1414
* @param int<0, max> $port
1515
* @param int<1, max> $idle_connections Maximum number of idle connections to buffer.
16+
* @param int<1, max> $backlog Maximum length of the queue of pending connections.
1617
*
1718
* @throws Network\Exception\RuntimeException If failed to listen on given address.
1819
*/
@@ -23,13 +24,15 @@ function listen(
2324
bool $reuse_address = false,
2425
bool $reuse_port = false,
2526
int $idle_connections = 256,
27+
int $backlog = 512,
2628
): ListenerInterface {
2729
$socket_context = ['socket' => [
2830
'ipv6_v6only' => true,
2931
'so_reuseaddr' => OS\is_windows() ? $reuse_port : $reuse_address,
3032
'so_reuseport' => $reuse_port,
3133
'so_broadcast' => false,
3234
'tcp_nodelay' => $no_delay,
35+
'backlog' => $backlog,
3336
]];
3437

3538
$socket = Network\Internal\server_listen("tcp://{$host}:{$port}", $socket_context);

tests/unit/TCP/ServerTest.php

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,53 @@ public function testGetLocalAddressOnStoppedListener(): void
3535
$listener->getLocalAddress();
3636
}
3737

38+
public function testListenWithCustomBacklog(): void
39+
{
40+
$listener = TCP\listen('127.0.0.1', 0, backlog: 128);
41+
$address = $listener->getLocalAddress();
42+
43+
static::assertSame('127.0.0.1', $address->host);
44+
static::assertGreaterThan(0, $address->port);
45+
46+
$client = TCP\connect('127.0.0.1', $address->port);
47+
$server = $listener->accept();
48+
49+
$client->write('ping');
50+
static::assertSame('ping', $server->read(4));
51+
52+
$client->close();
53+
$server->close();
54+
$listener->close();
55+
}
56+
57+
public function testAcceptMultipleConnections(): void
58+
{
59+
$listener = TCP\listen('127.0.0.1', 0, no_delay: true, backlog: 64);
60+
$address = $listener->getLocalAddress();
61+
62+
[$server1, $client1, $client2] = Async\concurrently([
63+
$listener->accept(...),
64+
static fn(): Network\StreamInterface => TCP\connect('127.0.0.1', $address->port),
65+
static fn(): Network\StreamInterface => TCP\connect('127.0.0.1', $address->port),
66+
]);
67+
68+
$server2 = $listener->accept();
69+
70+
$client1->write('one');
71+
$client2->write('two');
72+
73+
$msg1 = $server1->read(3);
74+
$msg2 = $server2->read(3);
75+
76+
static::assertTrue($msg1 === 'one' && $msg2 === 'two' || $msg1 === 'two' && $msg2 === 'one');
77+
78+
$client1->close();
79+
$client2->close();
80+
$server1->close();
81+
$server2->close();
82+
$listener->close();
83+
}
84+
3885
public function testWaitsForPendingOperation(): void
3986
{
4087
$listener = TCP\listen('127.0.0.1');

0 commit comments

Comments
 (0)