Skip to content

Commit c7dafea

Browse files
committed
Added tests and fixed exceptions that allow the connection to auto reconnect
1 parent eea1d0a commit c7dafea

File tree

2 files changed

+93
-6
lines changed

2 files changed

+93
-6
lines changed

src/AsyncClient.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,6 @@ public function __construct(Subject $client)
7272
$this->messages = $events
7373
->merge($this->timeout($events))
7474
->merge($pusherErrors)
75-
->retryWhen(function (Observable $errors) {
76-
return $errors->flatMap(function (Throwable $throwable) {
77-
return $this->handleLowLevelError($throwable);
78-
});
79-
})
8075
->singleInstance();
8176
}
8277

@@ -131,6 +126,11 @@ public function channel(string $channel): Observable
131126
$this->channels[$channel] = $channelMessages
132127
->merge($subscribe)
133128
->filter([Event::class, 'subscriptionSucceeded'])
129+
->retryWhen(function (Observable $errors) {
130+
return $errors->flatMap(function (Throwable $throwable) {
131+
return $this->handleLowLevelError($throwable);
132+
});
133+
})
134134
->finally(function () use ($channel) {
135135
// Send unsubscribe event
136136
$this->send(Event::unsubscribeOn($channel));

tests/AsyncClientTest.php

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use ApiClients\Client\Pusher\AsyncClient;
66
use ApiClients\Client\Pusher\Event;
7+
use ApiClients\Client\Pusher\PusherErrorException;
78
use React\Dns\Resolver\Resolver;
89
use React\EventLoop\Factory;
910
use RuntimeException;
@@ -34,7 +35,7 @@ function ($e) use (&$capturedException) {
3435
$capturedException = $e;
3536
}
3637
);
37-
self::assertNotNull($capturedException);
38+
self::assertNull($capturedException);
3839
}
3940

4041
public function testConnectionRetry()
@@ -315,4 +316,90 @@ public function testPusherReconnectInnerError()
315316
[1040, '{"event":"pusher:unsubscribe","data":{"channel":"test"}}'],
316317
], $webSocket->getSentMessages());
317318
}
319+
320+
public function testPusherException4000NoAutoRetry()
321+
{
322+
$observable = $this->createColdObservable([
323+
onNext(320, '{"event":"pusher:connection_established","data":"{\"socket_id\":\"218656.9503498\",\"activity_timeout\":120}"}'),
324+
onNext(340, '{"event":"pusher_internal:subscription_succeeded","data":"{}","channel":"test"}'),
325+
onNext(350, '{"event":"new-listing","data":["test1"],"channel":"test"}'),
326+
onError(370, new PusherErrorException('', 4000)),
327+
]);
328+
329+
$webSocket = new TestWebSocketSubject($observable, $this->scheduler);
330+
331+
$results = $this->scheduler->startWithDispose(function () use ($webSocket) {
332+
return (new AsyncClient($webSocket))->channel('test');
333+
}, 5000);
334+
335+
$this->assertMessages([
336+
onNext(550, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test1"],"channel":"test"}', true))),
337+
onError(570, new PusherErrorException('', 4000)),
338+
], $results->getMessages());
339+
340+
$this->assertSubscriptions([subscribe(200, 570)], $observable->getSubscriptions());
341+
342+
$this->assertEquals([
343+
[520, '{"event":"pusher:subscribe","data":{"channel":"test"}}'],
344+
[570, '{"event":"pusher:unsubscribe","data":{"channel":"test"}}'],
345+
], $webSocket->getSentMessages());
346+
}
347+
348+
public function testPusherException4100AutoRetry()
349+
{
350+
$observable = $this->createColdObservable([
351+
onNext(320, '{"event":"pusher:connection_established","data":"{\"socket_id\":\"218656.9503498\",\"activity_timeout\":120}"}'),
352+
onNext(340, '{"event":"pusher_internal:subscription_succeeded","data":"{}","channel":"test"}'),
353+
onNext(350, '{"event":"new-listing","data":["test1"],"channel":"test"}'),
354+
onError(370, new PusherErrorException('', 4100)),
355+
]);
356+
357+
$webSocket = new TestWebSocketSubject($observable, $this->scheduler);
358+
359+
$results = $this->scheduler->startWithDispose(function () use ($webSocket) {
360+
return (new AsyncClient($webSocket))->channel('test');
361+
}, 3000);
362+
363+
$this->assertMessages([
364+
onNext(550, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test1"],"channel":"test"}', true))),
365+
onNext(1921, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test1"],"channel":"test"}', true))),
366+
], $results->getMessages());
367+
368+
$this->assertSubscriptions([subscribe(200, 570), subscribe(1571, 1941), subscribe(2942, 3000)], $observable->getSubscriptions());
369+
370+
$this->assertEquals([
371+
[520, '{"event":"pusher:subscribe","data":{"channel":"test"}}'],
372+
[1891, '{"event":"pusher:subscribe","data":{"channel":"test"}}'],
373+
[3000, '{"event":"pusher:unsubscribe","data":{"channel":"test"}}'],
374+
], $webSocket->getSentMessages());
375+
}
376+
377+
public function testPusherException4200AutoRetry()
378+
{
379+
$observable = $this->createColdObservable([
380+
onNext(320, '{"event":"pusher:connection_established","data":"{\"socket_id\":\"218656.9503498\",\"activity_timeout\":120}"}'),
381+
onNext(340, '{"event":"pusher_internal:subscription_succeeded","data":"{}","channel":"test"}'),
382+
onNext(350, '{"event":"new-listing","data":["test1"],"channel":"test"}'),
383+
onError(370, new PusherErrorException('', 4200)),
384+
]);
385+
386+
$webSocket = new TestWebSocketSubject($observable, $this->scheduler);
387+
388+
$results = $this->scheduler->startWithDispose(function () use ($webSocket) {
389+
return (new AsyncClient($webSocket))->channel('test');
390+
}, 1000);
391+
392+
$this->assertMessages([
393+
onNext(550, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test1"],"channel":"test"}', true))),
394+
onNext(921, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test1"],"channel":"test"}', true))),
395+
], $results->getMessages());
396+
397+
$this->assertSubscriptions([subscribe(200, 570), subscribe(571, 941), subscribe(942, 1000)], $observable->getSubscriptions());
398+
399+
$this->assertEquals([
400+
[520, '{"event":"pusher:subscribe","data":{"channel":"test"}}'],
401+
[891, '{"event":"pusher:subscribe","data":{"channel":"test"}}'],
402+
[1000, '{"event":"pusher:unsubscribe","data":{"channel":"test"}}'],
403+
], $webSocket->getSentMessages());
404+
}
318405
}

0 commit comments

Comments
 (0)