Skip to content

Commit b5758bb

Browse files
committed
Allows publish after unsubscribe
1 parent f1201b1 commit b5758bb

File tree

3 files changed

+28
-4
lines changed

3 files changed

+28
-4
lines changed

packages/api-graphql/src/Providers/AWSAppSyncEventsProvider/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider {
5353
wsProtocolName: WS_PROTOCOL_NAME,
5454
connectUri: CONNECT_URI,
5555
});
56+
this.allowNoSubscriptions = true;
5657
}
5758

5859
getProviderName() {
@@ -77,6 +78,10 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider {
7778
return super.publish(options, customUserAgentDetails);
7879
}
7980

81+
public closeIfNoActiveChannel() {
82+
setTimeout(this._closeSocketIfRequired.bind(this), 1000);
83+
}
84+
8085
protected async _prepareSubscriptionPayload({
8186
options,
8287
subscriptionId,

packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ interface AWSWebSocketProviderArgs {
8080
export abstract class AWSWebSocketProvider {
8181
protected logger: ConsoleLogger;
8282
protected subscriptionObserverMap = new Map<string, ObserverQuery>();
83+
protected allowNoSubscriptions = false;
8384

8485
protected awsRealTimeSocket?: WebSocket;
8586
private socketStatus: SOCKET_STATUS = SOCKET_STATUS.CLOSED;
@@ -543,10 +544,12 @@ export abstract class AWSWebSocketProvider {
543544
this.subscriptionObserverMap.delete(subscriptionId);
544545

545546
// Verifying 1000ms after removing subscription in case there are new subscription unmount/mount
546-
setTimeout(this._closeSocketIfRequired.bind(this), 1000);
547+
if (!this.allowNoSubscriptions) {
548+
setTimeout(this._closeSocketIfRequired.bind(this), 1000);
549+
}
547550
}
548551

549-
private _closeSocketIfRequired() {
552+
protected _closeSocketIfRequired() {
550553
if (this.subscriptionObserverMap.size > 0) {
551554
// Active subscriptions on the WebSocket
552555
return;

packages/api-graphql/src/internals/events/index.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import { Subscription } from 'rxjs';
55
import { Amplify } from '@aws-amplify/core';
6-
import { DocumentType } from '@aws-amplify/core/internals/utils';
6+
import { DocumentType, amplifyUuid } from '@aws-amplify/core/internals/utils';
77

88
import { AppSyncEventProvider as eventProvider } from '../../Providers/AWSAppSyncEventsProvider';
99

@@ -17,6 +17,9 @@ import type {
1717
SubscriptionObserver,
1818
} from './types';
1919

20+
// Keeps a list of open channels in the websocket
21+
const openChannels = new Set<string>();
22+
2023
/**
2124
* @experimental API may change in future versions
2225
*
@@ -50,12 +53,18 @@ async function connect(
5053

5154
await eventProvider.connect(providerOptions);
5255

56+
const channelId = amplifyUuid();
57+
openChannels.add(channelId);
58+
5359
let _subscription: Subscription;
5460

5561
const sub = (
5662
observer: SubscriptionObserver<any>,
5763
subOptions?: EventsOptions,
5864
): Subscription => {
65+
if (!openChannels.has(channelId)) {
66+
throw new Error('Channel is closed');
67+
}
5968
const subscribeOptions = { ...providerOptions, query: channel };
6069
subscribeOptions.authenticationType = normalizeAuth(
6170
subOptions?.authMode,
@@ -73,6 +82,9 @@ async function connect(
7382
event: DocumentType,
7483
pubOptions?: EventsOptions,
7584
): Promise<any> => {
85+
if (!openChannels.has(channelId)) {
86+
throw new Error('Channel is closed');
87+
}
7688
const publishOptions = {
7789
...providerOptions,
7890
query: channel,
@@ -86,8 +98,12 @@ async function connect(
8698
return eventProvider.publish(publishOptions);
8799
};
88100

89-
const close = () => {
101+
const close = async () => {
90102
_subscription && _subscription.unsubscribe();
103+
openChannels.delete(channelId);
104+
if (openChannels.size === 0) {
105+
eventProvider.closeIfNoActiveChannel();
106+
}
91107
};
92108

93109
return {

0 commit comments

Comments
 (0)