Skip to content

Commit 816f050

Browse files
committed
Working dead connection detection, thanks to @mbonneau via https://gist.github.com/mbonneau/e470e87c4139733cd132931aea74d0cc
1 parent 8659ad3 commit 816f050

File tree

1 file changed

+55
-47
lines changed

1 file changed

+55
-47
lines changed

src/AsyncClient.php

Lines changed: 55 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,14 @@ final class AsyncClient
1717
{
1818
const NO_ACTIVITY_TIMEOUT = 120;
1919
const NO_PING_RESPONSE_TIMEOUT = 30;
20+
//const NO_ACTIVITY_TIMEOUT = 12;
21+
//const NO_PING_RESPONSE_TIMEOUT = 3;
2022

2123
/**
2224
* @var LoopInterface
2325
*/
2426
protected $loop;
2527

26-
/**
27-
* @var WebsocketClient
28-
*/
29-
protected $lowLevelClient;
30-
3128
/**
3229
* @var Observable\RefCountObservable
3330
*/
@@ -38,6 +35,11 @@ final class AsyncClient
3835
*/
3936
protected $messages;
4037

38+
/**
39+
* @var MessageSubject
40+
*/
41+
protected $sendSubject;
42+
4143
/**
4244
* @var array
4345
*/
@@ -91,22 +93,50 @@ public static function create(LoopInterface $loop, string $app, Resolver $resolv
9193
public function __construct(LoopInterface $loop, WebsocketClient $client)
9294
{
9395
$this->loop = $loop;
94-
//Only create one connection and share the most recent among all subscriber
95-
$this->lowLevelClient = $client;
96-
$this->client = $this->lowLevelClient->retryWhen(function (Observable $errors) {
97-
echo __LINE__, ': ', time(), PHP_EOL;
98-
$this->resetActivityTimer();
99-
return $errors->flatMap(function (Throwable $throwable) {
100-
return $this->handleLowLevelError($throwable);
101-
});
102-
})->shareReplay(1);
103-
$this->messages = $this->client
96+
$this->messages = $client->shareReplay(1)
97+
// Save this subject for sending stuff
98+
->do(function (MessageSubject $ms) {
99+
echo 'set snedSubject', PHP_EOL;
100+
$this->sendSubject = $ms;
101+
})
102+
103+
// Make sure if there is a disconnect or something
104+
// that we unset the sendSubject
105+
->finally(function () {
106+
echo 'unset snedSubject', PHP_EOL;
107+
$this->sendSubject = null;
108+
})
109+
110+
104111
->flatMap(function (MessageSubject $ms) {
105112
return $ms;
106113
})
114+
115+
// This is the ping/timeout functionality
116+
->flatMapLatest(function ($x) {
117+
// this Observable emits the current value immediately
118+
// if another value comes along, this all gets disposed (because we are using flatMapLatest)
119+
// before the timeouts start get triggered
120+
return Observable::never()
121+
->timeout(self::NO_ACTIVITY_TIMEOUT * 1000)
122+
->catch(function () use ($x) {
123+
echo 'send ping', PHP_EOL;
124+
// ping (do something that causes incoming stream to get a message)
125+
$this->send(['event' => 'pusher:ping']);
126+
// this timeout will actually timeout with a TimeoutException - causing
127+
// everything above this to dispose
128+
return Observable::never()->timeout(self::NO_PING_RESPONSE_TIMEOUT * 1000);
129+
})
130+
->startWith($x);
131+
})
132+
->retryWhen(function (Observable $errors) {
133+
echo __LINE__, ': ', time(), PHP_EOL;
134+
return $errors->flatMap(function (Throwable $throwable) {
135+
return $this->handleLowLevelError($throwable);
136+
});
137+
})
107138
->_ApiClients_jsonDecode()
108139
->map(function (array $message) {
109-
$this->resetActivityTimer();
110140
return Event::createFromMessage($message);
111141
});
112142
}
@@ -159,45 +189,23 @@ public function channel(string $channel): Observable
159189
*/
160190
public function send(array $message)
161191
{
162-
$this->client
163-
->take(1)
164-
->subscribe(function (MessageSubject $ms) use ($message) {
165-
$this->resetActivityTimer();
166-
$ms->send(json_encode($message));
167-
});
192+
if ($this->sendSubject === null) {
193+
echo 'send subject is null when trying to send', PHP_EOL;
194+
return;
195+
}
196+
197+
echo __LINE__, ' Sending JSON: ', json_encode($message), PHP_EOL;
198+
$this->sendSubject->onNext(json_encode($message));
168199
}
169200

170201
private function handleLowLevelError(Throwable $throwable)
171202
{
172-
$this->resetActivityTimer();
173203
$this->delay *= 2;
174204
echo get_class($throwable), PHP_EOL;
175-
echo get_class($throwable->getPrevious()), PHP_EOL;
205+
/*echo get_class($throwable->getPrevious()), PHP_EOL;
176206
echo get_class($throwable->getPrevious()->getPrevious()), PHP_EOL;
177-
echo get_class($throwable->getPrevious()->getPrevious()->getPrevious()), PHP_EOL;
207+
echo get_class($throwable->getPrevious()->getPrevious()->getPrevious()), PHP_EOL;*/
178208
echo __LINE__, ': ', time(), PHP_EOL;
179209
return Observable::timer($this->delay);
180210
}
181-
182-
private function resetActivityTimer()
183-
{
184-
echo 'resetActivityTimer', PHP_EOL;
185-
if ($this->noActivityTimer instanceof TimerInterface) {
186-
$this->noActivityTimer->cancel();
187-
}
188-
189-
$this->noActivityTimer = $this->loop->addTimer(
190-
self::NO_ACTIVITY_TIMEOUT,
191-
function () {
192-
echo 'resetActivityTimer:tick', PHP_EOL;
193-
$this->send(['event' => 'pusher:ping']);
194-
$this->pingIimeoutTimer = $this->loop->addTimer(
195-
self::NO_PING_RESPONSE_TIMEOUT,
196-
function () {
197-
$this->lowLevelClient->dispose();
198-
}
199-
);
200-
}
201-
);
202-
}
203211
}

0 commit comments

Comments
 (0)