Skip to content

Commit e10bf3b

Browse files
authored
feat(data): add publish events over WebSocket (#14242)
2 parents 9551cad + 499bd27 commit e10bf3b

File tree

7 files changed

+400
-34
lines changed

7 files changed

+400
-34
lines changed

packages/api-graphql/__tests__/AWSAppSyncEventProvider.test.ts

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,56 @@ import { ConnectionState as CS } from '../src/types/PubSub';
99

1010
import { AWSAppSyncEventProvider } from '../src/Providers/AWSAppSyncEventsProvider';
1111

12+
// Mock all calls to signRequest
13+
jest.mock('@aws-amplify/core/internals/aws-client-utils', () => {
14+
const original = jest.requireActual(
15+
'@aws-amplify/core/internals/aws-client-utils',
16+
);
17+
return {
18+
...original,
19+
signRequest: (_request: any, _options: any) => {
20+
return {
21+
method: 'test',
22+
headers: { test: 'test' },
23+
url: new URL('http://example/'),
24+
};
25+
},
26+
};
27+
});
28+
29+
// Mock all calls to signRequest
30+
jest.mock('@aws-amplify/core', () => {
31+
const original = jest.requireActual('@aws-amplify/core');
32+
const session = {
33+
tokens: {
34+
accessToken: {
35+
toString: () => 'test',
36+
},
37+
},
38+
credentials: {
39+
accessKeyId: 'test',
40+
secretAccessKey: 'test',
41+
},
42+
};
43+
return {
44+
...original,
45+
fetchAuthSession: (_request: any, _options: any) => {
46+
return Promise.resolve(session);
47+
},
48+
Amplify: {
49+
Auth: {
50+
fetchAuthSession: async () => session,
51+
},
52+
},
53+
browserOrNode() {
54+
return {
55+
isBrowser: true,
56+
isNode: false,
57+
};
58+
},
59+
};
60+
});
61+
1262
describe('AppSyncEventProvider', () => {
1363
describe('subscribe()', () => {
1464
describe('returned observer', () => {
@@ -245,4 +295,210 @@ describe('AppSyncEventProvider', () => {
245295
});
246296
});
247297
});
298+
describe('publish', () => {
299+
let fakeWebSocketInterface: FakeWebSocketInterface;
300+
const loggerSpy: jest.SpyInstance = jest.spyOn(
301+
ConsoleLogger.prototype,
302+
'_log',
303+
);
304+
let provider: AWSAppSyncEventProvider;
305+
let reachabilityObserver: Observer<{ online: boolean }>;
306+
307+
beforeEach(async () => {
308+
// Set the network to "online" for these tests
309+
jest
310+
.spyOn(Reachability.prototype, 'networkMonitor')
311+
.mockImplementationOnce(() => {
312+
return new Observable(observer => {
313+
reachabilityObserver = observer;
314+
});
315+
})
316+
// Twice because we subscribe to get the initial state then again to monitor reachability
317+
.mockImplementationOnce(() => {
318+
return new Observable(observer => {
319+
reachabilityObserver = observer;
320+
});
321+
});
322+
323+
fakeWebSocketInterface = new FakeWebSocketInterface();
324+
provider = new AWSAppSyncEventProvider();
325+
Object.defineProperty(provider, 'socketStatus', {
326+
value: constants.SOCKET_STATUS.READY,
327+
});
328+
329+
Object.defineProperty(provider, 'awsRealTimeSocket', {
330+
get: function () {
331+
fakeWebSocketInterface.newWebSocket();
332+
return fakeWebSocketInterface.webSocket;
333+
},
334+
});
335+
336+
// Saving this spy and resetting it by hand causes badness
337+
// Saving it causes new websockets to be reachable across past tests that have not fully closed
338+
// Resetting it proactively causes those same past tests to be dealing with null while they reach a settled state
339+
340+
// Set the addEventListener() to do nothing for the test
341+
jest
342+
.spyOn(fakeWebSocketInterface.webSocket, 'addEventListener')
343+
.mockImplementation(() => {});
344+
345+
jest.spyOn(provider as any, '_getNewWebSocket').mockImplementation(() => {
346+
fakeWebSocketInterface.newWebSocket();
347+
return fakeWebSocketInterface.webSocket as WebSocket;
348+
});
349+
350+
// Reduce retry delay for tests to 100ms
351+
Object.defineProperty(constants, 'MAX_DELAY_MS', {
352+
value: 100,
353+
});
354+
// Reduce retry delay for tests to 100ms
355+
Object.defineProperty(constants, 'RECONNECT_DELAY', {
356+
value: 100,
357+
});
358+
});
359+
360+
afterEach(async () => {
361+
provider?.close();
362+
await fakeWebSocketInterface?.closeInterface();
363+
fakeWebSocketInterface?.teardown();
364+
loggerSpy.mockClear();
365+
});
366+
367+
test('authenticating with AWS_IAM', async () => {
368+
expect.assertions(1);
369+
provider.publish({
370+
appSyncGraphqlEndpoint: 'ws://localhost:8080',
371+
query: 'events/test',
372+
variables: { some: 'data' },
373+
authenticationType: 'iam',
374+
});
375+
376+
await fakeWebSocketInterface?.readyForUse;
377+
378+
expect(loggerSpy).toHaveBeenCalledWith(
379+
'DEBUG',
380+
'Authenticating with "iam"',
381+
);
382+
});
383+
384+
test('authenticating with API_KEY', async () => {
385+
expect.assertions(1);
386+
fakeWebSocketInterface.webSocket.readyState = WebSocket.OPEN;
387+
provider.publish({
388+
appSyncGraphqlEndpoint: 'ws://localhost:8080',
389+
query: 'events/test',
390+
variables: { some: 'data' },
391+
authenticationType: 'apiKey',
392+
apiKey: 'test',
393+
});
394+
395+
await fakeWebSocketInterface?.readyForUse;
396+
397+
expect(loggerSpy).toHaveBeenCalledWith(
398+
'DEBUG',
399+
'Authenticating with "apiKey"',
400+
);
401+
});
402+
403+
test('authenticating with OPENID_CONNECT', async () => {
404+
expect.assertions(1);
405+
406+
provider.publish({
407+
appSyncGraphqlEndpoint: 'ws://localhost:8080',
408+
query: 'events/test',
409+
variables: { some: 'data' },
410+
authenticationType: 'oidc',
411+
});
412+
413+
await fakeWebSocketInterface?.readyForUse;
414+
415+
expect(loggerSpy).toHaveBeenCalledWith(
416+
'DEBUG',
417+
'Authenticating with "oidc"',
418+
);
419+
});
420+
421+
test('authenticating with AWS_LAMBDA/custom', async () => {
422+
expect.assertions(1);
423+
424+
provider.publish({
425+
appSyncGraphqlEndpoint: 'ws://localhost:8080',
426+
query: 'events/test',
427+
variables: { some: 'data' },
428+
authenticationType: 'none',
429+
additionalHeaders: {
430+
Authorization: 'test',
431+
},
432+
});
433+
434+
await fakeWebSocketInterface?.readyForUse;
435+
436+
expect(loggerSpy).toHaveBeenCalledWith(
437+
'DEBUG',
438+
'Authenticating with "none"',
439+
);
440+
});
441+
442+
test('authenticating with userPool / custom library options token', async () => {
443+
expect.assertions(1);
444+
445+
provider.publish({
446+
appSyncGraphqlEndpoint: 'ws://localhost:8080',
447+
query: 'events/test',
448+
variables: { some: 'data' },
449+
authenticationType: 'userPool',
450+
/**
451+
* When Amplify is configured with a `header` function
452+
* that returns an `Authorization` token, the GraphQL
453+
* API will pass this function as the `libraryConfigHeaders`
454+
* option to the AWSAppSyncRealTimeProvider's `subscribe`
455+
* function.
456+
*/
457+
libraryConfigHeaders: async () => ({
458+
Authorization: 'test',
459+
}),
460+
});
461+
462+
await fakeWebSocketInterface?.readyForUse;
463+
464+
expect(loggerSpy).toHaveBeenCalledWith(
465+
'DEBUG',
466+
'Authenticating with "userPool"',
467+
);
468+
});
469+
470+
test('authenticating with AWS_LAMBDA/custom w/ custom header function', async () => {
471+
expect.assertions(1);
472+
473+
provider.publish({
474+
appSyncGraphqlEndpoint: 'ws://localhost:8080',
475+
query: 'events/test',
476+
variables: { some: 'data' },
477+
authenticationType: 'none',
478+
additionalHeaders: async () => ({
479+
Authorization: 'test',
480+
}),
481+
});
482+
483+
await fakeWebSocketInterface?.readyForUse;
484+
485+
expect(loggerSpy).toBeCalledWith('DEBUG', 'Authenticating with "none"');
486+
});
487+
488+
test('authenticating with AWS_LAMBDA/custom without Authorization', async () => {
489+
expect.assertions(1);
490+
491+
await expect(
492+
provider.publish({
493+
appSyncGraphqlEndpoint: 'ws://localhost:8080',
494+
query: 'events/test',
495+
variables: { some: 'data' },
496+
authenticationType: 'none',
497+
additionalHeaders: {
498+
Authorization: '',
499+
},
500+
}),
501+
).rejects.toThrow('No auth token specified');
502+
});
503+
});
248504
});

packages/api-graphql/__tests__/events.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,42 @@ describe('Events client', () => {
173173
});
174174
});
175175

176+
describe('publish', () => {
177+
test('happy publish', async () => {
178+
const channel = await events.connect('/');
179+
180+
channel.publish({ some: 'data' });
181+
});
182+
183+
test('publish() becomes invalid after .close() is called', async () => {
184+
const channel = await events.connect('/');
185+
channel.close();
186+
await expect(channel.publish({ some: 'data' })).rejects.toThrow(
187+
'Channel is closed',
188+
);
189+
});
190+
191+
describe('auth modes', () => {
192+
let mockProvider: typeof AppSyncEventProvider;
193+
194+
beforeEach(() => {
195+
mockProvider = AppSyncEventProvider;
196+
});
197+
198+
for (const authMode of authModes) {
199+
test(`auth override: ${authMode}`, async () => {
200+
const channel = await events.connect('/');
201+
202+
channel.publish({ some: 'data' }, { authMode });
203+
204+
expect(mockProvider.publish).toHaveBeenCalledWith(
205+
expect.objectContaining({ authenticationType: authMode }),
206+
);
207+
});
208+
}
209+
});
210+
});
211+
176212
describe('post', () => {
177213
let mockReq: typeof appsyncRequest;
178214

packages/api-graphql/src/Providers/AWSAppSyncEventsProvider/index.ts

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { CustomHeaders } from '@aws-amplify/data-schema/runtime';
1212
import { DEFAULT_KEEP_ALIVE_TIMEOUT, MESSAGE_TYPES } from '../constants';
1313
import { AWSWebSocketProvider } from '../AWSWebSocketProvider';
1414
import { awsRealTimeHeaderBasedAuth } from '../AWSWebSocketProvider/authHeaders';
15+
import { serializeEvents } from '../../internals/events/utils';
1516

1617
// resolved/actual AuthMode values. identityPool gets resolves to IAM upstream in InternalGraphQLAPI._graphqlSubscribe
1718
type ResolvedGraphQLAuthModes = Exclude<GraphQLAuthMode, 'identityPool'>;
@@ -52,6 +53,7 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider {
5253
wsProtocolName: WS_PROTOCOL_NAME,
5354
connectUri: CONNECT_URI,
5455
});
56+
this.allowNoSubscriptions = true;
5557
}
5658

5759
getProviderName() {
@@ -73,7 +75,11 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider {
7375
options: AWSAppSyncEventProviderOptions,
7476
customUserAgentDetails?: CustomUserAgentDetails,
7577
) {
76-
super.publish(options, customUserAgentDetails);
78+
return super.publish(options, customUserAgentDetails);
79+
}
80+
81+
public closeIfNoActiveSubscription() {
82+
this._closeSocketIfRequired();
7783
}
7884

7985
protected async _prepareSubscriptionPayload({
@@ -97,14 +103,14 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider {
97103
query,
98104
apiKey,
99105
region,
106+
variables,
100107
} = options;
101108

102-
// This will be needed for WS publish
103-
// const data = {
104-
// events: [variables],
105-
// };
106-
107-
const serializedData = JSON.stringify({ channel: query });
109+
const data = {
110+
channel: query,
111+
events: variables !== undefined ? serializeEvents(variables) : undefined,
112+
};
113+
const serializedData = JSON.stringify(data);
108114

109115
const headers = {
110116
...(await awsRealTimeHeaderBasedAuth({
@@ -121,22 +127,23 @@ export class AWSAppSyncEventProvider extends AWSWebSocketProvider {
121127
[USER_AGENT_HEADER]: getAmplifyUserAgent(customUserAgentDetails),
122128
};
123129

124-
// Commented out code will be needed for WS publish
125130
const subscriptionMessage = {
126131
id: subscriptionId,
127132
channel: query,
128-
// events: [JSON.stringify(variables)],
133+
events: variables !== undefined ? serializeEvents(variables) : undefined,
129134
authorization: {
130135
...headers,
131136
},
132-
// payload: {
133-
// events: serializedData,
134-
// extensions: {
135-
// authorization: {
136-
// ...headers,
137-
// },
138-
// },
139-
// },
137+
payload: {
138+
events:
139+
variables !== undefined ? serializeEvents(variables) : undefined,
140+
channel: query,
141+
extensions: {
142+
authorization: {
143+
...headers,
144+
},
145+
},
146+
},
140147
type: publish
141148
? MESSAGE_TYPES.EVENT_PUBLISH
142149
: MESSAGE_TYPES.EVENT_SUBSCRIBE,

0 commit comments

Comments
 (0)