Skip to content

Commit 1f0fe69

Browse files
committed
feat(async): add repeatable helper function
1 parent 85fd83b commit 1f0fe69

File tree

6 files changed

+155
-1
lines changed

6 files changed

+155
-1
lines changed

.github/FUNDING.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# These are supported funding model platforms
22

3-
github: [lambdalisue] # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2]
3+
github: [
4+
lambdalisue,
5+
] # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2]
46
patreon: # Replace with a single Patreon username
57
open_collective: # Replace with a single Open Collective username
68
ko_fi: # Replace with a single Ko-fi username

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,6 +1115,27 @@ const iter = pipe(
11151115
console.log(await Array.fromAsync(iter)); // [1, 2, 3, 1, 2, 3]
11161116
```
11171117

1118+
### repeatable
1119+
1120+
Transform an async iterable into a repeatable async iterable. It caches the
1121+
values of the original iterable so that it can be replayed. Useful for replaying
1122+
the costly async iterable.
1123+
1124+
```ts
1125+
import { repeatable } from "@core/iterutil/async/repeatable";
1126+
import { assertEquals } from "@std/assert";
1127+
1128+
const origin = (async function* () {
1129+
yield 1;
1130+
yield 2;
1131+
yield 3;
1132+
})();
1133+
const iter = repeatable(origin);
1134+
assertEquals(await Array.fromAsync(iter), [1, 2, 3]);
1135+
assertEquals(await Array.fromAsync(iter), [1, 2, 3]); // iter can be replayed
1136+
assertEquals(await Array.fromAsync(origin), []); // origin is already consumed
1137+
```
1138+
11181139
### some
11191140

11201141
Returns true if at least one element in the iterable satisfies the provided

async/mod.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export * from "./pairwise.ts";
2121
export * from "./partition.ts";
2222
export * from "./reduce.ts";
2323
export * from "./repeat.ts";
24+
export * from "./repeatable.ts";
2425
export * from "./some.ts";
2526
export * from "./take.ts";
2627
export * from "./take_while.ts";

async/repeatable.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
const done = Symbol("done");
2+
3+
export function repeatable<T>(iterable: AsyncIterable<T>): AsyncIterable<T> {
4+
const cache: T[] = [];
5+
let buildingCache: Promise<void> | undefined = undefined;
6+
let pendingResolvers: ((value: T | typeof done) => void)[] = [];
7+
let finished = false;
8+
9+
return {
10+
[Symbol.asyncIterator]: async function* () {
11+
yield* cache;
12+
13+
if (!finished) {
14+
if (!buildingCache) {
15+
buildingCache = (async () => {
16+
try {
17+
for await (const item of iterable) {
18+
cache.push(item);
19+
pendingResolvers.forEach((resolve) => resolve(item));
20+
pendingResolvers = [];
21+
}
22+
} finally {
23+
finished = true;
24+
pendingResolvers.forEach((resolve) => resolve(done));
25+
pendingResolvers = [];
26+
}
27+
})();
28+
}
29+
}
30+
let index = cache.length;
31+
while (!finished || index < cache.length) {
32+
if (index < cache.length) {
33+
yield cache[index++];
34+
} else {
35+
const nextItem = await new Promise<T | typeof done>((resolve) => {
36+
pendingResolvers.push(resolve);
37+
});
38+
if (nextItem !== done) {
39+
yield nextItem;
40+
index++;
41+
}
42+
}
43+
}
44+
},
45+
};
46+
}

async/repeatable_test.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import { test } from "@cross/test";
2+
import { delay } from "@std/async/delay";
3+
import { assertEquals } from "@std/assert";
4+
import { repeatable } from "./repeatable.ts";
5+
6+
async function* delayedGenerator(sideEffect?: () => void) {
7+
yield 1;
8+
await delay(100);
9+
yield 2;
10+
await delay(100);
11+
yield 3;
12+
sideEffect?.();
13+
}
14+
15+
await test("repeatable should return the same sequence on multiple iterations", async () => {
16+
const input = delayedGenerator();
17+
const it = repeatable(input);
18+
19+
const result1 = await Array.fromAsync(it);
20+
const result2 = await Array.fromAsync(it);
21+
22+
assertEquals(result1, [1, 2, 3], "First iteration");
23+
assertEquals(result2, [1, 2, 3], "First iteration");
24+
});
25+
26+
await test("repeatable should call internal iterator only once", async () => {
27+
let called = 0;
28+
const input = delayedGenerator(() => called++);
29+
const it = repeatable(input);
30+
31+
const result1 = await Array.fromAsync(it);
32+
const result2 = await Array.fromAsync(it);
33+
34+
assertEquals(result1, [1, 2, 3], "First iteration");
35+
assertEquals(result2, [1, 2, 3], "First iteration");
36+
assertEquals(called, 1, "Internal iterator called only once");
37+
});
38+
39+
await test("repeatable should work correctly when consumed partially and then fully", async () => {
40+
const input = delayedGenerator();
41+
const it = repeatable(input);
42+
43+
const result1: number[] = [];
44+
const firstIter = it[Symbol.asyncIterator]();
45+
46+
result1.push((await firstIter.next()).value); // 1
47+
48+
const result2 = await Array.fromAsync(it);
49+
50+
result1.push((await firstIter.next()).value); // 2
51+
result1.push((await firstIter.next()).value); // 3
52+
53+
assertEquals(result1, [1, 2, 3], "First iteration");
54+
assertEquals(result2, [1, 2, 3], "First iteration");
55+
});
56+
57+
await test("repeatable should cache values and return them immediately on subsequent iterations", async () => {
58+
const input = delayedGenerator();
59+
const it = repeatable(input);
60+
61+
const start = performance.now();
62+
const result1 = await Array.fromAsync(it);
63+
const end1 = performance.now();
64+
const timeTaken1 = end1 - start;
65+
66+
const start2 = performance.now();
67+
const result2 = await Array.fromAsync(it);
68+
const end2 = performance.now();
69+
const timeTaken2 = end2 - start2;
70+
71+
assertEquals(result1, [1, 2, 3], "First iteration");
72+
assertEquals(result2, [1, 2, 3], "Second iteration");
73+
74+
console.debug("Time taken for first consume:", timeTaken1);
75+
console.debug("Time taken for second consume (with cache):", timeTaken2);
76+
77+
if (timeTaken2 > timeTaken1 / 10) {
78+
throw new Error(
79+
"Second consume took too long, cache might not be working.",
80+
);
81+
}
82+
});

deno.jsonc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
"./async/partition": "./async/partition.ts",
2828
"./async/reduce": "./async/reduce.ts",
2929
"./async/repeat": "./async/repeat.ts",
30+
"./async/repeatable": "./async/repeatable.ts",
3031
"./async/some": "./async/some.ts",
3132
"./async/take": "./async/take.ts",
3233
"./async/take-while": "./async/take_while.ts",
@@ -261,6 +262,7 @@
261262
"@core/unknownutil": "jsr:@core/unknownutil@^4.0.1",
262263
"@cross/test": "jsr:@cross/test@^0.0.9",
263264
"@std/assert": "jsr:@std/assert@^1.0.2",
265+
"@std/async": "jsr:@std/async@^1.0.6",
264266
"@std/jsonc": "jsr:@std/jsonc@^1.0.0",
265267
"@std/path": "jsr:@std/path@^1.0.2",
266268
"@std/testing": "jsr:@std/testing@^1.0.0"

0 commit comments

Comments
 (0)