Skip to content

Commit 7a27c72

Browse files
authored
Added method Sender::close(int $fd, bool $reset = null). (#1920)
1 parent 4379367 commit 7a27c72

File tree

3 files changed

+29
-16
lines changed

3 files changed

+29
-16
lines changed

src/Listener/OnPipeMessageListener.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ public function process(object $event)
6464
$message = $event->data;
6565

6666
try {
67-
$fd = $this->sender->getFdFromProxyMethod($message->name, $message->arguments);
68-
$this->sender->proxy($fd, $message->arguments);
67+
[$fd, $method] = $this->sender->getFdAndMethodFromProxyMethod($message->name, $message->arguments);
68+
$this->sender->proxy($fd, $method, $message->arguments);
6969
} catch (\Throwable $exception) {
7070
$formatter = $this->container->get(FormatterInterface::class);
7171
$this->logger->warning($formatter->format($exception));

src/Sender.php

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
/**
2323
* @method push(int $fd, $data, int $opcode = null, $finish = null)
24+
* @method close(int $fd, bool $reset = null)
2425
*/
2526
class Sender
2627
{
@@ -60,29 +61,29 @@ public function __construct(ContainerInterface $container)
6061

6162
public function __call($name, $arguments)
6263
{
63-
$fd = $this->getFdFromProxyMethod($name, $arguments);
64+
[$fd, $method] = $this->getFdAndMethodFromProxyMethod($name, $arguments);
6465

6566
if ($this->isCoroutineServer) {
6667
if (isset($this->responses[$fd])) {
6768
array_shift($arguments);
68-
$this->responses[$fd]->push(...$arguments);
69+
$this->responses[$fd]->{$method}(...$arguments);
6970
$this->logger->debug("[WebSocket] Worker send to #{$fd}");
7071
}
7172
return;
7273
}
7374

74-
if (! $this->proxy($fd, $arguments)) {
75+
if (! $this->proxy($fd, $method, $arguments)) {
7576
$this->sendPipeMessage($name, $arguments);
7677
}
7778
}
7879

79-
public function proxy(int $fd, array $arguments): bool
80+
public function proxy(int $fd, string $method, array $arguments): bool
8081
{
8182
$result = $this->check($fd);
8283
if ($result) {
8384
/** @var \Swoole\WebSocket\Server $server */
8485
$server = $this->getServer();
85-
$server->push(...$arguments);
86+
$server->{$method}(...$arguments);
8687
$this->logger->debug("[WebSocket] Worker.{$this->workerId} send to #{$fd}");
8788
}
8889

@@ -114,13 +115,16 @@ public function setResponse(int $fd, ?Response $response): void
114115
}
115116
}
116117

117-
public function getFdFromProxyMethod(string $method, array $arguments): int
118+
public function getFdAndMethodFromProxyMethod(string $method, array $arguments): array
118119
{
119-
if (! in_array($method, ['push', 'send', 'sendto'])) {
120+
if (! in_array($method, ['push', 'send', 'sendto', 'close'])) {
120121
throw new InvalidMethodException(sprintf('Method [%s] is not allowed.', $method));
121122
}
122123

123-
return (int) $arguments[0];
124+
if ($method !== 'close') {
125+
$method = 'push';
126+
}
127+
return [(int) $arguments[0], $method];
124128
}
125129

126130
protected function getServer(): Server

src/Server.php

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ public function onHandShake(SwooleRequest $request, SwooleResponse $response): v
170170
if ($server instanceof \Swoole\Coroutine\Http\Server) {
171171
$response->upgrade();
172172
$this->getSender()->setResponse($fd, $response);
173+
$this->deferOnOpen($request, $class, $response);
173174

174175
[, , $callbacks] = ServerManager::get($this->serverName);
175176

@@ -192,12 +193,7 @@ public function onHandShake(SwooleRequest $request, SwooleResponse $response): v
192193
$onMessageCallbackInstance->{$onMessageCallbackMethod}($response, $frame);
193194
}
194195
} else {
195-
defer(function () use ($request, $class, $server) {
196-
$instance = $this->container->get($class);
197-
if ($instance instanceof OnOpenInterface) {
198-
$instance->onOpen($server, $request);
199-
}
200-
});
196+
$this->deferOnOpen($request, $class, $server);
201197
}
202198
}
203199
} catch (\Throwable $throwable) {
@@ -257,6 +253,19 @@ public function onClose($server, int $fd, int $reactorId): void
257253
}
258254
}
259255

256+
/**
257+
* @param SwooleResponse|WebSocketServer $server
258+
*/
259+
protected function deferOnOpen(SwooleRequest $request, string $class, $server)
260+
{
261+
defer(function () use ($request, $class, $server) {
262+
$instance = $this->container->get($class);
263+
if ($instance instanceof OnOpenInterface) {
264+
$instance->onOpen($server, $request);
265+
}
266+
});
267+
}
268+
260269
/**
261270
* Initialize PSR-7 Request.
262271
*/

0 commit comments

Comments
 (0)