Skip to content

Commit 6fb10f4

Browse files
committed
Further reworking of how connections, and specifically errors, are handled
1 parent 65fb1e5 commit 6fb10f4

File tree

1 file changed

+62
-31
lines changed

1 file changed

+62
-31
lines changed

src/AsyncClient.php

Lines changed: 62 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
use React\Dns\Resolver\Resolver;
66
use React\EventLoop\LoopInterface;
7-
use React\EventLoop\Timer\TimerInterface;
87
use Rx\Disposable\CallbackDisposable;
98
use Rx\Observable;
109
use Rx\ObserverInterface;
@@ -17,8 +16,8 @@ final class AsyncClient
1716
{
1817
const NO_ACTIVITY_TIMEOUT = 120;
1918
const NO_PING_RESPONSE_TIMEOUT = 30;
20-
//const NO_ACTIVITY_TIMEOUT = 12;
21-
//const NO_PING_RESPONSE_TIMEOUT = 3;
19+
20+
protected $noActivityTimeout = self::NO_ACTIVITY_TIMEOUT;
2221

2322
/**
2423
* @var LoopInterface
@@ -50,16 +49,6 @@ final class AsyncClient
5049
*/
5150
protected $delay = 200;
5251

53-
/**
54-
* @var TimerInterface
55-
*/
56-
private $noActivityTimer;
57-
58-
/**
59-
* @var TimerInterface
60-
*/
61-
private $pingIimeoutTimer;
62-
6352
/**
6453
* @param LoopInterface $loop
6554
* @param string $app Application ID
@@ -93,17 +82,20 @@ public static function create(LoopInterface $loop, string $app, Resolver $resolv
9382
public function __construct(LoopInterface $loop, WebsocketClient $client)
9483
{
9584
$this->loop = $loop;
96-
$this->messages = $client->shareReplay(1)
85+
$this->messages = $client
9786
// Save this subject for sending stuff
9887
->do(function (MessageSubject $ms) {
99-
echo 'set snedSubject', PHP_EOL;
10088
$this->sendSubject = $ms;
89+
90+
// Resubscribe to an channels we where subscribed to when disconnected
91+
foreach ($this->channels as $channel => $_) {
92+
$this->subscribeOnChannel($channel);
93+
}
10194
})
10295

10396
// Make sure if there is a disconnect or something
10497
// that we unset the sendSubject
10598
->finally(function () {
106-
echo 'unset snedSubject', PHP_EOL;
10799
$this->sendSubject = null;
108100
})
109101

@@ -118,9 +110,8 @@ public function __construct(LoopInterface $loop, WebsocketClient $client)
118110
// if another value comes along, this all gets disposed (because we are using flatMapLatest)
119111
// before the timeouts start get triggered
120112
return Observable::never()
121-
->timeout(self::NO_ACTIVITY_TIMEOUT * 1000)
113+
->timeout($this->noActivityTimeout * 1000)
122114
->catch(function () use ($x) {
123-
echo 'send ping', PHP_EOL;
124115
// ping (do something that causes incoming stream to get a message)
125116
$this->send(['event' => 'pusher:ping']);
126117
// this timeout will actually timeout with a TimeoutException - causing
@@ -129,16 +120,28 @@ public function __construct(LoopInterface $loop, WebsocketClient $client)
129120
})
130121
->startWith($x);
131122
})
123+
124+
// Handle connection level errors
132125
->retryWhen(function (Observable $errors) {
133-
echo __LINE__, ': ', time(), PHP_EOL;
134126
return $errors->flatMap(function (Throwable $throwable) {
135127
return $this->handleLowLevelError($throwable);
136128
});
137129
})
130+
131+
// Decode JSON
138132
->_ApiClients_jsonDecode()
133+
134+
// Deal with connection established messages
139135
->map(function (array $message) {
140-
return Event::createFromMessage($message);
141-
});
136+
$event = Event::createFromMessage($message);
137+
138+
if ($event->getEvent() === 'pusher:connection_established') {
139+
$this->setActivityTimeout($event);
140+
}
141+
142+
return $event;
143+
})
144+
->share();
142145
}
143146

144147
/**
@@ -169,7 +172,7 @@ public function channel(string $channel): Observable
169172
})
170173
->subscribe($observer);
171174

172-
$this->send(['event' => 'pusher:subscribe', 'data' => ['channel' => $channel]]);
175+
$this->subscribeOnChannel($channel);
173176

174177
return new CallbackDisposable(function () use ($channel, $subscription) {
175178
$this->send(['event' => 'pusher:unsubscribe', 'data' => ['channel' => $channel]]);
@@ -186,26 +189,54 @@ public function channel(string $channel): Observable
186189
* Send a message through the client
187190
*
188191
* @param array $message Message to send, will be json encoded
192+
*
193+
* @return A bool indicating whether or not the connection was active
194+
* and the given message has been pass onto the connection.
189195
*/
190-
public function send(array $message)
196+
public function send(array $message): bool
191197
{
192198
if ($this->sendSubject === null) {
193-
echo 'send subject is null when trying to send', PHP_EOL;
194-
return;
199+
return false;
195200
}
196201

197-
echo __LINE__, ' Sending JSON: ', json_encode($message), PHP_EOL;
198202
$this->sendSubject->onNext(json_encode($message));
203+
return true;
199204
}
200205

201206
private function handleLowLevelError(Throwable $throwable)
202207
{
203208
$this->delay *= 2;
204-
echo get_class($throwable), PHP_EOL;
205-
/*echo get_class($throwable->getPrevious()), PHP_EOL;
206-
echo get_class($throwable->getPrevious()->getPrevious()), PHP_EOL;
207-
echo get_class($throwable->getPrevious()->getPrevious()->getPrevious()), PHP_EOL;*/
208-
echo __LINE__, ': ', time(), PHP_EOL;
209209
return Observable::timer($this->delay);
210210
}
211+
212+
/**
213+
* @param string $channel
214+
*/
215+
private function subscribeOnChannel(string $channel)
216+
{
217+
$this->send(['event' => 'pusher:subscribe', 'data' => ['channel' => $channel]]);
218+
}
219+
220+
221+
/**
222+
* Get connection activity timeout from connection established event
223+
*
224+
* @param Event $event
225+
*/
226+
private function setActivityTimeout(Event $event)
227+
{
228+
$data = $event->getData();
229+
230+
// No activity_timeout found on event
231+
if (!isset($data['activity_timeout'])) {
232+
return;
233+
}
234+
235+
// activity_timeout holds zero or invalid value (we don't want to hammer Pusher)
236+
if ((int)$data['activity_timeout'] <= 0) {
237+
return;
238+
}
239+
240+
$this->noActivityTimeout = (int)$data['activity_timeout'];
241+
}
211242
}

0 commit comments

Comments
 (0)