Skip to content

Commit db7bfd8

Browse files
ggazzosampaiodiego
andauthored
feat: move event notification handling in EventService and refactor S… (#315)
Co-authored-by: Diego Sampaio <chinello@gmail.com>
1 parent a72d319 commit db7bfd8

File tree

11 files changed

+238
-60
lines changed

11 files changed

+238
-60
lines changed
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/** @public */
2+
export type DefaultEventMap = Record<string | symbol, any>;
3+
4+
/** @public */
5+
export type AnyEventTypeOf<EventMap extends DefaultEventMap> = keyof EventMap;
6+
7+
/** @public */
8+
export type AnyEventOf<EventMap extends DefaultEventMap> =
9+
EventMap[keyof EventMap];
10+
11+
/** @public */
12+
export type AnyEventHandlerOf<EventMap extends DefaultEventMap> = {
13+
[EventType in keyof EventMap]: EventMap[EventType] extends void
14+
? () => unknown | Promise<unknown>
15+
: (event: EventMap[EventType]) => unknown | Promise<unknown>;
16+
}[keyof EventMap];
17+
18+
/** @public */
19+
export type EventTypeOf<
20+
EventMap extends DefaultEventMap,
21+
EventValue extends EventMap[keyof EventMap],
22+
> = {
23+
[EventType in keyof EventMap]: EventMap[EventType] extends EventValue
24+
? EventType
25+
: never;
26+
}[keyof EventMap];
27+
28+
/** @public */
29+
export type EventOf<
30+
EventMap extends DefaultEventMap,
31+
EventType extends AnyEventTypeOf<EventMap>,
32+
> = EventMap[EventType] extends void ? never : EventMap[EventType];
33+
34+
/** @public */
35+
export type EventHandlerOf<
36+
EventMap extends DefaultEventMap,
37+
EventType extends AnyEventTypeOf<EventMap>,
38+
> = EventMap[EventType] extends void
39+
? () => unknown | Promise<unknown>
40+
: (event: EventMap[EventType]) => unknown | Promise<unknown>;
41+
42+
/** @public */
43+
export type OffCallbackHandler = () => void;
44+
45+
/** @public */
46+
export interface IAsyncDispatcher<
47+
EventMap extends DefaultEventMap = DefaultEventMap,
48+
> {
49+
on<
50+
T extends AnyEventOf<EventMap>,
51+
EventType extends AnyEventTypeOf<EventMap> = EventTypeOf<EventMap, T>,
52+
>(
53+
type: EventType,
54+
handler: EventHandlerOf<EventMap, EventType>,
55+
): OffCallbackHandler;
56+
once<
57+
T extends AnyEventOf<EventMap>,
58+
EventType extends AnyEventTypeOf<EventMap> = EventTypeOf<EventMap, T>,
59+
>(
60+
type: EventType,
61+
handler: EventHandlerOf<EventMap, EventType>,
62+
): OffCallbackHandler;
63+
off<
64+
T extends AnyEventOf<EventMap>,
65+
EventType extends AnyEventTypeOf<EventMap> = EventTypeOf<EventMap, T>,
66+
>(type: EventType, handler: EventHandlerOf<EventMap, EventType>): void;
67+
68+
has(key: AnyEventTypeOf<EventMap>): boolean;
69+
events(): AnyEventTypeOf<EventMap>[];
70+
71+
emit<
72+
T extends AnyEventOf<EventMap>,
73+
EventType extends AnyEventTypeOf<EventMap> = EventTypeOf<EventMap, T>,
74+
>(
75+
type: EventType,
76+
...[event]: EventOf<EventMap, EventType> extends void
77+
? [undefined?]
78+
: [EventOf<EventMap, EventType>]
79+
): Promise<void>;
80+
}
81+
82+
const kOnce = Symbol('once');
83+
const kEvents = Symbol('events');
84+
85+
/**
86+
* Fully async event dispatcher.
87+
*
88+
* Handlers may be sync or async, but dispatch always awaits them.
89+
*
90+
* @public
91+
*/
92+
export class AsyncDispatcher<EventMap extends DefaultEventMap = DefaultEventMap>
93+
implements IAsyncDispatcher<EventMap>
94+
{
95+
private [kEvents] = new Map<
96+
AnyEventTypeOf<EventMap>,
97+
AnyEventHandlerOf<EventMap>[]
98+
>();
99+
100+
private [kOnce] = new WeakMap<AnyEventHandlerOf<EventMap>, number>();
101+
102+
events(): AnyEventTypeOf<EventMap>[] {
103+
return Array.from(this[kEvents].keys());
104+
}
105+
106+
has(key: AnyEventTypeOf<EventMap>): boolean {
107+
return this[kEvents].has(key);
108+
}
109+
110+
on(type: keyof EventMap, handler: (...args: any[]) => unknown) {
111+
const handlers = this[kEvents].get(type) ?? [];
112+
handlers.push(handler);
113+
this[kEvents].set(type, handlers);
114+
return () => this.off(type, handler);
115+
}
116+
117+
once(type: keyof EventMap, handler: (...args: any[]) => unknown) {
118+
const count = this[kOnce].get(handler) || 0;
119+
this[kOnce].set(handler, count + 1);
120+
return this.on(type, handler);
121+
}
122+
123+
off(type: keyof EventMap, handler: (...args: any[]) => unknown) {
124+
const handlers = this[kEvents].get(type);
125+
if (!handlers) return;
126+
127+
const count = this[kOnce].get(handler) ?? 0;
128+
if (count > 1) {
129+
this[kOnce].set(handler, count - 1);
130+
} else {
131+
this[kOnce].delete(handler);
132+
}
133+
134+
const idx = handlers.findIndex((h) => h === handler);
135+
if (idx !== -1) handlers.splice(idx, 1);
136+
137+
if (handlers.length === 0) {
138+
this[kEvents].delete(type);
139+
}
140+
}
141+
142+
async emit(type: keyof EventMap, ...[event]: any[]): Promise<void> {
143+
const list = [...(this[kEvents].get(type) ?? [])];
144+
145+
for (const handler of list) {
146+
await handler(event);
147+
148+
if (this[kOnce].get(handler)) {
149+
this.off(type, handler);
150+
}
151+
}
152+
}
153+
}

packages/core/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,5 @@ export { makeUnsignedRequest } from './utils/makeRequest';
8484
export type { FetchResponse, MultipartResult } from './utils/fetch';
8585

8686
export { fetch } from './utils/fetch';
87+
88+
export * from './AsyncDispatcher';

packages/federation-sdk/src/index.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import 'reflect-metadata';
22

3-
import type { Emitter } from '@rocket.chat/emitter';
3+
import { Emitter } from '@rocket.chat/emitter';
44
import type { EventStagingStore } from '@rocket.chat/federation-core';
55
import type {
66
EventID,
@@ -185,14 +185,10 @@ export async function init({
185185
'rocketchat_federation_state_graphs',
186186
),
187187
});
188-
189188
const eventEmitterService = container.resolve(EventEmitterService);
190189
if (emitter) {
191190
eventEmitterService.setEmitter(emitter);
192-
} else {
193-
eventEmitterService.initializeStandalone();
194191
}
195-
196192
// this is required to initialize the listener and register the queue handler
197193
container.resolve(StagingAreaListener);
198194

packages/federation-sdk/src/sdk.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { singleton } from 'tsyringe';
55
import { AppConfig, ConfigService } from './services/config.service';
66
import { EduService } from './services/edu.service';
77
import { EventAuthorizationService } from './services/event-authorization.service';
8+
import { EventEmitterService } from './services/event-emitter.service';
89
import { EventService } from './services/event.service';
910
import { FederationRequestService } from './services/federation-request.service';
1011
import { FederationService } from './services/federation.service';
@@ -37,6 +38,7 @@ export class FederationSDK {
3738
private readonly wellKnownService: WellKnownService,
3839
private readonly federationRequestService: FederationRequestService,
3940
private readonly federationService: FederationService,
41+
public readonly eventEmitterService: EventEmitterService,
4042
) {}
4143

4244
createDirectMessageRoom(
Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,73 @@
11
import {
2-
Emitter,
2+
AsyncDispatcher,
33
type EventHandlerOf,
44
type EventOf,
5-
} from '@rocket.chat/emitter';
6-
import { logger } from '@rocket.chat/federation-core';
5+
logger,
6+
} from '@rocket.chat/federation-core';
77
import { singleton } from 'tsyringe';
88

9+
import { Emitter } from '@rocket.chat/emitter';
910
import type { HomeserverEventSignatures } from '..';
1011

1112
@singleton()
1213
export class EventEmitterService {
13-
private emitter: Emitter<HomeserverEventSignatures> =
14+
private emitter: AsyncDispatcher<HomeserverEventSignatures> =
15+
new AsyncDispatcher<HomeserverEventSignatures>();
16+
private oldEmitter: Emitter<HomeserverEventSignatures> =
1417
new Emitter<HomeserverEventSignatures>();
1518

1619
public setEmitter(emitter: Emitter<HomeserverEventSignatures>): void {
17-
this.emitter = emitter;
20+
this.oldEmitter = emitter;
1821
logger.info('EventEmitterService: External emitter injected');
1922
}
2023

21-
public initializeStandalone(): void {
22-
this.emitter = new Emitter<HomeserverEventSignatures>();
23-
logger.info('EventEmitterService: Standalone emitter initialized');
24-
}
25-
26-
public emit<K extends keyof HomeserverEventSignatures>(
24+
public async emit<K extends keyof HomeserverEventSignatures>(
2725
event: K,
2826
...[data]: EventOf<HomeserverEventSignatures, K> extends void
2927
? [undefined?]
3028
: [EventOf<HomeserverEventSignatures, K>]
31-
): void {
32-
this.emitter.emit(event, ...([data] as any));
29+
): Promise<void> {
30+
await this.emitter.emit(event, ...([data] as any));
31+
if (this.oldEmitter) {
32+
await this.oldEmitter.emit(event, ...([data] as any));
33+
}
3334
logger.debug({ msg: `Event emitted: ${event}`, event, data });
3435
}
3536

3637
public on<K extends keyof HomeserverEventSignatures>(
3738
event: K,
3839
handler: EventHandlerOf<HomeserverEventSignatures, K>,
3940
): (() => void) | undefined {
40-
return this.emitter.on(event, handler);
41+
const [handler1, handler2] = [
42+
this.emitter.on(event, handler),
43+
this.oldEmitter.on(event, handler),
44+
];
45+
return () => {
46+
handler1();
47+
handler2();
48+
};
4149
}
4250

4351
public once<K extends keyof HomeserverEventSignatures>(
4452
event: K,
4553
handler: EventHandlerOf<HomeserverEventSignatures, K>,
4654
): (() => void) | undefined {
47-
return this.emitter.once(event, handler);
55+
const [handler1, handler2] = [
56+
this.emitter.once(event, handler),
57+
this.oldEmitter.once(event, handler),
58+
];
59+
return () => {
60+
handler1();
61+
handler2();
62+
};
4863
}
4964

5065
public off<K extends keyof HomeserverEventSignatures>(
5166
event: K,
5267
handler: EventHandlerOf<HomeserverEventSignatures, K>,
5368
): void {
5469
this.emitter.off(event, handler);
55-
}
5670

57-
public getEmitter(): Emitter<HomeserverEventSignatures> {
58-
return this.emitter;
71+
this.oldEmitter.off(event, handler);
5972
}
6073
}

0 commit comments

Comments
 (0)