@@ -29,6 +29,11 @@ final class AsyncClient
29
29
*/
30
30
protected $ channels = [];
31
31
32
+ /**
33
+ * @var int
34
+ */
35
+ protected $ delay = 200 ;
36
+
32
37
/**
33
38
* @param LoopInterface $loop
34
39
* @param string $app Application ID
@@ -61,8 +66,10 @@ public static function create(LoopInterface $loop, string $app, Resolver $resolv
61
66
public function __construct (WebsocketClient $ client )
62
67
{
63
68
//Only create one connection and share the most recent among all subscriber
64
- $ this ->client = $ client ->retryWhen (function (Observable $ errors ) {
65
- return $ this ->handleLowLevelError ($ errors );
69
+ $ this ->client = $ client ->retryWhen (function (Observable $ errors ) {
70
+ return $ errors ->flatMap (function (Throwable $ throwable ) {
71
+ return $ this ->handleLowLevelError ($ throwable );
72
+ });
66
73
})->shareReplay (1 );
67
74
$ this ->messages = $ this ->client
68
75
->flatMap (function (MessageSubject $ ms ) {
@@ -131,14 +138,10 @@ public function send(array $message)
131
138
});
132
139
}
133
140
134
- private function handleLowLevelError (Observable $ errors )
141
+ private function handleLowLevelError (Throwable $ throwable )
135
142
{
136
- $ stream = $ errors ->subscribe (
137
- function (Throwable $ throwable ) use (&$ stream ) {
138
- echo (string )$ throwable , PHP_EOL ;
139
- }
140
- );
143
+ $ this ->delay *= 2 ;
141
144
echo __LINE__ , ': ' , time (), PHP_EOL ;
142
- return $ errors ->delay ( 200 );
145
+ return Observable:: timer ( $ this ->delay );
143
146
}
144
147
}
0 commit comments