@@ -19,11 +19,6 @@ final class AsyncClient
19
19
20
20
protected $ noActivityTimeout = self ::NO_ACTIVITY_TIMEOUT ;
21
21
22
- /**
23
- * @var LoopInterface
24
- */
25
- protected $ loop ;
26
-
27
22
/**
28
23
* @var Observable\RefCountObservable
29
24
*/
@@ -49,39 +44,11 @@ final class AsyncClient
49
44
*/
50
45
protected $ delay = 200 ;
51
46
52
- /**
53
- * @param LoopInterface $loop
54
- * @param string $app Application ID
55
- * @param Resolver $resolver Optional DNS resolver
56
- * @return AsyncClient
57
- */
58
- public static function create (LoopInterface $ loop , string $ app , Resolver $ resolver = null ): AsyncClient
59
- {
60
- try {
61
- Scheduler::setAsyncFactory (function () use ($ loop ) {
62
- return new Scheduler \EventLoopScheduler ($ loop );
63
- });
64
- } catch (Throwable $ t ) {
65
- }
66
-
67
- return new self (
68
- $ loop ,
69
- new WebsocketClient (
70
- ApiSettings::createUrl ($ app ),
71
- false ,
72
- [],
73
- $ loop ,
74
- $ resolver
75
- )
76
- );
77
- }
78
-
79
47
/**
80
48
* @internal
81
49
*/
82
- public function __construct (LoopInterface $ loop , WebsocketClient $ client )
50
+ public function __construct (Observable $ client )
83
51
{
84
- $ this ->loop = $ loop ;
85
52
$ this ->messages = $ client
86
53
// Save this subject for sending stuff
87
54
->do (function (MessageSubject $ ms ) {
@@ -144,9 +111,35 @@ public function __construct(LoopInterface $loop, WebsocketClient $client)
144
111
}
145
112
146
113
/**
147
- * Listen on a channel
114
+ * @param LoopInterface $loop
115
+ * @param string $app Application ID
116
+ * @param Resolver $resolver Optional DNS resolver
117
+ * @return AsyncClient
118
+ */
119
+ public static function create (LoopInterface $ loop , string $ app , Resolver $ resolver = null ): AsyncClient
120
+ {
121
+ try {
122
+ Scheduler::setAsyncFactory (function () use ($ loop ) {
123
+ return new Scheduler \EventLoopScheduler ($ loop );
124
+ });
125
+ } catch (Throwable $ t ) {
126
+ }
127
+
128
+ return new self (
129
+ new WebsocketClient (
130
+ ApiSettings::createUrl ($ app ),
131
+ false ,
132
+ [],
133
+ $ loop ,
134
+ $ resolver
135
+ )
136
+ );
137
+ }
138
+
139
+ /**
140
+ * Listen on a channel.
148
141
*
149
- * @param string $channel Channel to listen on
142
+ * @param string $channel Channel to listen on
150
143
* @return Observable
151
144
*/
152
145
public function channel (string $ channel ): Observable
@@ -186,16 +179,17 @@ public function channel(string $channel): Observable
186
179
187
180
// Share stream amount subscribers to this channel
188
181
$ this ->channels [$ channel ] = $ events ->share ();
182
+
189
183
return $ this ->channels [$ channel ];
190
184
}
191
185
192
186
/**
193
- * Send a message through the client
187
+ * Send a message through the client.
194
188
*
195
189
* @param array $message Message to send, will be json encoded
196
190
*
197
191
* @return A bool indicating whether or not the connection was active
198
- * and the given message has been pass onto the connection.
192
+ * and the given message has been pass onto the connection.
199
193
*/
200
194
public function send (array $ message ): bool
201
195
{
@@ -205,12 +199,14 @@ public function send(array $message): bool
205
199
}
206
200
207
201
$ this ->sendSubject ->onNext (json_encode ($ message ));
202
+
208
203
return true ;
209
204
}
210
205
211
206
private function handleLowLevelError (Throwable $ throwable )
212
207
{
213
208
$ this ->delay *= 2 ;
209
+
214
210
return Observable::timer ($ this ->delay );
215
211
}
216
212
@@ -222,9 +218,8 @@ private function subscribeOnChannel(string $channel)
222
218
$ this ->send (['event ' => 'pusher:subscribe ' , 'data ' => ['channel ' => $ channel ]]);
223
219
}
224
220
225
-
226
221
/**
227
- * Get connection activity timeout from connection established event
222
+ * Get connection activity timeout from connection established event.
228
223
*
229
224
* @param Event $event
230
225
*/
0 commit comments