Skip to content

Commit 8659ad3

Browse files
committed
WIP Dead connection detection
1 parent de085a6 commit 8659ad3

File tree

1 file changed

+60
-4
lines changed

1 file changed

+60
-4
lines changed

src/AsyncClient.php

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use React\Dns\Resolver\Resolver;
66
use React\EventLoop\LoopInterface;
7+
use React\EventLoop\Timer\TimerInterface;
78
use Rx\Disposable\CallbackDisposable;
89
use Rx\Observable;
910
use Rx\ObserverInterface;
@@ -14,6 +15,19 @@
1415

1516
final class AsyncClient
1617
{
18+
const NO_ACTIVITY_TIMEOUT = 120;
19+
const NO_PING_RESPONSE_TIMEOUT = 30;
20+
21+
/**
22+
* @var LoopInterface
23+
*/
24+
protected $loop;
25+
26+
/**
27+
* @var WebsocketClient
28+
*/
29+
protected $lowLevelClient;
30+
1731
/**
1832
* @var Observable\RefCountObservable
1933
*/
@@ -34,6 +48,16 @@ final class AsyncClient
3448
*/
3549
protected $delay = 200;
3650

51+
/**
52+
* @var TimerInterface
53+
*/
54+
private $noActivityTimer;
55+
56+
/**
57+
* @var TimerInterface
58+
*/
59+
private $pingIimeoutTimer;
60+
3761
/**
3862
* @param LoopInterface $loop
3963
* @param string $app Application ID
@@ -50,6 +74,7 @@ public static function create(LoopInterface $loop, string $app, Resolver $resolv
5074
}
5175

5276
return new self(
77+
$loop,
5378
new WebsocketClient(
5479
ApiSettings::createUrl($app),
5580
false,
@@ -63,22 +88,25 @@ public static function create(LoopInterface $loop, string $app, Resolver $resolv
6388
/**
6489
* @internal
6590
*/
66-
public function __construct(WebsocketClient $client)
91+
public function __construct(LoopInterface $loop, WebsocketClient $client)
6792
{
93+
$this->loop = $loop;
6894
//Only create one connection and share the most recent among all subscriber
69-
$this->client = $client->retryWhen(function (Observable $errors) {
95+
$this->lowLevelClient = $client;
96+
$this->client = $this->lowLevelClient->retryWhen(function (Observable $errors) {
97+
echo __LINE__, ': ', time(), PHP_EOL;
98+
$this->resetActivityTimer();
7099
return $errors->flatMap(function (Throwable $throwable) {
71100
return $this->handleLowLevelError($throwable);
72101
});
73102
})->shareReplay(1);
74103
$this->messages = $this->client
75104
->flatMap(function (MessageSubject $ms) {
76-
//var_export($ms);
77105
return $ms;
78106
})
79107
->_ApiClients_jsonDecode()
80108
->map(function (array $message) {
81-
//var_export($message);
109+
$this->resetActivityTimer();
82110
return Event::createFromMessage($message);
83111
});
84112
}
@@ -134,14 +162,42 @@ public function send(array $message)
134162
$this->client
135163
->take(1)
136164
->subscribe(function (MessageSubject $ms) use ($message) {
165+
$this->resetActivityTimer();
137166
$ms->send(json_encode($message));
138167
});
139168
}
140169

141170
private function handleLowLevelError(Throwable $throwable)
142171
{
172+
$this->resetActivityTimer();
143173
$this->delay *= 2;
174+
echo get_class($throwable), PHP_EOL;
175+
echo get_class($throwable->getPrevious()), PHP_EOL;
176+
echo get_class($throwable->getPrevious()->getPrevious()), PHP_EOL;
177+
echo get_class($throwable->getPrevious()->getPrevious()->getPrevious()), PHP_EOL;
144178
echo __LINE__, ': ', time(), PHP_EOL;
145179
return Observable::timer($this->delay);
146180
}
181+
182+
private function resetActivityTimer()
183+
{
184+
echo 'resetActivityTimer', PHP_EOL;
185+
if ($this->noActivityTimer instanceof TimerInterface) {
186+
$this->noActivityTimer->cancel();
187+
}
188+
189+
$this->noActivityTimer = $this->loop->addTimer(
190+
self::NO_ACTIVITY_TIMEOUT,
191+
function () {
192+
echo 'resetActivityTimer:tick', PHP_EOL;
193+
$this->send(['event' => 'pusher:ping']);
194+
$this->pingIimeoutTimer = $this->loop->addTimer(
195+
self::NO_PING_RESPONSE_TIMEOUT,
196+
function () {
197+
$this->lowLevelClient->dispose();
198+
}
199+
);
200+
}
201+
);
202+
}
147203
}

0 commit comments

Comments
 (0)