Skip to content

Commit 3734285

Browse files
Support WebSocket coroutine server (#1908)
* Support WebSocket coroutine server * Fixed bugs. * Fixed bug. Co-authored-by: 李铭昕 <[email protected]>
1 parent 2fd2e4e commit 3734285

File tree

2 files changed

+69
-15
lines changed

2 files changed

+69
-15
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* This file is part of Hyperf.
6+
*
7+
* @link https://www.hyperf.io
8+
* @document https://doc.hyperf.io
9+
* @contact [email protected]
10+
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
11+
*/
12+
namespace Hyperf\WebSocketServer\Exception;
13+
14+
use Hyperf\Server\Exception\ServerException;
15+
16+
class WebSocketMessageException extends ServerException
17+
{
18+
}

src/Server.php

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
use Hyperf\HttpServer\MiddlewareManager;
2828
use Hyperf\HttpServer\ResponseEmitter;
2929
use Hyperf\HttpServer\Router\Dispatched;
30+
use Hyperf\Server\ServerManager;
31+
use Hyperf\Server\SwooleEvent;
3032
use Hyperf\Utils\Context;
3133
use Hyperf\Utils\Coordinator\Constants;
3234
use Hyperf\Utils\Coordinator\CoordinatorManager;
@@ -40,6 +42,7 @@
4042
use Swoole\Http\Request as SwooleRequest;
4143
use Swoole\Http\Response as SwooleResponse;
4244
use Swoole\Server as SwooleServer;
45+
use Swoole\WebSocket\CloseFrame;
4346
use Swoole\Websocket\Frame;
4447
use Swoole\WebSocket\Server as WebSocketServer;
4548

@@ -116,7 +119,10 @@ public function initCoreMiddleware(string $serverName): void
116119
]);
117120
}
118121

119-
public function getServer(): WebSocketServer
122+
/**
123+
* @return \Swoole\Coroutine\Http\Server|WebSocketServer
124+
*/
125+
public function getServer()
120126
{
121127
return $this->container->get(SwooleServer::class);
122128
}
@@ -125,13 +131,14 @@ public function onHandShake(SwooleRequest $request, SwooleResponse $response): v
125131
{
126132
try {
127133
CoordinatorManager::until(Constants::WORKER_START)->yield();
128-
Context::set(WsContext::FD, $request->fd);
134+
$fd = $request->fd;
135+
Context::set(WsContext::FD, $fd);
129136
$security = $this->container->get(Security::class);
130137

131138
$psr7Request = $this->initRequest($request);
132139
$psr7Response = $this->initResponse($response);
133140

134-
$this->logger->debug(sprintf('WebSocket: fd[%d] start a handshake request.', $request->fd));
141+
$this->logger->debug(sprintf('WebSocket: fd[%d] start a handshake request.', $fd));
135142

136143
$key = $psr7Request->getHeaderLine(Security::SEC_WEBSOCKET_KEY);
137144
if ($security->isInvalidSecurityKey($key)) {
@@ -153,14 +160,38 @@ public function onHandShake(SwooleRequest $request, SwooleResponse $response): v
153160
$class = $psr7Response->getAttribute('class');
154161

155162
if (! empty($class)) {
156-
FdCollector::set($request->fd, $class);
157-
158-
defer(function () use ($request, $class) {
159-
$instance = $this->container->get($class);
160-
if ($instance instanceof OnOpenInterface) {
161-
$instance->onOpen($this->getServer(), $request);
163+
FdCollector::set($fd, $class);
164+
$server = $this->getServer();
165+
if ($server instanceof \Swoole\Coroutine\Http\Server) {
166+
$response->upgrade();
167+
[,,$callbacks] = ServerManager::get($this->serverName);
168+
169+
[$onMessageCallbackClass, $onMessageCallbackMethod] = $callbacks[SwooleEvent::ON_MESSAGE];
170+
$onMessageCallbackInstance = $this->container->get($onMessageCallbackClass);
171+
172+
[$onCloseCallbackClass, $onCloseCallbackMethod] = $callbacks[SwooleEvent::ON_CLOSE];
173+
$onCloseCallbackInstance = $this->container->get($onCloseCallbackClass);
174+
while (true) {
175+
$frame = $response->recv();
176+
if ($frame === false) {
177+
// When close the connection by server-side, the $frame is false.
178+
break;
179+
}
180+
if ($frame instanceof CloseFrame || $frame === '') {
181+
// The connection is closed.
182+
$onCloseCallbackInstance->{$onCloseCallbackMethod}($response, $fd, 0);
183+
break;
184+
}
185+
$onMessageCallbackInstance->{$onMessageCallbackMethod}($response, $frame);
162186
}
163-
});
187+
} else {
188+
defer(function () use ($request, $class, $server) {
189+
$instance = $this->container->get($class);
190+
if ($instance instanceof OnOpenInterface) {
191+
$instance->onOpen($server, $request);
192+
}
193+
});
194+
}
164195
}
165196
} catch (\Throwable $throwable) {
166197
// Delegate the exception to exception handler.
@@ -174,12 +205,17 @@ public function onHandShake(SwooleRequest $request, SwooleResponse $response): v
174205
}
175206
}
176207

177-
public function onMessage(WebSocketServer $server, Frame $frame): void
208+
public function onMessage($server, Frame $frame): void
178209
{
179-
Context::set(WsContext::FD, $frame->fd);
180-
$fdObj = FdCollector::get($frame->fd);
210+
if ($server instanceof SwooleResponse) {
211+
$fd = $server->fd;
212+
} else {
213+
$fd = $frame->fd;
214+
}
215+
Context::set(WsContext::FD, $fd);
216+
$fdObj = FdCollector::get($fd);
181217
if (! $fdObj) {
182-
$this->logger->warning(sprintf('WebSocket: fd[%d] does not exist.', $frame->fd));
218+
$this->logger->warning(sprintf('WebSocket: fd[%d] does not exist.', $fd));
183219
return;
184220
}
185221

@@ -193,7 +229,7 @@ public function onMessage(WebSocketServer $server, Frame $frame): void
193229
$instance->onMessage($server, $frame);
194230
}
195231

196-
public function onClose(SwooleServer $server, int $fd, int $reactorId): void
232+
public function onClose($server, int $fd, int $reactorId): void
197233
{
198234
$this->logger->debug(sprintf('WebSocket: fd[%d] closed.', $fd));
199235

0 commit comments

Comments
 (0)