Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 44 additions & 21 deletions src/select.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ const signalCache = new WeakMap<AbortSignal, Promise<void>>();

type Fallback<T, F> = T extends undefined ? F : T;

type Writeable<T> = { -readonly [P in keyof T]: T[P] };
type Entries<T> = {
[K in keyof T]: [K, T[K]];
}[keyof T][];

/**
* Channel is borrowed from Go.
Expand Down Expand Up @@ -78,12 +80,21 @@ type SelectType<Q> = Q extends SelectRequest<infer X> ? X : never;

export type SelectRequest<T> = { [key: string | symbol]: ReadChannel<T> };

export type SelectResult<T> = Readonly<{
key: string | symbol;
ch: ReadChannel<T>;
m: MessageType<ReadChannel<T>> | undefined;
closed: boolean;
}>;
export type SelectResult<T extends SelectRequest<V>, V = SelectType<T>> = {
[TKey in keyof T]: Readonly<{
key: TKey;
ch: NonNullable<T[TKey]>;
m: MessageType<T[TKey]>;
closed: boolean;
}>;
}[keyof T];

export type SelectOption<T extends SelectRequest<V>, V = SelectType<T>> = {
[TKey in keyof T]: Readonly<{
key: TKey;
ch: NonNullable<T[TKey]>;
}>;
}[keyof T];

/**
* Waits for the first {@link Channel} that is ready based on key.
Expand All @@ -93,7 +104,7 @@ export type SelectResult<T> = Readonly<{
*/
export function select<T extends SelectRequest<V>, V = SelectType<T>>(
o: T,
): Promise<SelectResult<V>>;
): Promise<SelectResult<T, V>>;

/**
* Waits for the first {@link Channel} that is ready based on key.
Expand All @@ -105,24 +116,24 @@ export function select<T extends SelectRequest<V>, V = SelectType<T>>(
export function select<T extends SelectRequest<V>, V = SelectType<T>>(
o: T,
signal: AbortSignal,
): Promise<SelectResult<V> | undefined>;
): Promise<SelectResult<T, V> | undefined>;

export function select<T extends SelectRequest<V>, V = SelectType<T>>(
o: T,
signal?: AbortSignal,
): Promise<SelectResult<V> | undefined> {
): Promise<SelectResult<T, V> | undefined> {
if (signal?.aborted) {
return Promise.resolve().then(() => undefined);
}

const sync = selectDefault(o);
const sync = selectDefault<T, V>(o);
if (sync !== undefined) {
// nb. load-bearing extra Promise.resolve()
return Promise.resolve().then(() => sync);
}

// basically the key type of SelectResult
const options: Promise<Writeable<SelectResult<V>> | undefined>[] = [];
const options: Promise<SelectOption<T, V> | undefined>[] = [];

if (signal !== undefined) {
let signalPromise: Promise<void> | undefined = signalCache.get(signal);
Expand All @@ -135,16 +146,24 @@ export function select<T extends SelectRequest<V>, V = SelectType<T>>(
options.push(signalPromise.then(() => undefined));
}

Object.entries(o).forEach(([key, ch]) => {
// Assertion is needed to provide object entry key value pairs
const entries = Object.entries(o) as Entries<T>;
entries.forEach(([key, ch]) => {
if (ch) {
options.push(ch.wait({ key, ch, m: undefined, closed: false }));
options.push(ch.wait({ key, ch }));
}
});
const out = Promise.race(options)
.then((choice) => {
.then((choice): SelectResult<T, V> | undefined => {
if (choice) {
choice.m = choice.ch.next();
choice.closed = choice.ch.closed;
const { key, ch } = choice;
return {
key,
ch,
// wait() resolving implies that ch.next() will not return undefined
m: choice.ch.next() as MessageType<T[keyof T]>,
closed: choice.ch.closed,
};
}
return choice;
})
Expand All @@ -159,11 +178,15 @@ export function select<T extends SelectRequest<V>, V = SelectType<T>>(
*
* This uses JS' default object ordering: integers >= 0 in order, all others, symbols.
*/
export function selectDefault<T>(o: Partial<SelectRequest<T>>): SelectResult<T> | undefined {
for (const key of Reflect.ownKeys(o)) {
const ch = o[key];
export function selectDefault<T extends SelectRequest<V>, V = SelectType<T>>(
o: Partial<T>,
): SelectResult<T, V> | undefined {
for (const pendingKey of Reflect.ownKeys(o)) {
// Assertion is needed to maintain generic type pairing
const [key, ch] = [pendingKey, o[pendingKey]] as Entries<T>[number];
if (ch?.pending()) {
return { key, ch, m: ch.next(), closed: ch.closed };
// pending() returning true implies that ch.next() will not return undefined
return { key, ch, m: ch.next() as MessageType<T[keyof T]>, closed: ch.closed };
}
}
return undefined;
Expand Down
17 changes: 10 additions & 7 deletions test/select.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ test('select', async () => {
ch1.push('abc');

const task = async () => {
const out = await select({ ch1: ch1 });
if (out.m) {
results.push(out.m);
}
const out = await select({ ch1 });
out.m satisfies string; // must satisfy string, we only gave it one channel
results.push(out.m);
};

const p1 = task();
Expand Down Expand Up @@ -154,16 +153,20 @@ test('gen', async () => {

test('signal', async () => {
const a = newChannel<string>();
const b = newChannel<number>();
const c = new AbortController();
c.abort();

const out = await select({ a }, c.signal);
const out = await select({ a, b }, c.signal);
switch (out?.key) {
case undefined:
break;
case 'b':
out.m satisfies number;
throw 'XXX should not run';
case 'a':
out.m satisfies string | undefined;
// shouldn't run, fall-through
out.m satisfies string;
throw 'XXX should not run';
default:
assert.fail('bad');
}
Expand Down