Skip to content

Commit fe5a8f2

Browse files
committed
Apply cs-fix
1 parent 09e7840 commit fe5a8f2

File tree

6 files changed

+70
-70
lines changed

6 files changed

+70
-70
lines changed

src/AsyncClient.php

Lines changed: 44 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ final class AsyncClient
4444

4545
/**
4646
* @internal
47-
* @param Subject $client
47+
* @param Subject $client
4848
* @throws \InvalidArgumentException
4949
*/
5050
public function __construct(Subject $client)
@@ -81,11 +81,11 @@ public function __construct(Subject $client)
8181
}
8282

8383
/**
84-
* @param LoopInterface $loop
85-
* @param string $app Application ID
86-
* @param Resolver $resolver Optional DNS resolver
87-
* @return AsyncClient
84+
* @param LoopInterface $loop
85+
* @param string $app Application ID
86+
* @param Resolver $resolver Optional DNS resolver
8887
* @throws \InvalidArgumentException
88+
* @return AsyncClient
8989
*/
9090
public static function create(LoopInterface $loop, string $app, Resolver $resolver = null): AsyncClient
9191
{
@@ -104,9 +104,9 @@ public static function create(LoopInterface $loop, string $app, Resolver $resolv
104104
/**
105105
* Listen on a channel.
106106
*
107-
* @param string $channel Channel to listen on
108-
* @return Observable
107+
* @param string $channel Channel to listen on
109108
* @throws \InvalidArgumentException
109+
* @return Observable
110110
*/
111111
public function channel(string $channel): Observable
112112
{
@@ -155,9 +155,45 @@ public function send(array $message)
155155
$this->client->onNext(json_encode($message));
156156
}
157157

158+
/**
159+
* Returns an observable of TimeoutException.
160+
* The timeout observable will get cancelled every time a new event is received.
161+
*
162+
* @param Observable $events
163+
* @return Observable
164+
*/
165+
public function timeout(Observable $events): Observable
166+
{
167+
$timeoutDuration = $this->connected->map(function (Event $event) {
168+
return ($event->getData()['activity_timeout'] ?? self::NO_ACTIVITY_TIMEOUT) * 1000;
169+
});
170+
171+
return $timeoutDuration
172+
->combineLatest([$events])
173+
->pluck(0)
174+
->concat(Observable::of(-1))
175+
->flatMapLatest(function (int $time) {
176+
177+
// If the events observable ends, return an empty observable so we don't keep the stream alive
178+
if ($time === -1) {
179+
return Observable::empty();
180+
}
181+
182+
return Observable::never()
183+
->timeout($time)
184+
->catch(function () use ($time) {
185+
// ping (do something that causes incoming stream to get a message)
186+
$this->send(Event::ping());
187+
// this timeout will actually timeout with a TimeoutException - causing
188+
// everything above this to dispose
189+
return Observable::never()->timeout($time);
190+
});
191+
});
192+
}
193+
158194
/**
159195
* Handle errors as described at https://pusher.com/docs/pusher_protocol#error-codes.
160-
* @param Throwable $throwable
196+
* @param Throwable $throwable
161197
* @return Observable
162198
*/
163199
private function handleLowLevelError(Throwable $throwable): Observable
@@ -193,40 +229,4 @@ private function handleLowLevelError(Throwable $throwable): Observable
193229

194230
return Observable::timer($this->delay);
195231
}
196-
197-
/**
198-
* Returns an observable of TimeoutException.
199-
* The timeout observable will get cancelled every time a new event is received.
200-
*
201-
* @param Observable $events
202-
* @return Observable
203-
*/
204-
public function timeout(Observable $events): Observable
205-
{
206-
$timeoutDuration = $this->connected->map(function (Event $event) {
207-
return ($event->getData()['activity_timeout'] ?? self::NO_ACTIVITY_TIMEOUT) * 1000;
208-
});
209-
210-
return $timeoutDuration
211-
->combineLatest([$events])
212-
->pluck(0)
213-
->concat(Observable::of(-1))
214-
->flatMapLatest(function (int $time) {
215-
216-
// If the events observable ends, return an empty observable so we don't keep the stream alive
217-
if ($time === -1) {
218-
return Observable::empty();
219-
}
220-
221-
return Observable::never()
222-
->timeout($time)
223-
->catch(function () use ($time) {
224-
// ping (do something that causes incoming stream to get a message)
225-
$this->send(Event::ping());
226-
// this timeout will actually timeout with a TimeoutException - causing
227-
// everything above this to dispose
228-
return Observable::never()->timeout($time);
229-
});
230-
});
231-
}
232232
}

src/Event.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ final class Event implements \JsonSerializable
2121

2222
/**
2323
* @param string $event
24-
* @param array $data
24+
* @param array $data
2525
* @param string $channel
2626
*/
2727
public function __construct(string $event, array $data, string $channel = '')

src/WebSocket.php

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22

33
namespace ApiClients\Client\Pusher;
44

5-
use React\EventLoop\LoopInterface;
65
use React\Dns\Resolver\Resolver;
7-
use Rx\Subject\ReplaySubject;
6+
use React\EventLoop\LoopInterface;
87
use Rx\DisposableInterface;
98
use Rx\ObserverInterface;
10-
use Rx\Websocket\Client;
9+
use Rx\Subject\ReplaySubject;
1110
use Rx\Subject\Subject;
11+
use Rx\Websocket\Client;
1212

1313
/**
1414
* Class WebSocket - WebSocket wrapper that queues messages while the connection is being established.
@@ -24,6 +24,11 @@ public function __construct(string $url, bool $useMessageObject = false, array $
2424
$this->ws = new Client($url, $useMessageObject, $subProtocols, $loop, $dnsResolver);
2525
}
2626

27+
public function onNext($value)
28+
{
29+
$this->sendSubject->onNext($value);
30+
}
31+
2732
protected function _subscribe(ObserverInterface $observer): DisposableInterface
2833
{
2934
return $this->ws
@@ -41,9 +46,4 @@ protected function _subscribe(ObserverInterface $observer): DisposableInterface
4146
->mergeAll()
4247
->subscribe($observer);
4348
}
44-
45-
public function onNext($value)
46-
{
47-
$this->sendSubject->onNext($value);
48-
}
4949
}

tests/AsyncClientTest.php

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
use ApiClients\Client\Pusher\AsyncClient;
66
use ApiClients\Client\Pusher\Event;
7+
use React\Dns\Resolver\Resolver;
78
use React\EventLoop\Factory;
9+
use RuntimeException;
810
use Rx\Exception\TimeoutException;
911
use Rx\Observable;
10-
use React\Dns\Resolver\Resolver;
11-
use RuntimeException;
1212
use function React\Promise\reject;
1313

1414
final class AsyncClientTest extends TestCase
@@ -66,7 +66,7 @@ public function testWebSocketEmpty()
6666
{
6767
$observable = $this->createHotObservable([
6868
onNext(150, 1),
69-
onCompleted(235)
69+
onCompleted(235),
7070
]);
7171

7272
$webSocket = new TestWebSocketSubject($observable, $this->scheduler);
@@ -76,7 +76,7 @@ public function testWebSocketEmpty()
7676
});
7777

7878
$this->assertMessages([
79-
onCompleted(237)
79+
onCompleted(237),
8080
], $results->getMessages());
8181

8282
$this->assertSubscriptions([subscribe(200, 235)], $observable->getSubscriptions());
@@ -86,7 +86,7 @@ public function testWebSocketDispose()
8686
{
8787
$observable = $this->createHotObservable([
8888
onNext(150, 1),
89-
onCompleted(435)
89+
onCompleted(435),
9090
]);
9191

9292
$webSocket = new TestWebSocketSubject($observable, $this->scheduler);
@@ -105,7 +105,7 @@ public function testPusherConnection()
105105
$observable = $this->createHotObservable([
106106
onNext(150, 1),
107107
onNext(320, '{"event":"pusher:connection_established","data":"{\"socket_id\":\"218656.9503498\",\"activity_timeout\":120}"}'),
108-
onCompleted(635)
108+
onCompleted(635),
109109
]);
110110

111111
$webSocket = new TestWebSocketSubject($observable, $this->scheduler);
@@ -130,7 +130,7 @@ public function testPusherSubscribed()
130130
onNext(150, 1),
131131
onNext(320, '{"event":"pusher:connection_established","data":"{\"socket_id\":\"218656.9503498\",\"activity_timeout\":120}"}'),
132132
onNext(340, '{"event":"pusher_internal:subscription_succeeded","data":"{}","channel":"test"}'),
133-
onCompleted(635)
133+
onCompleted(635),
134134
]);
135135

136136
$webSocket = new TestWebSocketSubject($observable, $this->scheduler);
@@ -159,7 +159,7 @@ public function testPusherData()
159159
onNext(370, '{"event":"new-listing","data":["test10"],"channel":"other"}'),
160160
onNext(390, '{"event":"new-listing","data":["test2"],"channel":"test"}'),
161161
onNext(400, '{"event":"new-listing","data":["test3"],"channel":"test"}'),
162-
onCompleted(900)
162+
onCompleted(900),
163163
]);
164164

165165
$webSocket = new TestWebSocketSubject($observable, $this->scheduler);
@@ -172,7 +172,7 @@ public function testPusherData()
172172
onNext(350, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test1"],"channel":"test"}', true))),
173173
onNext(390, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test2"],"channel":"test"}', true))),
174174
onNext(400, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test3"],"channel":"test"}', true))),
175-
onCompleted(902)
175+
onCompleted(902),
176176
], $results->getMessages());
177177

178178
$this->assertSubscriptions([subscribe(200, 900)], $observable->getSubscriptions());
@@ -253,7 +253,7 @@ public function testPusherReconnectInnerError()
253253
$this->assertMessages([
254254
onNext(550, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test1"],"channel":"test"}', true))),
255255
onNext(1020, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test1"],"channel":"test"}', true))),
256-
onCompleted(1040)
256+
onCompleted(1040),
257257
], $results->getMessages());
258258

259259
$this->assertSubscriptions([subscribe(200, 570), subscribe(670, 1040)], $observable->getSubscriptions());

tests/TestCase.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<?php
1+
<?php declare(strict_types=1);
22

33
namespace ApiClients\Tests\Client\Pusher;
44

tests/TestWebSocketSubject.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<?php
1+
<?php declare(strict_types=1);
22

33
namespace ApiClients\Tests\Client\Pusher;
44

@@ -20,11 +20,6 @@ public function __construct(Observable $observable, TestScheduler $scheduler)
2020
$this->scheduler = $scheduler;
2121
}
2222

23-
protected function _subscribe(ObserverInterface $observer): DisposableInterface
24-
{
25-
return $this->observable->subscribe($observer);
26-
}
27-
2823
public function getSentMessages(): array
2924
{
3025
return $this->sentMessages;
@@ -34,4 +29,9 @@ public function onNext($value)
3429
{
3530
$this->sentMessages[] = [$this->scheduler->getClock(), $value];
3631
}
32+
33+
protected function _subscribe(ObserverInterface $observer): DisposableInterface
34+
{
35+
return $this->observable->subscribe($observer);
36+
}
3737
}

0 commit comments

Comments
 (0)