Skip to content

Commit 6d7c122

Browse files
authored
Client ensures subscribe command is confirmed. (rails#41581)
A SubscriptionGuarantor maintains a set of pending subscriptions, resending the subscribe command unless and until the subscription is confirmed or rejected by the server or cancelled client-side. A race condition in the ActionCable server - where an unsubscribe is sent, followed rapidly by a subscribe, but handled in the reverse order - necessitates this enhancement. Indeed, the subscriptions created and torn down by Turbo Streams amplifies the existence of this race condition.
1 parent c04cf69 commit 6d7c122

File tree

10 files changed

+270
-9
lines changed

10 files changed

+270
-9
lines changed

actioncable/CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,15 @@
1+
* The Action Cable client now ensures successful channel subscriptions:
2+
3+
* The client maintains a set of pending subscriptions until either
4+
the server confirms the subscription or the channel is torn down.
5+
* Rectifies the race condition where an unsubscribe is rapidly followed
6+
by a subscribe (on the same channel identifier) and the requests are
7+
handled out of order by the ActionCable server, thereby ignoring the
8+
subscribe command.
9+
10+
*Daniel Spinosa*
11+
12+
113
## Rails 7.0.0.alpha2 (September 15, 2021) ##
214

315
* No changes.

actioncable/app/assets/javascripts/action_cable.js

Lines changed: 51 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

actioncable/app/assets/javascripts/actioncable.esm.js

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ Connection.prototype.events = {
254254
return this.monitor.recordPing();
255255

256256
case message_types.confirmation:
257+
this.subscriptions.confirmSubscription(identifier);
257258
return this.subscriptions.notify(identifier, "connected");
258259

259260
case message_types.rejection:
@@ -321,9 +322,47 @@ class Subscription {
321322
}
322323
}
323324

325+
class SubscriptionGuarantor {
326+
constructor(subscriptions) {
327+
this.subscriptions = subscriptions;
328+
this.pendingSubscriptions = [];
329+
}
330+
guarantee(subscription) {
331+
if (this.pendingSubscriptions.indexOf(subscription) == -1) {
332+
logger.log(`SubscriptionGuarantor guaranteeing ${subscription.identifier}`);
333+
this.pendingSubscriptions.push(subscription);
334+
} else {
335+
logger.log(`SubscriptionGuarantor already guaranteeing ${subscription.identifier}`);
336+
}
337+
this.startGuaranteeing();
338+
}
339+
forget(subscription) {
340+
logger.log(`SubscriptionGuarantor forgetting ${subscription.identifier}`);
341+
this.pendingSubscriptions = this.pendingSubscriptions.filter((s => s !== subscription));
342+
}
343+
startGuaranteeing() {
344+
this.stopGuaranteeing();
345+
this.retrySubscribing();
346+
}
347+
stopGuaranteeing() {
348+
clearTimeout(this.retryTimeout);
349+
}
350+
retrySubscribing() {
351+
this.retryTimeout = setTimeout((() => {
352+
if (this.subscriptions && typeof this.subscriptions.subscribe === "function") {
353+
this.pendingSubscriptions.map((subscription => {
354+
logger.log(`SubscriptionGuarantor resubscribing ${subscription.identifier}`);
355+
this.subscriptions.subscribe(subscription);
356+
}));
357+
}
358+
}), 500);
359+
}
360+
}
361+
324362
class Subscriptions {
325363
constructor(consumer) {
326364
this.consumer = consumer;
365+
this.guarantor = new SubscriptionGuarantor(this);
327366
this.subscriptions = [];
328367
}
329368
create(channelName, mixin) {
@@ -338,7 +377,7 @@ class Subscriptions {
338377
this.subscriptions.push(subscription);
339378
this.consumer.ensureActiveConnection();
340379
this.notify(subscription, "initialized");
341-
this.sendCommand(subscription, "subscribe");
380+
this.subscribe(subscription);
342381
return subscription;
343382
}
344383
remove(subscription) {
@@ -356,14 +395,15 @@ class Subscriptions {
356395
}));
357396
}
358397
forget(subscription) {
398+
this.guarantor.forget(subscription);
359399
this.subscriptions = this.subscriptions.filter((s => s !== subscription));
360400
return subscription;
361401
}
362402
findAll(identifier) {
363403
return this.subscriptions.filter((s => s.identifier === identifier));
364404
}
365405
reload() {
366-
return this.subscriptions.map((subscription => this.sendCommand(subscription, "subscribe")));
406+
return this.subscriptions.map((subscription => this.subscribe(subscription)));
367407
}
368408
notifyAll(callbackName, ...args) {
369409
return this.subscriptions.map((subscription => this.notify(subscription, callbackName, ...args)));
@@ -377,6 +417,15 @@ class Subscriptions {
377417
}
378418
return subscriptions.map((subscription => typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined));
379419
}
420+
subscribe(subscription) {
421+
if (this.sendCommand(subscription, "subscribe")) {
422+
this.guarantor.guarantee(subscription);
423+
}
424+
}
425+
confirmSubscription(identifier) {
426+
logger.log(`Subscription confirmed ${identifier}`);
427+
this.findAll(identifier).map((subscription => this.guarantor.forget(subscription)));
428+
}
380429
sendCommand(subscription, command) {
381430
const {identifier: identifier} = subscription;
382431
return this.consumer.send({
@@ -439,4 +488,4 @@ function getConfig(name) {
439488
}
440489
}
441490

442-
export { Connection, ConnectionMonitor, Consumer, INTERNAL, Subscription, Subscriptions, adapters, createConsumer, createWebSocketURL, getConfig, logger };
491+
export { Connection, ConnectionMonitor, Consumer, INTERNAL, Subscription, SubscriptionGuarantor, Subscriptions, adapters, createConsumer, createWebSocketURL, getConfig, logger };

actioncable/app/assets/javascripts/actioncable.js

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@
246246
return this.monitor.recordPing();
247247

248248
case message_types.confirmation:
249+
this.subscriptions.confirmSubscription(identifier);
249250
return this.subscriptions.notify(identifier, "connected");
250251

251252
case message_types.rejection:
@@ -310,9 +311,46 @@
310311
return this.consumer.subscriptions.remove(this);
311312
}
312313
}
314+
class SubscriptionGuarantor {
315+
constructor(subscriptions) {
316+
this.subscriptions = subscriptions;
317+
this.pendingSubscriptions = [];
318+
}
319+
guarantee(subscription) {
320+
if (this.pendingSubscriptions.indexOf(subscription) == -1) {
321+
logger.log(`SubscriptionGuarantor guaranteeing ${subscription.identifier}`);
322+
this.pendingSubscriptions.push(subscription);
323+
} else {
324+
logger.log(`SubscriptionGuarantor already guaranteeing ${subscription.identifier}`);
325+
}
326+
this.startGuaranteeing();
327+
}
328+
forget(subscription) {
329+
logger.log(`SubscriptionGuarantor forgetting ${subscription.identifier}`);
330+
this.pendingSubscriptions = this.pendingSubscriptions.filter((s => s !== subscription));
331+
}
332+
startGuaranteeing() {
333+
this.stopGuaranteeing();
334+
this.retrySubscribing();
335+
}
336+
stopGuaranteeing() {
337+
clearTimeout(this.retryTimeout);
338+
}
339+
retrySubscribing() {
340+
this.retryTimeout = setTimeout((() => {
341+
if (this.subscriptions && typeof this.subscriptions.subscribe === "function") {
342+
this.pendingSubscriptions.map((subscription => {
343+
logger.log(`SubscriptionGuarantor resubscribing ${subscription.identifier}`);
344+
this.subscriptions.subscribe(subscription);
345+
}));
346+
}
347+
}), 500);
348+
}
349+
}
313350
class Subscriptions {
314351
constructor(consumer) {
315352
this.consumer = consumer;
353+
this.guarantor = new SubscriptionGuarantor(this);
316354
this.subscriptions = [];
317355
}
318356
create(channelName, mixin) {
@@ -327,7 +365,7 @@
327365
this.subscriptions.push(subscription);
328366
this.consumer.ensureActiveConnection();
329367
this.notify(subscription, "initialized");
330-
this.sendCommand(subscription, "subscribe");
368+
this.subscribe(subscription);
331369
return subscription;
332370
}
333371
remove(subscription) {
@@ -345,14 +383,15 @@
345383
}));
346384
}
347385
forget(subscription) {
386+
this.guarantor.forget(subscription);
348387
this.subscriptions = this.subscriptions.filter((s => s !== subscription));
349388
return subscription;
350389
}
351390
findAll(identifier) {
352391
return this.subscriptions.filter((s => s.identifier === identifier));
353392
}
354393
reload() {
355-
return this.subscriptions.map((subscription => this.sendCommand(subscription, "subscribe")));
394+
return this.subscriptions.map((subscription => this.subscribe(subscription)));
356395
}
357396
notifyAll(callbackName, ...args) {
358397
return this.subscriptions.map((subscription => this.notify(subscription, callbackName, ...args)));
@@ -366,6 +405,15 @@
366405
}
367406
return subscriptions.map((subscription => typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined));
368407
}
408+
subscribe(subscription) {
409+
if (this.sendCommand(subscription, "subscribe")) {
410+
this.guarantor.guarantee(subscription);
411+
}
412+
}
413+
confirmSubscription(identifier) {
414+
logger.log(`Subscription confirmed ${identifier}`);
415+
this.findAll(identifier).map((subscription => this.guarantor.forget(subscription)));
416+
}
369417
sendCommand(subscription, command) {
370418
const {identifier: identifier} = subscription;
371419
return this.consumer.send({
@@ -428,6 +476,7 @@
428476
exports.Consumer = Consumer;
429477
exports.INTERNAL = INTERNAL;
430478
exports.Subscription = Subscription;
479+
exports.SubscriptionGuarantor = SubscriptionGuarantor;
431480
exports.Subscriptions = Subscriptions;
432481
exports.adapters = adapters;
433482
exports.createConsumer = createConsumer;

actioncable/app/javascript/action_cable/connection.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ Connection.prototype.events = {
132132
case message_types.ping:
133133
return this.monitor.recordPing()
134134
case message_types.confirmation:
135+
this.subscriptions.confirmSubscription(identifier)
135136
return this.subscriptions.notify(identifier, "connected")
136137
case message_types.rejection:
137138
return this.subscriptions.reject(identifier)

actioncable/app/javascript/action_cable/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import Consumer, { createWebSocketURL } from "./consumer"
44
import INTERNAL from "./internal"
55
import Subscription from "./subscription"
66
import Subscriptions from "./subscriptions"
7+
import SubscriptionGuarantor from "./subscription_guarantor"
78
import adapters from "./adapters"
89
import logger from "./logger"
910

@@ -14,6 +15,7 @@ export {
1415
INTERNAL,
1516
Subscription,
1617
Subscriptions,
18+
SubscriptionGuarantor,
1719
adapters,
1820
createWebSocketURL,
1921
logger,

0 commit comments

Comments
 (0)