Skip to content

Commit 418b9ca

Browse files
committed
spike out dag to start services
1 parent ba03271 commit 418b9ca

File tree

2 files changed

+176
-0
lines changed

2 files changed

+176
-0
lines changed

packages/server/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export * from "./logging.ts";
22
export * from "./service.ts";
3+
export * from "./services.ts";

packages/server/src/services.ts

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
import {
2+
type Operation,
3+
resource,
4+
spawn,
5+
suspend,
6+
withResolvers,
7+
} from "effection";
8+
9+
export type ServiceDefinition = {
10+
// The operation that starts the service and returns when the service is ready.
11+
operation: Operation<void>;
12+
deps?: string[];
13+
options?: {
14+
// Keep an options object for future expansion or hooks; currently unused when operation is present
15+
};
16+
// lifecycle hooks
17+
beforeStart?: () => Operation<void>;
18+
afterStart?: () => Operation<void>;
19+
beforeStop?: () => Operation<void>;
20+
afterStop?: () => Operation<void>;
21+
};
22+
23+
export type ServicesMap = Record<string, ServiceDefinition>;
24+
25+
function computeLevels(services: ServicesMap): string[][] {
26+
const indeg: Record<string, number> = {};
27+
const graph: Record<string, Set<string>> = {};
28+
for (const name of Object.keys(services)) {
29+
indeg[name] = 0;
30+
graph[name] = new Set();
31+
}
32+
for (const [name, def] of Object.entries(services)) {
33+
for (const dep of def.deps ?? []) {
34+
if (!(dep in services)) {
35+
throw new Error(
36+
`Service '${name}' depends on unknown service '${dep}'`
37+
);
38+
}
39+
graph[dep].add(name);
40+
indeg[name] = (indeg[name] || 0) + 1;
41+
}
42+
}
43+
44+
const levels: string[][] = [];
45+
let q: string[] = [];
46+
for (const [k, v] of Object.entries(indeg)) {
47+
if (v === 0) q.push(k);
48+
}
49+
50+
let processed = 0;
51+
while (q.length) {
52+
const currentLayer = q.slice();
53+
levels.push(currentLayer);
54+
processed += currentLayer.length;
55+
const next: string[] = [];
56+
for (const n of currentLayer) {
57+
for (const m of graph[n]) {
58+
indeg[m] -= 1;
59+
if (indeg[m] === 0) next.push(m);
60+
}
61+
}
62+
q = next;
63+
}
64+
if (processed !== Object.keys(services).length) {
65+
throw new Error(`Cycle detected in services`);
66+
}
67+
return levels;
68+
}
69+
70+
function* waitForAllReady(
71+
names: string[],
72+
readyResolvers: Map<string, any>
73+
): Operation<void> {
74+
for (const n of names) {
75+
const r = readyResolvers.get(n);
76+
if (r) {
77+
yield* r.operation;
78+
}
79+
}
80+
}
81+
82+
/**
83+
* useServiceGraph
84+
*
85+
* Start a set of services with dependencies (a DAG). Each service must provide an
86+
* Operation<void> that starts the service and returns once the service is ready.
87+
*
88+
* Example usage:
89+
*
90+
* yield* useServiceGraph({
91+
* A: { operation: useService('A', 'node --import tsx ./test/services/service-a.ts') },
92+
* B: { operation: useService('B', 'node --import tsx ./test/services/service-b.ts'), deps: ['A'] }
93+
* });
94+
*
95+
* Services within the same topological layer are started concurrently. Lifecycle
96+
* hooks can be used to perform actions before or after each service starts or stops.
97+
*/
98+
export function useServiceGraph(services: ServicesMap): Operation<void> {
99+
return resource(function* (provide) {
100+
const layers = computeLevels(services);
101+
102+
// Map of readiness resolvers returned by `withResolvers`
103+
const readyResolvers = new Map<
104+
string,
105+
{
106+
operation: Operation<void>;
107+
resolve: () => void;
108+
reject: (err: Error) => void;
109+
}
110+
>();
111+
for (const name of Object.keys(services)) {
112+
const r = withResolvers<void>();
113+
readyResolvers.set(name, {
114+
operation: r.operation,
115+
resolve: r.resolve,
116+
reject: r.reject,
117+
});
118+
}
119+
120+
// Keep track of start order so we can run beforeStop hooks in reverse
121+
const startOrder: string[] = [];
122+
123+
for (const layer of layers) {
124+
// spawn all services in this layer in parallel
125+
for (const name of layer) {
126+
const def = services[name];
127+
// wait for deps to be ready (yield the underlying Promise)
128+
if (def.deps) {
129+
for (const dep of def.deps) {
130+
const r = readyResolvers.get(dep);
131+
if (!r)
132+
throw new Error(
133+
`missing readiness resolver for dependency '${dep}'`
134+
);
135+
yield* r.operation;
136+
}
137+
}
138+
139+
// spawn a child operation that runs the service and keeps it alive with suspend()
140+
yield* spawn(function* () {
141+
try {
142+
if (def.beforeStart) yield* def.beforeStart();
143+
144+
// The caller-supplied operation starts the service and returns when ready.
145+
yield* def.operation;
146+
startOrder.push(name);
147+
const res = readyResolvers.get(name);
148+
if (res) res.resolve();
149+
if (def.afterStart) yield* def.afterStart();
150+
151+
yield* suspend();
152+
} finally {
153+
// run afterStop hooks in child finalizer so they are executed after the
154+
// process has cleaned up
155+
if (def.afterStop) yield* def.afterStop();
156+
}
157+
});
158+
}
159+
// after spawning the whole layer, wait until every service in the layer is ready
160+
yield* waitForAllReady(layer, readyResolvers);
161+
}
162+
163+
try {
164+
yield* provide();
165+
} finally {
166+
// Run beforeStop hooks in reverse start order
167+
for (const name of startOrder.slice().reverse()) {
168+
const def = services[name];
169+
if (def.beforeStop) {
170+
yield* def.beforeStop();
171+
}
172+
}
173+
}
174+
});
175+
}

0 commit comments

Comments
 (0)