Skip to content

Commit d3850c6

Browse files
jboldataras
andauthored
bump server to effection v4 (#345)
* bump server to effection v4 * change file Bump server to `effection` v4. With the tight dependency fitting into an `effection` runtime, bumping with a minor despite no specific breaking changes directly in this package. * let github sim use root effection * switch to timebox package * add tests for 3 main cases * include logs for debugging * Increased timeout * rework to @effectionx/process and createReplaySignal * Update change log for effection v4 upgrade Updated to include changes related to the upgrade to `effection` v4 and the addition of `@effectionx/process` for improved functionality. --------- Co-authored-by: Taras Mankovski <[email protected]>
1 parent c3d75f5 commit d3850c6

File tree

12 files changed

+341
-162
lines changed

12 files changed

+341
-162
lines changed

.changes/change-pr-345.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@simulacrum/server": minor
3+
---
4+
5+
Bump server to `effection` v4. With the tight dependency fitting into an `effection` runtime, bumping with a minor despite no specific breaking changes directly in this package. Additionally, we swapped to `@effectionx/process` and pulled in some other helpers. This was to prevent edge cases which were noted as part of the upgrade.

package-lock.json

Lines changed: 128 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
},
3939
"devDependencies": {
4040
"@arethetypeswrong/core": "^0.18.2",
41+
"effection": "^4.0.0",
4142
"publint": "^0.3.13",
4243
"tsdown": "^0.15.4",
4344
"tsx": "^4.20.6",

packages/github-api/sync.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { main, Operation, until } from "effection";
1+
import { main, type Operation, until } from "effection";
22
import fs from "fs/promises";
33
import path from "path";
44

packages/server/package.json

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,18 @@
2727
],
2828
"scripts": {
2929
"clean": "echo skip",
30-
"test": "node --import tsx --test test/*.test.ts",
30+
"test": "node --test-timeout=60000 --import tsx --test test/*.test.ts",
3131
"prepack": "npm run build",
3232
"build": "tsdown",
3333
"lint": "echo noop",
3434
"tsc": "tsc --noEmit",
3535
"test:service-main": "node --import tsx ./test/service-main.ts"
3636
},
3737
"dependencies": {
38-
"effection": "^3.6.0",
39-
"@effectionx/tinyexec": "^0.2.0",
40-
"@effectionx/context-api": "^0.1.0"
38+
"effection": "^4.0.0",
39+
"@effectionx/context-api": "^0.2.1",
40+
"@effectionx/process": "^0.6.2",
41+
"@effectionx/timebox": "^0.3.1"
4142
},
4243
"devDependencies": {},
4344
"exports": {
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import type { Resolve, Subscription } from "effection";
2+
import { action, resource } from "effection";
3+
4+
export function createReplaySignal<T, TClose>() {
5+
const subscribers = new Set<Subscription<T, TClose>>();
6+
// single shared durable queue storage
7+
const queue = createDurableQueue<T, TClose>();
8+
9+
// each subscriber gets its own iterator over the shared items by
10+
// calling `queue.subscribe()` which returns a Stream
11+
const subscribe = resource<Subscription<T, TClose>>(function* (provide) {
12+
const queued = queue.stream();
13+
subscribers.add(queued);
14+
15+
try {
16+
yield* provide({ next: queued.next });
17+
} finally {
18+
subscribers.delete(queued);
19+
}
20+
});
21+
22+
function send(value: T) {
23+
queue.add(value);
24+
}
25+
26+
function close(value?: TClose) {
27+
queue.close(value);
28+
}
29+
30+
return { ...subscribe, send, close };
31+
}
32+
33+
function createDurableQueue<T, TClose = never>() {
34+
type Item = IteratorResult<T, TClose>;
35+
36+
const items: Item[] = [];
37+
38+
// a set of active subscribers; each subscription has its own iterator
39+
// and its own waiting notifier set
40+
const subscribers = new Set<{
41+
notify: Set<Resolve<Item>>;
42+
}>();
43+
44+
function enqueue(item: Item) {
45+
items.push(item);
46+
for (const sub of subscribers) {
47+
if (sub.notify.size > 0) {
48+
const [resolve] = sub.notify;
49+
// use resolve from yield* action to notify waiting subscribers
50+
resolve(item);
51+
}
52+
}
53+
}
54+
55+
function stream(): Subscription<T, TClose> {
56+
const iter = items[Symbol.iterator]();
57+
const notify = new Set<Resolve<Item>>();
58+
const sub = { notify };
59+
subscribers.add(sub);
60+
61+
return {
62+
*next() {
63+
const item = iter.next().value;
64+
// item will be `undefined` when we've iterated to the end of the
65+
// current `items` array; in that case we wait for new items to be
66+
// enqueued and the resolve will be called with the new `Item`.
67+
if (item !== undefined) {
68+
return item;
69+
}
70+
return yield* action<Item>((resolve) => {
71+
notify.add(resolve);
72+
return () => notify.delete(resolve);
73+
});
74+
},
75+
};
76+
}
77+
78+
return {
79+
add: (value: T) => enqueue({ done: false, value }),
80+
close: (value?: TClose) => enqueue({ done: true, value: value as TClose }),
81+
stream,
82+
};
83+
}

packages/server/src/logging.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ export const stdio = createApi("@simulacrum/logging", {
55
*stdout(line: string): Operation<void> {
66
console.log(line);
77
},
8+
*stderr(line: string): Operation<void> {
9+
console.log(line);
10+
},
811
});
912

1013
export const { stdout } = stdio.operations;
14+
export const { stderr } = stdio.operations;

packages/server/src/process.ts

Lines changed: 0 additions & 19 deletions
This file was deleted.

0 commit comments

Comments
 (0)