File tree Expand file tree Collapse file tree 1 file changed +2
-1
lines changed Expand file tree Collapse file tree 1 file changed +2
-1
lines changed Original file line number Diff line number Diff line change 11
11
use Rx \Observable ;
12
12
use Rx \Operator \CutOperator ;
13
13
use Rx \React \Promise ;
14
+ use Rx \Scheduler \ImmediateScheduler ;
14
15
15
16
final class AsyncStreamingClient implements AsyncStreamingClientInterface
16
17
{
@@ -60,7 +61,7 @@ protected function stream(RequestInterface $request): Observable
60
61
return Promise::toObservable ($ this ->client ->handle (new StreamingRequestCommand (
61
62
$ request
62
63
)))->switchLatest ()->lift (function () {
63
- return new CutOperator (self ::STREAM_DELIMITER );
64
+ return new CutOperator (self ::STREAM_DELIMITER , new ImmediateScheduler () );
64
65
})->filter (function (string $ json ) {
65
66
return trim ($ json ) !== '' ; // To keep the stream alive Twitter sends an empty line at times
66
67
})->_ApiClients_jsonDecode ()->flatMap (function (array $ document ) {
You can’t perform that action at this time.
0 commit comments