Skip to content

Commit 4379367

Browse files
authored
Sender support coroutine server. (#1919)
* Sender support coroutine server. * fixed travis. * Update OnPipeMessageListener.php * Update Server.php
1 parent 3734285 commit 4379367

File tree

3 files changed

+50
-7
lines changed

3 files changed

+50
-7
lines changed

src/Listener/OnPipeMessageListener.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ public function process(object $event)
6464
$message = $event->data;
6565

6666
try {
67-
$this->sender->proxy($message->name, $message->arguments);
67+
$fd = $this->sender->getFdFromProxyMethod($message->name, $message->arguments);
68+
$this->sender->proxy($fd, $message->arguments);
6869
} catch (\Throwable $exception) {
6970
$formatter = $this->container->get(FormatterInterface::class);
7071
$this->logger->warning($formatter->format($exception));

src/Sender.php

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@
1111
*/
1212
namespace Hyperf\WebSocketServer;
1313

14+
use Hyperf\Contract\ConfigInterface;
1415
use Hyperf\Contract\StdoutLoggerInterface;
16+
use Hyperf\Server\CoroutineServer;
1517
use Hyperf\WebSocketServer\Exception\InvalidMethodException;
1618
use Psr\Container\ContainerInterface;
19+
use Swoole\Http\Response;
1720
use Swoole\Server;
1821

1922
/**
@@ -36,23 +39,45 @@ class Sender
3639
*/
3740
protected $workerId;
3841

42+
/**
43+
* @var Response[]
44+
*/
45+
protected $responses = [];
46+
47+
/**
48+
* @var bool
49+
*/
50+
protected $isCoroutineServer = false;
51+
3952
public function __construct(ContainerInterface $container)
4053
{
4154
$this->container = $container;
4255
$this->logger = $container->get(StdoutLoggerInterface::class);
56+
if ($config = $container->get(ConfigInterface::class)) {
57+
$this->isCoroutineServer = $config->get('server.type') === CoroutineServer::class;
58+
}
4359
}
4460

4561
public function __call($name, $arguments)
4662
{
47-
if (! $this->proxy($name, $arguments)) {
63+
$fd = $this->getFdFromProxyMethod($name, $arguments);
64+
65+
if ($this->isCoroutineServer) {
66+
if (isset($this->responses[$fd])) {
67+
array_shift($arguments);
68+
$this->responses[$fd]->push(...$arguments);
69+
$this->logger->debug("[WebSocket] Worker send to #{$fd}");
70+
}
71+
return;
72+
}
73+
74+
if (! $this->proxy($fd, $arguments)) {
4875
$this->sendPipeMessage($name, $arguments);
4976
}
5077
}
5178

52-
public function proxy(string $name, array $arguments): bool
79+
public function proxy(int $fd, array $arguments): bool
5380
{
54-
$fd = $this->getFdFromProxyMethod($name, $arguments);
55-
5681
$result = $this->check($fd);
5782
if ($result) {
5883
/** @var \Swoole\WebSocket\Server $server */
@@ -80,7 +105,16 @@ public function check($fd): bool
80105
return false;
81106
}
82107

83-
protected function getFdFromProxyMethod(string $method, array $arguments): int
108+
public function setResponse(int $fd, ?Response $response): void
109+
{
110+
if ($response === null) {
111+
unset($this->responses[$fd]);
112+
} else {
113+
$this->responses[$fd] = $response;
114+
}
115+
}
116+
117+
public function getFdFromProxyMethod(string $method, array $arguments): int
84118
{
85119
if (! in_array($method, ['push', 'send', 'sendto'])) {
86120
throw new InvalidMethodException(sprintf('Method [%s] is not allowed.', $method));

src/Server.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,11 @@ public function getServer()
127127
return $this->container->get(SwooleServer::class);
128128
}
129129

130+
public function getSender(): Sender
131+
{
132+
return $this->container->get(Sender::class);
133+
}
134+
130135
public function onHandShake(SwooleRequest $request, SwooleResponse $response): void
131136
{
132137
try {
@@ -164,7 +169,9 @@ public function onHandShake(SwooleRequest $request, SwooleResponse $response): v
164169
$server = $this->getServer();
165170
if ($server instanceof \Swoole\Coroutine\Http\Server) {
166171
$response->upgrade();
167-
[,,$callbacks] = ServerManager::get($this->serverName);
172+
$this->getSender()->setResponse($fd, $response);
173+
174+
[, , $callbacks] = ServerManager::get($this->serverName);
168175

169176
[$onMessageCallbackClass, $onMessageCallbackMethod] = $callbacks[SwooleEvent::ON_MESSAGE];
170177
$onMessageCallbackInstance = $this->container->get($onMessageCallbackClass);
@@ -197,6 +204,7 @@ public function onHandShake(SwooleRequest $request, SwooleResponse $response): v
197204
// Delegate the exception to exception handler.
198205
$psr7Response = $this->exceptionHandlerDispatcher->dispatch($throwable, $this->exceptionHandlers);
199206
} finally {
207+
isset($fd) && $this->getSender()->setResponse($fd, null);
200208
// Send the Response to client.
201209
if (! $psr7Response || ! $psr7Response instanceof Psr7Response) {
202210
return;

0 commit comments

Comments
 (0)