Skip to content

Commit eea1d0a

Browse files
committed
Added back support for subscribing multiple times to the same channel
1 parent 48bfc16 commit eea1d0a

File tree

2 files changed

+53
-4
lines changed

2 files changed

+53
-4
lines changed

src/AsyncClient.php

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public function channel(string $channel): Observable
128128
->flatMapTo(Observable::empty());
129129

130130
// Observable representing channel events
131-
$events = $channelMessages
131+
$this->channels[$channel] = $channelMessages
132132
->merge($subscribe)
133133
->filter([Event::class, 'subscriptionSucceeded'])
134134
->finally(function () use ($channel) {
@@ -137,9 +137,8 @@ public function channel(string $channel): Observable
137137

138138
// Remove our channel from the channel list so we don't resubscribe in case we reconnect
139139
unset($this->channels[$channel]);
140-
});
141-
142-
$this->channels[$channel] = $events;
140+
})
141+
->singleInstance();
143142

144143
return $this->channels[$channel];
145144
}

tests/AsyncClientTest.php

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,56 @@ public function testPusherData()
183183
], $webSocket->getSentMessages());
184184
}
185185

186+
public function testPusherDataSameChannel()
187+
{
188+
$observable = $this->createColdObservable([
189+
onNext(320, '{"event":"pusher:connection_established","data":"{\"socket_id\":\"218656.9503498\",\"activity_timeout\":120}"}'),
190+
onNext(340, '{"event":"pusher_internal:subscription_succeeded","data":"{}","channel":"test"}'),
191+
onNext(350, '{"event":"new-listing","data":["test1"],"channel":"test"}'),
192+
onNext(370, '{"event":"new-listing","data":["test10"],"channel":"other"}'),
193+
onNext(390, '{"event":"new-listing","data":["test2"],"channel":"test"}'),
194+
onNext(400, '{"event":"new-listing","data":["test3"],"channel":"test"}'),
195+
onCompleted(900),
196+
]);
197+
198+
$webSocket = new TestWebSocketSubject($observable, $this->scheduler);
199+
200+
$client = new AsyncClient($webSocket);
201+
202+
$results1 = $this->scheduler->createObserver();
203+
$results2 = $this->scheduler->createObserver();
204+
205+
$this->scheduler->scheduleAbsolute($this->scheduler::CREATED, function () use ($client, $results1) {
206+
$client->channel('test')->subscribe($results1);
207+
});
208+
209+
$this->scheduler->scheduleAbsolute(460, function () use ($client, $results2) {
210+
$client->channel('test')->subscribe($results2);
211+
});
212+
213+
$this->scheduler->start();
214+
215+
$this->assertMessages([
216+
onNext(450, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test1"],"channel":"test"}', true))),
217+
onNext(490, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test2"],"channel":"test"}', true))),
218+
onNext(500, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test3"],"channel":"test"}', true))),
219+
onCompleted(1002),
220+
], $results1->getMessages());
221+
222+
$this->assertMessages([
223+
onNext(490, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test2"],"channel":"test"}', true))),
224+
onNext(500, Event::createFromMessage(json_decode('{"event":"new-listing","data":["test3"],"channel":"test"}', true))),
225+
onCompleted(1002),
226+
], $results2->getMessages());
227+
228+
$this->assertSubscriptions([subscribe(100, 1000)], $observable->getSubscriptions());
229+
230+
$this->assertEquals([
231+
[420, '{"event":"pusher:subscribe","data":{"channel":"test"}}'],
232+
[1002, '{"event":"pusher:unsubscribe","data":{"channel":"test"}}'],
233+
], $webSocket->getSentMessages());
234+
}
235+
186236
public function testPusherPing()
187237
{
188238
$observable = $this->createHotObservable([

0 commit comments

Comments
 (0)