Skip to content

Commit 36c6a74

Browse files
committed
Handle errors coming from Pusher
1 parent 8488242 commit 36c6a74

File tree

2 files changed

+79
-12
lines changed

2 files changed

+79
-12
lines changed

src/AsyncClient.php

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@
44

55
use React\Dns\Resolver\Resolver;
66
use React\EventLoop\LoopInterface;
7+
use RuntimeException;
78
use Rx\Disposable\CallbackDisposable;
89
use Rx\Observable;
910
use Rx\ObserverInterface;
1011
use Rx\Scheduler;
1112
use Rx\Websocket\Client as WebsocketClient;
1213
use Rx\Websocket\MessageSubject;
14+
use Rx\Websocket\WebsocketErrorException;
1315
use Throwable;
16+
use function React\Promise\reject;
17+
use function React\Promise\resolve;
1418

1519
final class AsyncClient
1620
{
@@ -88,29 +92,35 @@ public function __construct(Observable $client)
8892
->startWith($x);
8993
})
9094

91-
// Handle connection level errors
92-
->retryWhen(function (Observable $errors) {
93-
return $errors->flatMap(function (Throwable $throwable) {
94-
return $this->handleLowLevelError($throwable);
95-
});
96-
})
97-
9895
// Decode JSON
9996
->_ApiClients_jsonDecode()
10097

10198
// Deal with connection established messages
102-
->map(function (array $message) {
99+
->flatMap(function (array $message) {
103100
$this->delay = self::DEFAULT_DELAY;
104101

105102
$event = Event::createFromMessage($message);
106103

104+
if ($event->getEvent() === 'pusher:error') {
105+
return Observable::fromPromise(reject(new PusherErrorException($event->getData()['message'], $event->getData()['code'])));
106+
}
107+
107108
if ($event->getEvent() === 'pusher:connection_established') {
108109
$this->setActivityTimeout($event);
109110
}
110111

111-
return $event;
112+
return Observable::fromPromise(resolve($event));
113+
})
114+
115+
// Handle connection level errors
116+
->retryWhen(function (Observable $errors) {
117+
return $errors->flatMap(function (Throwable $throwable) {
118+
return $this->handleLowLevelError($throwable);
119+
});
112120
})
113-
->share();
121+
122+
// Share client
123+
->share();
114124
}
115125

116126
/**
@@ -206,8 +216,33 @@ public function send(array $message): bool
206216
return true;
207217
}
208218

219+
/**
220+
* Handle errors as described at https://pusher.com/docs/pusher_protocol#error-codes.
221+
*/
209222
private function handleLowLevelError(Throwable $throwable)
210223
{
224+
if (!($throwable instanceof WebsocketErrorException) && !($throwable instanceof RuntimeException) && !($throwable instanceof PusherErrorException)) {
225+
return Observable::fromPromise(reject($throwable));
226+
}
227+
228+
$code = $throwable->getCode();
229+
$pusherError = ($throwable instanceof WebsocketErrorException || $throwable instanceof PusherErrorException);
230+
231+
// Errors 4000-4099, don't retry connecting
232+
if ($pusherError && $code >= 4000 && $code <= 4099) {
233+
return Observable::fromPromise(reject($throwable));
234+
}
235+
236+
// Errors 4100-4199 reconnect after 1 or more seconds, we do it after 1.001 second
237+
if ($pusherError && $code >= 4100 && $code <= 4199) {
238+
return Observable::timer(1001);
239+
}
240+
241+
// Errors 4200-4299 connection closed by Pusher, reconnect immediately, we wait 0.001 second
242+
if ($pusherError && $code >= 4200 && $code <= 4299) {
243+
return Observable::timer(1);
244+
}
245+
211246
$this->delay *= 2;
212247

213248
return Observable::timer($this->delay);

tests/AsyncClientTest.php

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1-
<?php
2-
declare(strict_types=1);
1+
<?php declare(strict_types=1);
32

43
namespace ApiClients\Tests\Client\Pusher;
54

5+
use React\Dns\Resolver\Resolver;
6+
use function React\Promise\reject;
7+
use RuntimeException;
68
use ApiClients\Client\Pusher\AsyncClient;
79
use ApiClients\Tools\TestUtilities\TestCase;
810
use React\EventLoop\Factory;
11+
use Rx\Observable;
12+
use Rx\Scheduler\ImmediateScheduler;
13+
use Rx\Websocket\Client;
914

1015
final class AsyncClientTest extends TestCase
1116
{
@@ -15,4 +20,31 @@ public function testCreateFactory()
1520
$appId = uniqid('app-id-', true);
1621
self::assertInstanceOf(AsyncClient::class, AsyncClient::create($loop, $appId));
1722
}
23+
24+
public function testConnectionError()
25+
{
26+
$capturedException = null;
27+
$error = new RuntimeException();
28+
$observable = Observable::error($error, new ImmediateScheduler());
29+
$client = new AsyncClient($observable);
30+
$client->channel('test')->subscribe(
31+
function () {},
32+
function ($e) use (&$capturedException) {
33+
$capturedException = $e;
34+
}
35+
);
36+
self::assertNull($capturedException);
37+
}
38+
39+
public function testConnectionRetry()
40+
{
41+
$loop = Factory::create();
42+
$error = new RuntimeException('', 4199);
43+
$resolver = $this->prophesize(Resolver::class);
44+
$resolver->resolve('ws.pusherapp.com')->shouldBeCalled()->willReturn(reject($error));
45+
$client = AsyncClient::create($loop, 'abc', $resolver->reveal());
46+
$client->channel('test')->subscribe();
47+
$loop->addTimer(1, function () {});
48+
$loop->run();
49+
}
1850
}

0 commit comments

Comments
 (0)