Skip to content

Commit 57b6325

Browse files
committed
Removed unnecessary promises. Observables are only converted to promises at the end of the stream.
1 parent 3103359 commit 57b6325

File tree

4 files changed

+16
-37
lines changed

4 files changed

+16
-37
lines changed

examples/reddit-single.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
* The App ID isn't a secret and comes from a Pusher blog post:
1010
* @link https://blog.pusher.com/pusher-realtime-reddit-api/
1111
*/
12-
$client = Client::create(require 'reddit.key.php');
12+
$client = new Client(require 'reddit.key.php');
1313

1414
$client->channel((string) $argv[1], function (Event $event) {
1515
echo 'Channel: ', $event->getChannel(), PHP_EOL;

examples/reddit.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
* The App ID isn't a secret and comes from a Pusher blog post:
1010
* @link https://blog.pusher.com/pusher-realtime-reddit-api/
1111
*/
12-
$client = Client::create(require 'reddit.key.php');
12+
$client = new Client(require 'reddit.key.php');
1313

1414
$subReddits = $argv;
1515
array_shift($subReddits);

src/AsyncClient.php

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
use Rx\Websocket\MessageSubject;
1414
use Rx\Websocket\WebsocketErrorException;
1515
use Throwable;
16-
use function React\Promise\reject;
17-
use function React\Promise\resolve;
1816

1917
final class AsyncClient
2018
{
@@ -102,17 +100,17 @@ public function __construct(Observable $client)
102100
$event = Event::createFromMessage($message);
103101

104102
if ($event->getEvent() === 'pusher:error') {
105-
return Observable::fromPromise(reject(
106-
new PusherErrorException($event->getData()['message'], $event->getData()['code'])
107-
));
103+
$throwable = new PusherErrorException($event->getData()['message'], $event->getData()['code']);
104+
105+
return Observable::error($throwable);
108106
}
109107

110108
// If this event represents the connection_established event set the timeout
111109
if ($event->getEvent() === 'pusher:connection_established') {
112110
$this->setActivityTimeout($event);
113111
}
114112

115-
return Observable::fromPromise(resolve($event));
113+
return Observable::of($event);
116114
})
117115

118116
// Handle connection level and Pusher procotol errors
@@ -240,15 +238,15 @@ private function handleLowLevelError(Throwable $throwable)
240238
!($throwable instanceof RuntimeException) &&
241239
!($throwable instanceof PusherErrorException)
242240
) {
243-
return Observable::fromPromise(reject($throwable));
241+
return Observable::error($throwable);
244242
}
245243

246244
$code = $throwable->getCode();
247245
$pusherError = ($throwable instanceof WebsocketErrorException || $throwable instanceof PusherErrorException);
248246

249247
// Errors 4000-4099, don't retry connecting
250248
if ($pusherError && $code >= 4000 && $code <= 4099) {
251-
return Observable::fromPromise(reject($throwable));
249+
return Observable::error($throwable);
252250
}
253251

254252
// Errors 4100-4199 reconnect after 1 or more seconds, we do it after 1.001 second

src/Client.php

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@
44

55
use React\EventLoop\Factory;
66
use React\EventLoop\LoopInterface;
7-
use React\Promise\Deferred;
8-
use Throwable;
7+
use Rx\Observable;
98
use function Clue\React\Block\await;
10-
use function React\Promise\all;
119

1210
final class Client
1311
{
@@ -36,12 +34,7 @@ public function __construct(string $app)
3634
*/
3735
public function channel(string $channel, callable $listener)
3836
{
39-
$this->channels(
40-
[
41-
$channel,
42-
],
43-
$listener
44-
);
37+
$this->channels([$channel], $listener);
4538
}
4639

4740
/**
@@ -50,24 +43,12 @@ public function channel(string $channel, callable $listener)
5043
*/
5144
public function channels(array $channels, callable $listener)
5245
{
53-
$promises = [];
54-
foreach ($channels as $channel) {
55-
$deferred = new Deferred();
56-
$this->client->channel($channel)->subscribe(
57-
$listener,
58-
function (Throwable $throwable) {
59-
throw $throwable;
60-
},
61-
function () use ($deferred) {
62-
$deferred->resolve();
63-
}
64-
);
65-
$promises[] = $deferred->promise();
66-
}
46+
$promise = Observable::fromArray($channels)
47+
->flatMap([$this->client, 'channel'])
48+
->do($listener)
49+
->count()
50+
->toPromise();
6751

68-
await(
69-
all($promises),
70-
$this->loop
71-
);
52+
await($promise, $this->loop);
7253
}
7354
}

0 commit comments

Comments
 (0)