Skip to content

Commit bfcb861

Browse files
committed
Updated clients with docblocks, made them final, moved certain responsibilities to ApiClients
1 parent e1d178e commit bfcb861

File tree

2 files changed

+56
-13
lines changed

2 files changed

+56
-13
lines changed

src/AsyncClient.php

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,32 +14,47 @@
1414
use function EventLoop\getLoop;
1515
use function EventLoop\setLoop;
1616

17-
class AsyncClient
17+
final class AsyncClient
1818
{
19-
protected $transport;
20-
protected $app;
21-
protected $url;
19+
/**
20+
* @var Observable\RefCountObservable
21+
*/
2222
protected $client;
23+
24+
/**
25+
* @var Observable\AnonymousObservable
26+
*/
2327
protected $messages;
28+
29+
/**
30+
* @var array
31+
*/
2432
protected $channels = [];
2533

34+
/**
35+
* @param LoopInterface $loop
36+
* @param string $app Application ID
37+
*/
2638
public function __construct(LoopInterface $loop, string $app)
2739
{
40+
// Set loop into global look accessor
2841
setLoop($loop);
29-
$this->app = $app;
30-
$this->url = 'wss://ws.pusherapp.com/app/' .
31-
$this->app .
32-
'?client=wyrihaximus-php-pusher-client&version=0.0.1&protocol=7'
33-
;
42+
3443
//Only create one connection and share the most recent among all subscriber
35-
$this->client = (new WebsocketClient($this->url))->shareReplay(1);
44+
$this->client = (new WebsocketClient(ApiSettings::createUrl($app)))->shareReplay(1);
3645
$this->messages = $this->client
3746
->flatMap(function (MessageSubject $ms) {
3847
return $ms;
3948
})
4049
->map('json_decode');
4150
}
4251

52+
/**
53+
* Listen on a channel
54+
*
55+
* @param string $channel Channel to listen on
56+
* @return ObservableInterface
57+
*/
4358
public function channel(string $channel): ObservableInterface
4459
{
4560
if (isset($this->channels[$channel])) {
@@ -75,6 +90,11 @@ public function channel(string $channel): ObservableInterface
7590
return $this->channels[$channel];
7691
}
7792

93+
/**
94+
* Send a message through the client
95+
*
96+
* @param array $message Message to send, will be json encoded
97+
*/
7898
public function send(array $message)
7999
{
80100
$this->client

src/Client.php

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,37 @@
33
namespace ApiClients\Pusher;
44

55
use React\EventLoop\Factory;
6+
use React\EventLoop\LoopInterface;
67
use React\Promise\Deferred;
78
use Rx\Observer\CallbackObserver;
89
use function Clue\React\Block\await;
10+
use function React\Promise\all;
911

10-
class Client
12+
final class Client
1113
{
14+
/**
15+
* @var LoopInterface
16+
*/
1217
protected $loop;
18+
19+
/**
20+
* @var AsyncClient
21+
*/
1322
protected $client;
1423

24+
/**
25+
* @param string $app Application ID
26+
*/
1527
public function __construct(string $app)
1628
{
1729
$this->loop = Factory::create();
1830
$this->client = new AsyncClient($this->loop, $app);
1931
}
2032

33+
/**
34+
* @param string $channel Channel to listen on
35+
* @param callable $listener Listener to call on new messages
36+
*/
2137
public function channel(string $channel, callable $listener)
2238
{
2339
$this->channels(
@@ -28,10 +44,15 @@ public function channel(string $channel, callable $listener)
2844
);
2945
}
3046

47+
/**
48+
* @param string[] $channels Channels to listen on
49+
* @param callable $listener Listener to call on new messages
50+
*/
3151
public function channels(array $channels, callable $listener)
3252
{
33-
$deferred = new Deferred();
53+
$promises = [];
3454
foreach ($channels as $channel) {
55+
$deferred = new Deferred();
3556
$this->client->channel($channel)->subscribe(
3657
new CallbackObserver(
3758
$listener,
@@ -41,9 +62,11 @@ function () use ($deferred) {
4162
}
4263
)
4364
);
65+
$promises[] = $deferred->promise();
4466
}
67+
4568
await(
46-
$deferred->promise(),
69+
all($promises),
4770
$this->loop
4871
);
4972
}

0 commit comments

Comments
 (0)