Skip to content

Commit 137355c

Browse files
committed
rework to @effectionx/process and createReplaySignal
1 parent 3f79075 commit 137355c

File tree

8 files changed

+225
-61
lines changed

8 files changed

+225
-61
lines changed

package-lock.json

Lines changed: 100 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/server/package.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
],
2828
"scripts": {
2929
"clean": "echo skip",
30-
"test": "node --import tsx --test test/*.test.ts",
30+
"test": "node --test-timeout=60000 --import tsx --test test/*.test.ts",
3131
"prepack": "npm run build",
3232
"build": "tsdown",
3333
"lint": "echo noop",
@@ -36,9 +36,9 @@
3636
},
3737
"dependencies": {
3838
"effection": "^4.0.0",
39-
"@effectionx/timebox": "^0.3.1",
40-
"@effectionx/tinyexec": "^0.3.1",
41-
"@effectionx/context-api": "^0.2.1"
39+
"@effectionx/context-api": "^0.2.1",
40+
"@effectionx/process": "^0.6.2",
41+
"@effectionx/timebox": "^0.3.1"
4242
},
4343
"devDependencies": {},
4444
"exports": {
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import type { Resolve, Subscription } from "effection";
2+
import { action, resource } from "effection";
3+
4+
export function createReplaySignal<T, TClose>() {
5+
const subscribers = new Set<Subscription<T, TClose>>();
6+
// single shared durable queue storage
7+
const queue = createDurableQueue<T, TClose>();
8+
9+
// each subscriber gets its own iterator over the shared items by
10+
// calling `queue.subscribe()` which returns a Stream
11+
const subscribe = resource<Subscription<T, TClose>>(function* (provide) {
12+
const queued = queue.stream();
13+
subscribers.add(queued);
14+
15+
try {
16+
yield* provide({ next: queued.next });
17+
} finally {
18+
subscribers.delete(queued);
19+
}
20+
});
21+
22+
function send(value: T) {
23+
queue.add(value);
24+
}
25+
26+
function close(value?: TClose) {
27+
queue.close(value);
28+
}
29+
30+
return { ...subscribe, send, close };
31+
}
32+
33+
function createDurableQueue<T, TClose = never>() {
34+
type Item = IteratorResult<T, TClose>;
35+
36+
const items: Item[] = [];
37+
38+
// a set of active subscribers; each subscription has its own iterator
39+
// and its own waiting notifier set
40+
const subscribers = new Set<{
41+
notify: Set<Resolve<Item>>;
42+
}>();
43+
44+
function enqueue(item: Item) {
45+
items.push(item);
46+
for (const sub of subscribers) {
47+
if (sub.notify.size > 0) {
48+
const [resolve] = sub.notify;
49+
// use resolve from yield* action to notify waiting subscribers
50+
resolve(item);
51+
}
52+
}
53+
}
54+
55+
function stream(): Subscription<T, TClose> {
56+
const iter = items[Symbol.iterator]();
57+
const notify = new Set<Resolve<Item>>();
58+
const sub = { notify };
59+
subscribers.add(sub);
60+
61+
return {
62+
*next() {
63+
const item = iter.next().value;
64+
// item will be `undefined` when we've iterated to the end of the
65+
// current `items` array; in that case we wait for new items to be
66+
// enqueued and the resolve will be called with the new `Item`.
67+
if (item !== undefined) {
68+
return item;
69+
}
70+
return yield* action<Item>((resolve) => {
71+
notify.add(resolve);
72+
return () => notify.delete(resolve);
73+
});
74+
},
75+
};
76+
}
77+
78+
return {
79+
add: (value: T) => enqueue({ done: false, value }),
80+
close: (value?: TClose) => enqueue({ done: true, value: value as TClose }),
81+
stream,
82+
};
83+
}

packages/server/src/logging.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ export const stdio = createApi("@simulacrum/logging", {
55
*stdout(line: string): Operation<void> {
66
console.log(line);
77
},
8+
*stderr(line: string): Operation<void> {
9+
console.log(line);
10+
},
811
});
912

1013
export const { stdout } = stdio.operations;
14+
export const { stderr } = stdio.operations;

packages/server/src/process.ts

Lines changed: 0 additions & 19 deletions
This file was deleted.

0 commit comments

Comments
 (0)