4
4
namespace WyriHaximus \Pusher ;
5
5
6
6
use React \EventLoop \LoopInterface ;
7
+ use Rx \Disposable \CallbackDisposable ;
7
8
use Rx \Observable ;
8
9
use Rx \ObservableInterface ;
9
10
use Rx \Observer \CallbackObserver ;
10
11
use Rx \ObserverInterface ;
11
12
use Rx \React \Promise ;
12
13
use Rx \Scheduler \EventLoopScheduler ;
14
+ use Rx \SchedulerInterface ;
13
15
use Rx \Websocket \Client ;
16
+ use Rx \Websocket \MessageSubject ;
14
17
use WyriHaximus \ApiClient \Transport \Client as Transport ;
15
18
use WyriHaximus \ApiClient \Transport \Factory ;
16
19
use function React \Promise \resolve ;
@@ -23,7 +26,7 @@ class AsyncClient
23
26
protected $ app ;
24
27
protected $ url ;
25
28
protected $ client ;
26
- protected $ connection ;
29
+ protected $ message ;
27
30
protected $ channels = [];
28
31
29
32
public function __construct (LoopInterface $ loop , string $ app , Transport $ transport = null )
@@ -42,40 +45,45 @@ public function __construct(LoopInterface $loop, string $app, Transport $transpo
42
45
$ this ->app .
43
46
'?client=wyrihaximus-php-pusher-client&version=0.0.1&protocol=7 '
44
47
;
45
-
46
- $ this ->client = new Client ($ this ->url );
47
- $ this ->connection = Observable::create (function (ObserverInterface $ observer ) {
48
- $ this ->client ->subscribe (new CallbackObserver (function ($ ms ) use ($ observer ) {
49
- $ ms ->subscribe (new CallbackObserver (
50
- function ($ message ) use ($ observer ) {
51
- $ observer ->onNext ($ message );
52
- }
53
- ), new EventLoopScheduler (getLoop ()));
54
- }));
55
- }, new EventLoopScheduler ($ loop ))->map (function ($ message ) {
56
- return json_decode ((string )$ message , true );
57
- });
48
+ $ this ->client = (new Client ($ this ->url ))->shareReplay (1 ); //Only create one connection and share the most recent among all subscriber
49
+ $ this ->messages = $ this ->client
50
+ ->flatMap (function (MessageSubject $ ms ) {
51
+ return $ ms ;
52
+ })
53
+ ->map ('json_decode ' );
58
54
}
59
55
60
- public function subscribe (string $ channel ): ObservableInterface
56
+ public function channel (string $ channel ): ObservableInterface
61
57
{
62
- $ this ->channels [$ channel ] = $ channel ;
63
- $ this ->send ([
64
- 'event ' => 'pusher:subscribe ' ,
65
- 'data ' => [
66
- 'channel ' => $ channel ,
67
- ],
68
- ]);
58
+ $ channelMessages = $ this ->messages ->filter (function ($ event ) use ($ channel ) {
59
+ return isset ($ event ->channel ) && $ event ->channel == $ channel ;
60
+ });
61
+
62
+ $ events = Observable::create (function (ObserverInterface $ observer , SchedulerInterface $ scheduler ) use ($ channel , $ channelMessages ) {
69
63
70
- return $ this ->connection ->filter (function (array $ event ) use ($ channel ) {
71
- return isset ($ event ['channel ' ]) && $ event ['channel ' ] == $ channel ;
72
- })->filter (function (array $ event ) use ($ channel ) {
73
- return $ event ['event ' ] !== 'pusher_internal:subscription_succeeded ' ;
64
+ $ subscription = $ channelMessages
65
+ ->filter (function ($ msg ) {
66
+ return $ msg ->event !== 'pusher_internal:subscription_succeeded ' ;
67
+ })
68
+ ->subscribe ($ observer , $ scheduler );
69
+
70
+ $ this ->send (['event ' => 'pusher:subscribe ' , 'data ' => ['channel ' => $ channel ]]);
71
+
72
+ return new CallbackDisposable (function () use ($ channel , $ subscription ) {
73
+ $ this ->send (['event ' => 'pusher:unsubscribe ' , 'data ' => ['channel ' => $ channel ]]);
74
+ $ subscription ->dispose ();
75
+ });
74
76
});
77
+
78
+ return $ events ->share ();
75
79
}
76
80
77
81
public function send (array $ message )
78
82
{
79
- $ this ->client ->send (json_encode ($ message ));
83
+ $ this ->client
84
+ ->take (1 )
85
+ ->subscribeCallback (function (MessageSubject $ ms ) use ($ message ) {
86
+ $ ms ->send (json_encode ($ message ));
87
+ });
80
88
}
81
89
}
0 commit comments