Skip to content

Commit 20f7908

Browse files
committed
Added Closeable collection and shutdown mechanism
1 parent 3acc903 commit 20f7908

File tree

10 files changed

+98
-10
lines changed

10 files changed

+98
-10
lines changed

packages/sequencer/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export * from "./mempool/private/PrivateMempool";
66
export * from "./sequencer/executor/Sequencer";
77
export * from "./sequencer/executor/Sequenceable";
88
export * from "./sequencer/builder/SequencerModule";
9+
export * from "./sequencer/builder/Closeable";
910
export * from "./worker/flow/Flow";
1011
export * from "./worker/flow/Task";
1112
export * from "./worker/flow/JSONTaskSerializer";

packages/sequencer/src/protocol/production/trigger/TimedBlockTrigger.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { inject, injectable } from "tsyringe";
22
import { injectOptional, log } from "@proto-kit/common";
33
import gcd from "compute-gcd";
44

5-
import { Closeable } from "../../../worker/queue/TaskQueue";
5+
import { Closeable } from "../../../sequencer/builder/Closeable";
66
import { BatchProducerModule } from "../BatchProducerModule";
77
import { Mempool } from "../../../mempool/Mempool";
88
import { BlockQueue } from "../../../storage/repositories/BlockStorage";
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { implement } from "@proto-kit/common/dist/config/injectAlias";
2+
3+
export interface Closeable {
4+
close: () => Promise<void>;
5+
}
6+
7+
export function closeable() {
8+
return implement<Closeable>("Closeable");
9+
}

packages/sequencer/src/sequencer/executor/Sequencer.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
import { DependencyContainer, injectable } from "tsyringe";
1919

2020
import { SequencerModule } from "../builder/SequencerModule";
21+
import { Closeable } from "../builder/Closeable";
2122

2223
import { Sequenceable } from "./Sequenceable";
2324

@@ -86,4 +87,13 @@ export class Sequencer<Modules extends SequencerModulesRecord>
8687
await sequencerModule.start();
8788
}
8889
}
90+
91+
public async close() {
92+
log.info("Closing sequencer...");
93+
const closeables = this.container.resolveAll<Closeable>("Closeable");
94+
await Promise.all(
95+
closeables.map(async (closeable) => await closeable.close())
96+
);
97+
log.info("Sequencer closed");
98+
}
8999
}

packages/sequencer/src/worker/flow/Flow.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { inject, injectable } from "tsyringe";
22
import { log, mapSequential } from "@proto-kit/common";
33

4-
import { Closeable, InstantiatedQueue, TaskQueue } from "../queue/TaskQueue";
4+
import { InstantiatedQueue, TaskQueue } from "../queue/TaskQueue";
5+
import { Closeable } from "../../sequencer/builder/Closeable";
56

67
import { Task, TaskPayload } from "./Task";
78

packages/sequencer/src/worker/queue/LocalTaskQueue.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ import { log, mapSequential, noop } from "@proto-kit/common";
22

33
import { sequencerModule } from "../../sequencer/builder/SequencerModule";
44
import { TaskPayload } from "../flow/Task";
5+
import { Closeable } from "../../sequencer/builder/Closeable";
56

6-
import { Closeable, InstantiatedQueue, TaskQueue } from "./TaskQueue";
7+
import { InstantiatedQueue, TaskQueue } from "./TaskQueue";
78
import { ListenerList } from "./ListenerList";
89
import { AbstractTaskQueue } from "./AbstractTaskQueue";
910

packages/sequencer/src/worker/queue/TaskQueue.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { TaskPayload } from "../flow/Task";
2+
import { Closeable } from "../../sequencer/builder/Closeable";
23

34
/**
45
* Definition of a connection-object that can generate queues and workers
@@ -13,11 +14,6 @@ export interface TaskQueue {
1314
options?: { concurrency?: number }
1415
) => Closeable;
1516
}
16-
17-
export interface Closeable {
18-
close: () => Promise<void>;
19-
}
20-
2117
/**
2218
* Object that abstracts a concrete connection to a queue instance.
2319
*/

packages/sequencer/src/worker/worker/FlowTaskWorker.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { log } from "@proto-kit/common";
22

3-
import { Closeable, TaskQueue } from "../queue/TaskQueue";
3+
import { TaskQueue } from "../queue/TaskQueue";
44
import { Task, TaskPayload } from "../flow/Task";
55
import { AbstractStartupTask } from "../flow/AbstractStartupTask";
66
import { UnpreparingTask } from "../flow/UnpreparingTask";
7+
import { Closeable } from "../../sequencer/builder/Closeable";
78

89
const errors = {
910
notComputable: (name: string) =>

packages/sequencer/src/worker/worker/startup/WorkerRegistrationFlow.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { injectable } from "tsyringe";
22
import { log } from "@proto-kit/common";
33

4-
import { Closeable } from "../../queue/TaskQueue";
4+
import { Closeable } from "../../../sequencer/builder/Closeable";
55
import { FlowCreator } from "../../flow/Flow";
66

77
import {
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import "reflect-metadata";
2+
import { DependencyFactory, noop, sleep } from "@proto-kit/common";
3+
import { jest } from "@jest/globals";
4+
import { container } from "tsyringe";
5+
6+
import {
7+
Closeable,
8+
closeable,
9+
Sequencer,
10+
sequencerModule,
11+
SequencerModule,
12+
} from "../../../src";
13+
14+
describe("Sequencer close", () => {
15+
it("should close all services", async () => {
16+
const spyFn = jest.fn<() => void>();
17+
18+
@closeable()
19+
@sequencerModule()
20+
class CloseableModule extends SequencerModule implements Closeable {
21+
public async start(): Promise<void> {
22+
noop();
23+
}
24+
25+
public async close() {
26+
await sleep(200);
27+
spyFn.call(undefined);
28+
}
29+
}
30+
31+
@sequencerModule()
32+
class DependencyFactoryModule
33+
extends SequencerModule
34+
implements DependencyFactory
35+
{
36+
public async start(): Promise<void> {
37+
noop();
38+
}
39+
40+
dependencies() {
41+
return {
42+
Dep2: {
43+
useClass: CloseableModule,
44+
},
45+
};
46+
}
47+
}
48+
49+
const sequencer = new (Sequencer.from({
50+
modules: {
51+
Foo: CloseableModule,
52+
Bar: CloseableModule,
53+
D: DependencyFactoryModule,
54+
},
55+
}))();
56+
sequencer.create(() => container.createChildContainer());
57+
sequencer.configure({
58+
Foo: {},
59+
Bar: {},
60+
D: {},
61+
});
62+
63+
sequencer.resolve("D");
64+
65+
await sequencer.close();
66+
67+
expect(spyFn).toHaveBeenCalledTimes(3);
68+
});
69+
});

0 commit comments

Comments
 (0)