Skip to content

Commit 95f3cc1

Browse files
authored
chore: dispatch all events to EventBus (#445)
* chore: create event bus * chore: add EventBus to Orchestration * fix linting errors * fix rebase error * refactor: EventCache publishes raw RumEvent * fix: EventCache.record returns ParsedRumEvent without publishing * chore: publish resource events * chore: publish navigation events * chore: resolve nits * chore: only publish image resource events * chore: only publish the first navigation event * Revert "chore: only publish the first navigation event" This reverts commit 620cc4c. * Revert "chore: only publish image resource events" This reverts commit f4f5f7b. * Revert "chore: resolve nits" This reverts commit 360f281. * Revert "chore: publish navigation events" This reverts commit 10d1854. * Revert "chore: publish resource events" This reverts commit 388c1cc. * Revert "fix: EventCache.record returns ParsedRumEvent without publishing" This reverts commit c2d81eb. * refactor: rename notify to dispatch * chore: rename context.bus to eventBus * chore: add topic enum to eventbus * chore: event bus dispatches with optional key * chore: dispatch resource events with keys * chore: dispatch lvl 2 navigation events with key * nit: rename topic.events to event * chore: fix merge conflict * nit: remove unnecessary import * chore: remove key * chore: remove remaining key usages
1 parent 81213b9 commit 95f3cc1

File tree

10 files changed

+202
-65
lines changed

10 files changed

+202
-65
lines changed

src/event-bus/EventBus.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
export type Subscriber = (message: any) => void;
2+
export enum Topic {
3+
EVENT = 'event'
4+
}
5+
6+
/** A topic-based event bus to facilitate communication between plugins */
7+
export default class EventBus<T = Topic> {
8+
// map<topic, subscriber>
9+
private subscribers = new Map<T, Subscriber[]>();
10+
11+
subscribe(topic: T, subscriber: Subscriber): void {
12+
const list = this.subscribers.get(topic) ?? [];
13+
if (list.length === 0) {
14+
this.subscribers.set(topic, list);
15+
}
16+
list.push(subscriber);
17+
}
18+
19+
unsubscribe(topic: T, subscriber: Subscriber) {
20+
const list = this.subscribers.get(topic);
21+
if (list) {
22+
for (let i = 0; i < list.length; i++) {
23+
if (list[i] === subscriber) {
24+
list.splice(i, 1);
25+
return true;
26+
}
27+
}
28+
}
29+
return false;
30+
}
31+
32+
dispatch(topic: T, message: any): void {
33+
const list = this.subscribers.get(topic);
34+
if (list) {
35+
for (const subscriber of list) {
36+
subscriber(message);
37+
}
38+
}
39+
}
40+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import EventBus from '../EventBus';
2+
3+
export enum MockTopics {
4+
FOOD = 'food',
5+
BOOKS = 'books'
6+
}
7+
describe('EventBus tests', () => {
8+
let eventBus: EventBus<MockTopics>;
9+
const l1 = jest.fn();
10+
const l2 = jest.fn();
11+
beforeEach(() => {
12+
eventBus = new EventBus();
13+
jest.clearAllMocks();
14+
});
15+
test('when dispatch is invoked then all listeners are called', async () => {
16+
// init
17+
eventBus.subscribe(MockTopics.FOOD, l1);
18+
eventBus.subscribe(MockTopics.FOOD, l2);
19+
20+
// run
21+
eventBus.dispatch(MockTopics.FOOD, 'burger');
22+
23+
// assert
24+
expect(l1).toHaveBeenCalledWith('burger');
25+
expect(l2).toHaveBeenCalledWith('burger');
26+
});
27+
28+
test('when subscriber is removed then it is not called', async () => {
29+
// init
30+
eventBus.subscribe(MockTopics.FOOD, l1);
31+
eventBus.subscribe(MockTopics.FOOD, l2);
32+
const removed = eventBus.unsubscribe(MockTopics.FOOD, l2);
33+
34+
// run
35+
eventBus.dispatch(MockTopics.FOOD, 'burger');
36+
37+
// assert
38+
expect(l1).toHaveBeenCalledWith('burger');
39+
expect(removed).toBe(true);
40+
expect(l2).not.toHaveBeenCalled();
41+
});
42+
43+
test('when subscribed to topic A then does not hear topic B', async () => {
44+
eventBus.subscribe(MockTopics.FOOD, l1);
45+
eventBus.subscribe(MockTopics.BOOKS, l2);
46+
47+
// run
48+
eventBus.dispatch(MockTopics.FOOD, 'burger');
49+
50+
// assert
51+
expect(l2).not.toHaveBeenCalled();
52+
});
53+
});

src/event-cache/EventCache.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
UserDetails,
99
RumEvent
1010
} from '../dispatch/dataplane';
11+
import EventBus, { Topic } from '../event-bus/EventBus';
1112

1213
const webClientVersion = '1.14.0';
1314

@@ -37,7 +38,11 @@ export class EventCache {
3738
* @param sessionManager The sessionManager returns user id, session id and handles session timeout.
3839
* @param pageManager The pageManager returns page id.
3940
*/
40-
constructor(applicationDetails: AppMonitorDetails, config: Config) {
41+
constructor(
42+
applicationDetails: AppMonitorDetails,
43+
config: Config,
44+
private eventBus = new EventBus<Topic>()
45+
) {
4146
this.appMonitorDetails = applicationDetails;
4247
this.config = config;
4348
this.enabled = true;
@@ -220,12 +225,20 @@ export class EventCache {
220225
'aws:clientVersion': webClientVersion
221226
};
222227

223-
this.events.push({
224-
details: JSON.stringify(eventData),
228+
const partialEvent = {
225229
id: v4(),
226-
metadata: JSON.stringify(metaData),
227230
timestamp: new Date(),
228231
type
232+
};
233+
this.eventBus.dispatch(Topic.EVENT, {
234+
...partialEvent,
235+
details: eventData,
236+
metadata: metaData
237+
});
238+
this.events.push({
239+
...partialEvent,
240+
details: JSON.stringify(eventData),
241+
metadata: JSON.stringify(metaData)
229242
});
230243
};
231244

src/event-cache/__tests__/EventCache.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import { SessionManager } from '../../sessions/SessionManager';
55
import { RumEvent } from '../../dispatch/dataplane';
66
import { DEFAULT_CONFIG, mockFetch } from '../../test-utils/test-utils';
77
import { INSTALL_MODULE, INSTALL_SCRIPT } from '../../utils/constants';
8+
import EventBus, { Topic } from '../../event-bus/EventBus';
9+
jest.mock('../../event-bus/EventBus');
810

911
global.fetch = mockFetch;
1012
const getSession = jest.fn(() => ({
@@ -492,6 +494,60 @@ describe('EventCache tests', () => {
492494
expect(eventCache.getEventBatch().length).toEqual(1);
493495
});
494496

497+
test('when event is recorded then events subscribers are notified with parsed rum event', async () => {
498+
// Init
499+
const EVENT1_SCHEMA = 'com.amazon.rum.event1';
500+
const bus = new EventBus();
501+
const eventCache: EventCache = Utils.createEventCache(
502+
DEFAULT_CONFIG,
503+
bus
504+
);
505+
506+
const event = {
507+
id: expect.stringMatching(/[0-9a-f\-]+/),
508+
timestamp: new Date(),
509+
type: EVENT1_SCHEMA,
510+
metadata: `{"version":"1.0.0","aws:client":"${INSTALL_MODULE}","aws:clientVersion":"${WEB_CLIENT_VERSION}"}`,
511+
details: '{}'
512+
};
513+
514+
// Run
515+
eventCache.recordEvent(EVENT1_SCHEMA, {});
516+
const eventBatch: RumEvent[] = eventCache.getEventBatch();
517+
expect(eventBatch).toEqual(expect.arrayContaining([event]));
518+
// eslint-disable-next-line
519+
expect(bus.dispatch).toHaveBeenCalledWith(
520+
Topic.EVENT,
521+
expect.objectContaining({
522+
id: expect.stringMatching(/[0-9a-f\-]+/),
523+
timestamp: new Date(),
524+
type: EVENT1_SCHEMA,
525+
metadata: expect.objectContaining({
526+
version: '1.0.0',
527+
'aws:client': INSTALL_MODULE,
528+
'aws:clientVersion': WEB_CLIENT_VERSION
529+
}),
530+
details: expect.objectContaining({})
531+
})
532+
);
533+
});
534+
535+
test('when cache is disabled then subscribers are not notified', async () => {
536+
// Init
537+
const EVENT1_SCHEMA = 'com.amazon.rum.event1';
538+
const bus = new EventBus();
539+
const eventCache: EventCache = Utils.createEventCache(
540+
DEFAULT_CONFIG,
541+
bus
542+
);
543+
// Run
544+
eventCache.disable();
545+
eventCache.recordEvent(EVENT1_SCHEMA, {});
546+
const eventBatch: RumEvent[] = eventCache.getEventBatch();
547+
expect(eventBatch).toHaveLength(0);
548+
expect(bus.dispatch).not.toHaveBeenCalled(); // eslint-disable-line
549+
});
550+
495551
test('when event limit is zero then recordEvent records all events', async () => {
496552
// Init
497553
const eventCount = 0;

src/orchestration/Orchestration.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { FetchPlugin } from '../plugins/event-plugins/FetchPlugin';
2424
import { PageViewPlugin } from '../plugins/event-plugins/PageViewPlugin';
2525
import { PageAttributes } from '../sessions/PageManager';
2626
import { INSTALL_MODULE } from '../utils/constants';
27+
import EventBus, { Topic } from '../event-bus/EventBus';
2728

2829
const DEFAULT_REGION = 'us-west-2';
2930
const DEFAULT_ENDPOINT = `https://dataplane.rum.${DEFAULT_REGION}.amazonaws.com`;
@@ -206,6 +207,7 @@ export class Orchestration {
206207
private eventCache: EventCache;
207208
private dispatchManager: Dispatch;
208209
private config: Config;
210+
private eventBus = new EventBus<Topic>();
209211

210212
/**
211213
* Instantiate the CloudWatch RUM web client and begin monitoring the
@@ -444,7 +446,8 @@ export class Orchestration {
444446
config: this.config,
445447
record: this.eventCache.recordEvent,
446448
recordPageView: this.eventCache.recordPageView,
447-
getSession: this.eventCache.getSession
449+
getSession: this.eventCache.getSession,
450+
eventBus: this.eventBus
448451
};
449452

450453
// Initialize PluginManager

src/plugins/event-plugins/__tests__/FetchPlugin.test.ts

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import {
55
} from '../../utils/http-utils';
66
import { advanceTo } from 'jest-date-mock';
77
import {
8+
context,
89
DEFAULT_CONFIG,
10+
getSession,
911
record,
1012
recordPageView,
1113
xRayOffContext,
@@ -73,6 +75,7 @@ describe('FetchPlugin tests', () => {
7375
mockFetchWithErrorObject.mockClear();
7476
mockFetchWithErrorObjectAndStack.mockClear();
7577
record.mockClear();
78+
getSession.mockClear();
7679
});
7780

7881
test('when fetch is called then the plugin records the http request/response', async () => {
@@ -537,17 +540,9 @@ describe('FetchPlugin tests', () => {
537540
logicalServiceName: 'sample.rum.aws.amazon.com',
538541
urlsToInclude: [/aws\.amazon\.com/]
539542
};
540-
const xRayOnContext: PluginContext = {
541-
applicationId: 'b',
542-
applicationVersion: '1.0',
543-
config: { ...DEFAULT_CONFIG, ...{ enableXRay: true } },
544-
record,
545-
recordPageView,
546-
getSession
547-
};
548-
543+
const context = Object.assign({}, xRayOnContext, { getSession });
549544
const plugin: FetchPlugin = new FetchPlugin(config);
550-
plugin.load(xRayOnContext);
545+
plugin.load(context);
551546

552547
// Run
553548
await fetch(URL);
@@ -566,17 +561,10 @@ describe('FetchPlugin tests', () => {
566561
logicalServiceName: 'sample.rum.aws.amazon.com',
567562
urlsToInclude: [/aws\.amazon\.com/]
568563
};
569-
const xRayOnContext: PluginContext = {
570-
applicationId: 'b',
571-
applicationVersion: '1.0',
572-
config: { ...DEFAULT_CONFIG, ...{ enableXRay: true } },
573-
record,
574-
recordPageView,
575-
getSession
576-
};
564+
const context = Object.assign({}, xRayOnContext, { getSession });
577565

578566
const plugin: FetchPlugin = new FetchPlugin(config);
579-
plugin.load(xRayOnContext);
567+
plugin.load(context);
580568

581569
// Run
582570
await fetch(URL);

src/plugins/event-plugins/__tests__/ResourcePlugin.test.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,18 +70,14 @@ describe('ResourcePlugin tests', () => {
7070
test('when recordResourceUrl is false then the resource name is not recorded', async () => {
7171
// Setup
7272
mockRandom(0); // Retain order in shuffle
73-
const context: PluginContext = {
74-
applicationId: 'b',
75-
applicationVersion: '1.0',
76-
config: { ...DEFAULT_CONFIG, recordResourceUrl: false },
77-
record,
78-
recordPageView,
79-
getSession
80-
};
73+
8174
const plugin: ResourcePlugin = buildResourcePlugin();
75+
const mockContext = Object.assign({}, context, {
76+
config: { ...DEFAULT_CONFIG, recordResourceUrl: false }
77+
});
8278

8379
// Run
84-
plugin.load(context);
80+
plugin.load(mockContext);
8581
window.dispatchEvent(new Event('load'));
8682
plugin.disable();
8783

src/plugins/event-plugins/__tests__/XhrPlugin.test.ts

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { PartialHttpPluginConfig } from '../../utils/http-utils';
22
import { advanceTo } from 'jest-date-mock';
33
import { XhrPlugin } from '../XhrPlugin';
44
import {
5+
context as mockContext,
56
xRayOffContext,
67
xRayOnContext,
78
record,
@@ -537,14 +538,9 @@ describe('XhrPlugin tests', () => {
537538
record: false,
538539
eventCount: 0
539540
}));
540-
const context: PluginContext = {
541-
applicationId: 'b',
542-
applicationVersion: '1.0',
543-
config: DEFAULT_CONFIG,
544-
record,
545-
recordPageView,
546-
getSession
547-
};
541+
542+
const context = { ...mockContext, getSession };
543+
548544
const config: PartialHttpPluginConfig = {
549545
logicalServiceName: 'sample.rum.aws.amazon.com',
550546
urlsToInclude: [/response\.json/]
@@ -574,14 +570,7 @@ describe('XhrPlugin tests', () => {
574570
test('when getSession returns undefined then the plugin does not record a trace', async () => {
575571
// Init
576572
const getSession: jest.MockedFunction<GetSession> = jest.fn();
577-
const context: PluginContext = {
578-
applicationId: 'b',
579-
applicationVersion: '1.0',
580-
config: { ...DEFAULT_CONFIG, ...{ enableXRay: true } },
581-
record,
582-
recordPageView,
583-
getSession
584-
};
573+
const context = { ...mockContext, getSession };
585574
const config: PartialHttpPluginConfig = {
586575
logicalServiceName: 'sample.rum.aws.amazon.com',
587576
urlsToInclude: [/response\.json/],

src/plugins/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { Config } from '../orchestration/Orchestration';
22
import { Session } from '../sessions/SessionManager';
3+
import EventBus, { Topic } from '../event-bus/EventBus';
34

45
export type RecordEvent = (type: string, eventData: object) => void;
56
export type RecordPageView = (pageId: string) => void;
6-
77
export type GetSession = () => Session | undefined;
88

99
export type PluginContext = {
@@ -13,4 +13,5 @@ export type PluginContext = {
1313
record: RecordEvent;
1414
recordPageView: RecordPageView;
1515
getSession: GetSession;
16+
eventBus: EventBus<Topic>;
1617
};

0 commit comments

Comments
 (0)