Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changes/change-pr-345.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@simulacrum/server": minor
---

Bump server to `effection` v4. With the tight dependency fitting into an `effection` runtime, bumping with a minor despite no specific breaking changes directly in this package. Additionally, we swapped to `@effectionx/process` and pulled in some other helpers. This was to prevent edge cases which were noted as part of the upgrade.
142 changes: 128 additions & 14 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
},
"devDependencies": {
"@arethetypeswrong/core": "^0.18.2",
"effection": "^4.0.0",
"publint": "^0.3.13",
"tsdown": "^0.15.4",
"tsx": "^4.20.6",
Expand Down
2 changes: 1 addition & 1 deletion packages/github-api/sync.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { main, Operation, until } from "effection";
import { main, type Operation, until } from "effection";
import fs from "fs/promises";
import path from "path";

Expand Down
9 changes: 5 additions & 4 deletions packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@
],
"scripts": {
"clean": "echo skip",
"test": "node --import tsx --test test/*.test.ts",
"test": "node --test-timeout=60000 --import tsx --test test/*.test.ts",
"prepack": "npm run build",
"build": "tsdown",
"lint": "echo noop",
"tsc": "tsc --noEmit",
"test:service-main": "node --import tsx ./test/service-main.ts"
},
"dependencies": {
"effection": "^3.6.0",
"@effectionx/tinyexec": "^0.2.0",
"@effectionx/context-api": "^0.1.0"
"effection": "^4.0.0",
"@effectionx/context-api": "^0.2.1",
"@effectionx/process": "^0.6.2",
"@effectionx/timebox": "^0.3.1"
},
"devDependencies": {},
"exports": {
Expand Down
83 changes: 83 additions & 0 deletions packages/server/src/createReplaySignal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import type { Resolve, Subscription } from "effection";
import { action, resource } from "effection";

export function createReplaySignal<T, TClose>() {
const subscribers = new Set<Subscription<T, TClose>>();
// single shared durable queue storage
const queue = createDurableQueue<T, TClose>();

// each subscriber gets its own iterator over the shared items by
// calling `queue.subscribe()` which returns a Stream
const subscribe = resource<Subscription<T, TClose>>(function* (provide) {
const queued = queue.stream();
subscribers.add(queued);

try {
yield* provide({ next: queued.next });
} finally {
subscribers.delete(queued);
}
});

function send(value: T) {
queue.add(value);
}

function close(value?: TClose) {
queue.close(value);
}

return { ...subscribe, send, close };
}

function createDurableQueue<T, TClose = never>() {
type Item = IteratorResult<T, TClose>;

const items: Item[] = [];

// a set of active subscribers; each subscription has its own iterator
// and its own waiting notifier set
const subscribers = new Set<{
notify: Set<Resolve<Item>>;
}>();

function enqueue(item: Item) {
items.push(item);
for (const sub of subscribers) {
if (sub.notify.size > 0) {
const [resolve] = sub.notify;
// use resolve from yield* action to notify waiting subscribers
resolve(item);
}
}
}

function stream(): Subscription<T, TClose> {
const iter = items[Symbol.iterator]();
const notify = new Set<Resolve<Item>>();
const sub = { notify };
subscribers.add(sub);

return {
*next() {
const item = iter.next().value;
// item will be `undefined` when we've iterated to the end of the
// current `items` array; in that case we wait for new items to be
// enqueued and the resolve will be called with the new `Item`.
if (item !== undefined) {
return item;
}
return yield* action<Item>((resolve) => {
notify.add(resolve);
return () => notify.delete(resolve);
});
},
};
}

return {
add: (value: T) => enqueue({ done: false, value }),
close: (value?: TClose) => enqueue({ done: true, value: value as TClose }),
stream,
};
}
4 changes: 4 additions & 0 deletions packages/server/src/logging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ export const stdio = createApi("@simulacrum/logging", {
*stdout(line: string): Operation<void> {
console.log(line);
},
*stderr(line: string): Operation<void> {
console.log(line);
},
});

export const { stdout } = stdio.operations;
export const { stderr } = stdio.operations;
19 changes: 0 additions & 19 deletions packages/server/src/process.ts

This file was deleted.

Loading
Loading