Skip to content

Commit ee4a8aa

Browse files
authored
Merge pull request #5 from samthor/cache-exp
mux
2 parents 11e7bce + ecd0d16 commit ee4a8aa

File tree

11 files changed

+461
-9
lines changed

11 files changed

+461
-9
lines changed

src/cache.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,19 @@ export function once<T = void>(fn: () => T) {
6868
return result;
6969
};
7070
}
71+
72+
/**
73+
* Lazy {@link WeakMap} factory.
74+
*/
75+
export function lazyWeak<W extends Object, V>(fn: (w: W) => V): (w: W) => V {
76+
const wm = new WeakMap<W, V>();
77+
78+
return (w: W): V => {
79+
let value = wm.get(w);
80+
if (value === undefined) {
81+
value = fn(w);
82+
wm.set(w, value);
83+
}
84+
return value;
85+
};
86+
}

src/generator.ts

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import { promiseWithResolvers, unresolvedPromise } from './promise.js';
2-
import { AbortSignalArgs } from './types.js';
3-
import type { WorkQueue } from './queue.js';
4-
import { promiseForSignal, symbolAbortSignal } from './internal.js';
1+
import { promiseWithResolvers, unresolvedPromise } from './promise.ts';
2+
import type { AbortSignalArgs } from './types.ts';
3+
import type { WorkQueue } from './queue.ts';
4+
import { promiseForSignal, symbolAbortSignal } from './internal.ts';
55

66
/**
77
* Combines the N passed async generators into a single generator which yields items in order,
@@ -209,3 +209,23 @@ export class AsyncGeneratorCache<T, Y> {
209209
return this._knownValues;
210210
}
211211
}
212+
213+
/**
214+
* Helper which iterates over a {@link AsyncGenerator}.
215+
*
216+
* Allows relatively easy access to the return value.
217+
*/
218+
export async function forEachAsync<T, TReturn, TNext>(
219+
async: AsyncGenerator<T, TReturn, TNext>,
220+
cb: (x: T) => Promise<TNext> | TNext,
221+
): Promise<TReturn> {
222+
let result: IteratorResult<T, TReturn>;
223+
let next: [TNext] | [] = [];
224+
225+
while (!(result = await async.next(...next)).done) {
226+
const internalResult = await cb(result.value);
227+
next = [internalResult];
228+
}
229+
230+
return result.value;
231+
}

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export * from './listener.ts';
1515
export * from './maps.js';
1616
export * from './matcher.js';
1717
export * from './memoize.js';
18-
export * from './multiplex.ts';
18+
export * from './mux.ts';
1919
export * from './notify.ts';
2020
export * from './object-utils.js';
2121
export * from './primitives.js';

src/internal.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { unresolvedPromise } from './promise.js';
1+
import { unresolvedPromise } from './promise.ts';
22

33
/**
44
* Returns a {@link Promise} that rejects with {@link symbolAbortSignal} when aborted.

src/limit.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Configuration for {@link buildLimiter}.
3+
*/
4+
export type LimitConfig = {
5+
/**
6+
* Maximum and initial total of tokens in this limiter.
7+
*
8+
* This must be `>=1` to be valid.
9+
*/
10+
b: number;
11+
12+
/**
13+
* Rate of renewal of tokens, per second.
14+
*
15+
* This must be `>0` to be valid.
16+
*/
17+
r: number;
18+
};
19+
20+
/**
21+
* Fetches a {@link Promise} which resolves when a token is available.
22+
*
23+
* Throws if the {@link AbortSignal} is or becomes aborted before a token is available.
24+
*/
25+
export type Limiter = (signal: AbortSignal) => Promise<void>;
26+
27+
/**
28+
* Builds a rate-limiter which can be called to consume a token.
29+
*
30+
* Throws if the config is invalid (cowardly refuses to make an invalid limiter).
31+
* Without a passed config, uses a default of 100 tokens, renews at 10/sec.
32+
*/
33+
export function buildLimiter(c?: LimitConfig): Limiter {
34+
const maxTokens = (c?.b ?? 100.0) * 1.0;
35+
const rateOfIncrease = (c?.r ?? 10) * 1.0;
36+
37+
if (maxTokens <= 0 || rateOfIncrease <= 0) {
38+
throw new Error(`invalid LimitConfig, no requests allowed`);
39+
}
40+
41+
let last = performance.now();
42+
let tokens = maxTokens;
43+
44+
return async (signal: AbortSignal): Promise<void> => {
45+
for (;;) {
46+
signal.throwIfAborted();
47+
48+
const now = performance.now();
49+
const delta = (now - last) / 1000.0;
50+
const increase = delta * rateOfIncrease;
51+
tokens = Math.min(maxTokens, tokens + increase);
52+
last = now;
53+
const secondsToWait = (1.0 - tokens) / rateOfIncrease;
54+
55+
if (secondsToWait <= 0.0) {
56+
tokens -= 1.0;
57+
return; // success
58+
}
59+
60+
await new Promise<void>((r) => {
61+
const timeout = setTimeout(r, secondsToWait * 1000.0);
62+
signal.addEventListener('abort', () => {
63+
clearTimeout(timeout);
64+
r();
65+
});
66+
});
67+
}
68+
};
69+
}

src/listener.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ export type NamedListeners<T extends Record<string, any>> = {
4444
export type SoloListener<V> = {
4545
addListener(listener: (data: V) => void, signal: AbortSignal): void;
4646
dispatch(data: V): boolean;
47-
any(handler: (signal: AbortSignal) => void, signal: AbortSignal);
47+
any(handler: (signal: AbortSignal) => void, signal?: AbortSignal);
4848
hasAny(): boolean;
4949
};
5050

src/mux.ts

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
import { soloListener } from './listener.ts';
2+
import { objectNotifyAll, objectWait } from './notify.ts';
3+
import { buildLinkQueue } from './queue.ts';
4+
5+
export type CallToken = Object;
6+
7+
export type MuxTask<Out> = { token: CallToken } & ({ signal: AbortSignal } | { data: Out });
8+
9+
type InternalMuxTask<In, Out> =
10+
| { token: CallToken } & (
11+
| { signal: AbortSignal; respond: (x: SimpleResponse<In>) => void }
12+
| { data: Out }
13+
);
14+
15+
export interface MuxSession<In, Out> {
16+
signal: AbortSignal;
17+
18+
/**
19+
* Are there any active calls as part of this {@link MuxSession}?
20+
* The runner should probably not return if this is the case.
21+
*/
22+
hasActive(): boolean;
23+
24+
/**
25+
* Synchronously return the next task.
26+
* This may return `undefined` even if the session is still active, check {@link hasActive}.
27+
*/
28+
nextTask(): MuxTask<Out> | undefined;
29+
30+
/**
31+
* Wait for another task to become ready.
32+
*/
33+
waitForTask(): Promise<true>;
34+
35+
/**
36+
* Process a message received for this {@link CallToken}.
37+
*/
38+
handle(token: CallToken, data: In): void;
39+
40+
/**
41+
* Process a remote shutdown of this {@link CallToken}.
42+
*/
43+
stop(token: CallToken, error?: Error): void;
44+
}
45+
46+
export type MuxFn<In, Out> = (session: MuxSession<In, Out>) => Promise<any>;
47+
48+
type SimpleResponse<X> = { data?: X; error?: Error };
49+
50+
class MuxSessionImpl<In, Out> implements MuxSession<In, Out> {
51+
readonly controller = new AbortController();
52+
public readonly signal = this.controller.signal;
53+
54+
readonly active = new Map<CallToken, (x: SimpleResponse<In>) => void>();
55+
56+
constructor(private readonly tasks: InternalMuxTask<In, Out>[]) {}
57+
58+
hasActive(): boolean {
59+
return this.active.size !== 0;
60+
}
61+
62+
nextTask(): MuxTask<Out> | undefined {
63+
const t = this.tasks.shift();
64+
if (!t) {
65+
return;
66+
}
67+
68+
if ('respond' in t) {
69+
if (t.signal.aborted) {
70+
return this.nextTask();
71+
}
72+
t.signal.addEventListener('abort', () => this.active.delete(t.token));
73+
this.active.set(t.token, t.respond);
74+
return { token: t.token, signal: t.signal };
75+
} else if (!this.active.has(t.token)) {
76+
// silently drop messages for already-dead calls
77+
return this.nextTask();
78+
}
79+
80+
return t;
81+
}
82+
83+
async waitForTask(): Promise<true> {
84+
while (!this.tasks.length) {
85+
await objectWait(this.tasks);
86+
}
87+
return true;
88+
}
89+
90+
handle(token: CallToken, data: In): void {
91+
const fn = this.active.get(token);
92+
fn?.({ data });
93+
}
94+
95+
stop(token: CallToken, error?: Error): void {
96+
const fn = this.active.get(token);
97+
fn?.({ error });
98+
this.active.delete(token);
99+
}
100+
}
101+
102+
export interface Call<In, Out> {
103+
/**
104+
* Listen to data from the other side.
105+
*/
106+
readonly gen: AsyncGenerator<In, Error | void, void>;
107+
108+
/**
109+
* Send the packet.
110+
* This may queue the packet rather than send immediately.
111+
*/
112+
send(data: Out): void;
113+
}
114+
115+
export type MuxCall<In, Out> = {
116+
call(signal: AbortSignal): Call<In, Out>;
117+
addListener(cb: (error: Error) => void, signal: AbortSignal): void;
118+
};
119+
120+
/**
121+
* Builds a {@link MuxCall}.
122+
*/
123+
export function buildMux<In, Out>(runner: MuxFn<In, Out>): MuxCall<In, Out> {
124+
let activeRunner: Promise<any> | undefined;
125+
const tasks: InternalMuxTask<In, Out>[] = [];
126+
127+
const listener = soloListener<Error>();
128+
129+
const maybeStartRunner = () => {
130+
if (!tasks.length || activeRunner !== undefined) {
131+
return;
132+
}
133+
134+
const session = new MuxSessionImpl(tasks);
135+
const response: { error?: Error } = {};
136+
137+
activeRunner = runner(session)
138+
.catch((error) => {
139+
response.error = error;
140+
listener.dispatch(error);
141+
})
142+
.finally(() => {
143+
activeRunner = undefined;
144+
maybeStartRunner();
145+
146+
// kill all still-active because the runner shut down
147+
session.active.forEach((cb) => cb(response));
148+
});
149+
};
150+
151+
const call = (signal: AbortSignal): Call<In, Out> => {
152+
if (signal.aborted) {
153+
const gen = (async function* () {
154+
try {
155+
signal.throwIfAborted();
156+
} catch (e: any) {
157+
return e instanceof Error ? e : new Error(e);
158+
}
159+
})();
160+
return { send() {}, gen };
161+
}
162+
163+
const queue = buildLinkQueue<SimpleResponse<In>>();
164+
const listener = queue.join(signal);
165+
166+
const gen = (async function* () {
167+
for (;;) {
168+
const m = await listener.next();
169+
if (m?.data !== undefined) {
170+
yield m?.data as In;
171+
}
172+
if (m?.error || m?.data === undefined) {
173+
return m?.error as Error | void;
174+
}
175+
}
176+
})();
177+
178+
// something object-like
179+
const token: CallToken = new Object();
180+
tasks.push({ token, signal, respond: queue.push.bind(queue) });
181+
objectNotifyAll(tasks);
182+
183+
const send = (out: Out) => {
184+
if (!signal.aborted) {
185+
tasks.push({ token, data: out });
186+
}
187+
};
188+
189+
maybeStartRunner();
190+
191+
return { gen, send };
192+
};
193+
194+
return { call, addListener: listener.addListener };
195+
}

src/queue.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { promiseWithResolvers } from './promise.js';
2-
import { promiseVoidForSignal } from './signal.js';
1+
import { promiseWithResolvers } from './promise.ts';
2+
import { promiseVoidForSignal } from './signal.ts';
33

44
export class WorkQueue<T> {
55
private pending: T[] = [];

0 commit comments

Comments
 (0)