Skip to content

Commit 50a57c9

Browse files
committed
Support swow for websocket server.
1 parent e19c9e5 commit 50a57c9

File tree

1 file changed

+71
-44
lines changed

1 file changed

+71
-44
lines changed

src/Server.php

Lines changed: 71 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
use Hyperf\Contract\OnOpenInterface;
2020
use Hyperf\Contract\StdoutLoggerInterface;
2121
use Hyperf\Dispatcher\HttpDispatcher;
22+
use Hyperf\Engine\Constant;
23+
use Hyperf\Engine\Http\FdGetter;
24+
use Hyperf\Engine\WebSocket\WebSocket;
2225
use Hyperf\ExceptionHandler\ExceptionHandlerDispatcher;
2326
use Hyperf\HttpMessage\Base\Response;
2427
use Hyperf\HttpMessage\Server\Request as Psr7Request;
@@ -40,10 +43,8 @@
4043
use Psr\Container\ContainerInterface;
4144
use Psr\Http\Message\ResponseInterface;
4245
use Psr\Http\Message\ServerRequestInterface;
43-
use Swoole\Http\Request as SwooleRequest;
4446
use Swoole\Http\Response as SwooleResponse;
4547
use Swoole\Server as SwooleServer;
46-
use Swoole\WebSocket\CloseFrame;
4748
use Swoole\WebSocket\Server as WebSocketServer;
4849
use Throwable;
4950

@@ -151,11 +152,15 @@ public function getSender(): Sender
151152
return $this->container->get(Sender::class);
152153
}
153154

155+
/**
156+
* @param \Swoole\Http\Request|\Swow\Http\Server\Request $request
157+
* @param SwooleResponse $response
158+
*/
154159
public function onHandShake($request, $response): void
155160
{
156161
try {
157162
CoordinatorManager::until(Constants::WORKER_START)->yield();
158-
$fd = $request->fd;
163+
$fd = $this->getFd($response);
159164
Context::set(WsContext::FD, $fd);
160165
$security = $this->container->get(Security::class);
161166

@@ -190,40 +195,24 @@ public function onHandShake($request, $response): void
190195

191196
FdCollector::set($fd, $class);
192197
$server = $this->getServer();
193-
if ($server instanceof \Swoole\Coroutine\Http\Server) {
194-
$response->upgrade();
195-
$this->getSender()->setResponse($fd, $response);
196-
$this->deferOnOpen($request, $class, $response);
197-
198-
[, , $callbacks] = ServerManager::get($this->serverName);
199-
200-
[$onMessageCallbackClass, $onMessageCallbackMethod] = $callbacks[Event::ON_MESSAGE];
201-
$onMessageCallbackInstance = $this->container->get($onMessageCallbackClass);
198+
if (Constant::isCoroutineServer($server)) {
199+
$upgrade = new WebSocket($response, $request);
202200

203-
[$onCloseCallbackClass, $onCloseCallbackMethod] = $callbacks[Event::ON_CLOSE];
204-
$onCloseCallbackInstance = $this->container->get($onCloseCallbackClass);
205-
206-
while (true) {
207-
$frame = $response->recv();
208-
if ($frame === false || $frame instanceof CloseFrame || $frame === '') {
209-
wait(static function () use ($onCloseCallbackInstance, $onCloseCallbackMethod, $response, $fd) {
210-
$onCloseCallbackInstance->{$onCloseCallbackMethod}($response, $fd, 0);
211-
});
212-
break;
213-
}
201+
// TODO: Support SWOW
202+
$response instanceof SwooleResponse && $this->getSender()->setResponse($fd, $response);
203+
$this->deferOnOpen($request, $class, $response);
214204

215-
wait(static function () use ($onMessageCallbackInstance, $onMessageCallbackMethod, $response, $frame) {
216-
$onMessageCallbackInstance->{$onMessageCallbackMethod}($response, $frame);
217-
});
218-
}
205+
$upgrade->on(WebSocket::ON_MESSAGE, $this->getOnMessageCallback());
206+
$upgrade->on(WebSocket::ON_CLOSE, $this->getOnCloseCallback());
207+
$upgrade->start();
219208
} else {
220209
$this->deferOnOpen($request, $class, $server);
221210
}
222211
} catch (Throwable $throwable) {
223212
// Delegate the exception to exception handler.
224213
$psr7Response = $this->exceptionHandlerDispatcher->dispatch($throwable, $this->exceptionHandlers);
225-
FdCollector::del($request->fd);
226-
WsContext::release($request->fd);
214+
isset($fd) && FdCollector::del($fd);
215+
isset($fd) && WsContext::release($fd);
227216
} finally {
228217
isset($fd) && $this->getSender()->setResponse($fd, null);
229218
// Send the Response to client.
@@ -236,10 +225,10 @@ public function onHandShake($request, $response): void
236225

237226
public function onMessage($server, $frame): void
238227
{
239-
if ($server instanceof SwooleResponse) {
240-
$fd = $server->fd;
241-
} else {
228+
if ($server instanceof WebSocketServer) {
242229
$fd = $frame->fd;
230+
} else {
231+
$fd = $this->getFd($server);
243232
}
244233
Context::set(WsContext::FD, $fd);
245234
$fdObj = FdCollector::get($fd);
@@ -288,31 +277,37 @@ public function onClose($server, int $fd, int $reactorId): void
288277
}
289278
}
290279

280+
protected function getFd($response): int
281+
{
282+
return $this->container->get(FdGetter::class)->get($response);
283+
}
284+
291285
/**
292286
* @param SwooleResponse|WebSocketServer $server
287+
* @param mixed $request
293288
*/
294-
protected function deferOnOpen(SwooleRequest $request, string $class, $server)
289+
protected function deferOnOpen($request, string $class, $server)
295290
{
296-
$onOpen = function () use ($request, $class, $server) {
297-
$instance = $this->container->get($class);
291+
$instance = $this->container->get($class);
292+
wait(static function () use ($request, $instance, $server) {
298293
if ($instance instanceof OnOpenInterface) {
299294
$instance->onOpen($server, $request);
300295
}
301-
};
302-
303-
if ($server instanceof SwooleResponse) {
304-
wait($onOpen);
305-
} else {
306-
defer($onOpen);
307-
}
296+
});
308297
}
309298

310299
/**
311300
* Initialize PSR-7 Request.
301+
* @param mixed $request
312302
*/
313-
protected function initRequest(SwooleRequest $request): ServerRequestInterface
303+
protected function initRequest($request): ServerRequestInterface
314304
{
315-
Context::set(ServerRequestInterface::class, $psr7Request = Psr7Request::loadFromSwooleRequest($request));
305+
if ($request instanceof ServerRequestInterface) {
306+
$psr7Request = $request;
307+
} else {
308+
$psr7Request = Psr7Request::loadFromSwooleRequest($request);
309+
}
310+
Context::set(ServerRequestInterface::class, $psr7Request);
316311
WsContext::set(ServerRequestInterface::class, $psr7Request);
317312
return $psr7Request;
318313
}
@@ -325,4 +320,36 @@ protected function initResponse(): ResponseInterface
325320
Context::set(ResponseInterface::class, $psr7Response = new Psr7Response());
326321
return $psr7Response;
327322
}
323+
324+
protected function getOnMessageCallback(): callable
325+
{
326+
[$instance, $method] = $this->getCallback(Event::ON_MESSAGE);
327+
328+
return static function ($response, $frame) use ($instance, $method) {
329+
wait(static function () use ($instance, $method, $response, $frame) {
330+
$instance->{$method}($response, $frame);
331+
});
332+
};
333+
}
334+
335+
protected function getOnCloseCallback(): callable
336+
{
337+
[$instance, $method] = $this->getCallback(Event::ON_CLOSE);
338+
339+
return static function ($response, $fd) use ($instance, $method) {
340+
wait(static function () use ($instance, $method, $response, $fd) {
341+
$instance->{$method}($response, $fd, 0);
342+
});
343+
};
344+
}
345+
346+
protected function getCallback(string $event): array
347+
{
348+
[, , $callbacks] = ServerManager::get($this->serverName);
349+
350+
[$callback, $method] = $callbacks[$event];
351+
$instance = $this->container->get($callback);
352+
353+
return [$instance, $method];
354+
}
328355
}

0 commit comments

Comments
 (0)