Skip to content

Commit 842d9b7

Browse files
committed
Added failsafe in case producer is busy
1 parent 7dded84 commit 842d9b7

File tree

2 files changed

+63
-7
lines changed

2 files changed

+63
-7
lines changed
Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,68 @@
11
import { EventsRecord } from "./EventEmittingComponent";
22

33
type ListenersHolder<Events extends EventsRecord> = {
4-
// eslint-disable-next-line putout/putout
5-
[key in keyof Events]?: ((...args: Events[key]) => void)[];
4+
[key in keyof Events]?: {
5+
id: number;
6+
listener: (...args: Events[key]) => void;
7+
}[];
68
};
79

810
export class EventEmitter<Events extends EventsRecord> {
911
private readonly listeners: ListenersHolder<Events> = {};
1012

13+
private counter = 0;
14+
15+
// Fields used for offSelf()
16+
private currentListenerId: number | undefined = undefined;
17+
18+
private currentListenerEventName: keyof Events | undefined = undefined;
19+
1120
public emit(event: keyof Events, ...parameters: Events[typeof event]) {
1221
const listeners = this.listeners[event];
13-
if(listeners !== undefined) {
22+
if (listeners !== undefined) {
23+
this.currentListenerEventName = event;
24+
1425
listeners.forEach((listener) => {
15-
listener(...parameters);
26+
this.currentListenerId = listener.id;
27+
28+
listener.listener(...parameters);
29+
30+
this.currentListenerId = undefined;
1631
});
32+
this.currentListenerEventName = undefined;
1733
}
1834
}
1935

2036
public on<Key extends keyof Events>(
2137
event: Key,
2238
listener: (...args: Events[Key]) => void
23-
) {
24-
(this.listeners[event] ??= []).push(listener);
39+
): number {
40+
// eslint-disable-next-line no-multi-assign
41+
const id = (this.counter += 1);
42+
(this.listeners[event] ??= []).push({
43+
id,
44+
listener,
45+
});
46+
return id;
47+
}
48+
49+
// eslint-disable-next-line no-warning-comments
50+
// TODO Improve to be thread-safe
51+
public offSelf() {
52+
if (
53+
this.currentListenerEventName !== undefined &&
54+
this.currentListenerId !== undefined
55+
) {
56+
this.off(this.currentListenerEventName, this.currentListenerId);
57+
}
58+
}
59+
60+
public off<Key extends keyof Events>(event: Key, id: number) {
61+
const listeners = this.listeners[event];
62+
if (listeners !== undefined) {
63+
this.listeners[event] = listeners.filter(
64+
(listener) => listener.id !== id
65+
);
66+
}
2567
}
2668
}

packages/sequencer/src/protocol/production/trigger/AutomaticBlockTrigger.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,21 @@ export class AutomaticBlockTrigger
2828
// eslint-disable-next-line @typescript-eslint/no-misused-promises
2929
this.mempool.events.on("transactionAdded", async () => {
3030
log.info("Transaction received, creating block...");
31-
await this.unprovenProducerModule.tryProduceUnprovenBlock();
31+
const block = await this.unprovenProducerModule.tryProduceUnprovenBlock();
32+
33+
// In case the block producer was busy, we need to re-trigger production
34+
// as soon as the previous production was finished
35+
if (block === undefined) {
36+
this.unprovenProducerModule.events.on(
37+
"unprovenBlockProduced",
38+
async () => {
39+
// eslint-disable-next-line max-len
40+
// Make sure this comes before await, because otherwise we have a race condition
41+
this.unprovenProducerModule.events.offSelf();
42+
await this.unprovenProducerModule.tryProduceUnprovenBlock();
43+
}
44+
);
45+
}
3246
});
3347
}
3448
}

0 commit comments

Comments
 (0)