Skip to content

Commit 2658610

Browse files
authored
perf: add lazyIterablePromise to kvs iterators (apify#3413)
Separates the two consumption paths of `KeyValueStore.values()` and `entries()`:

- `await kvStore.values()` (promise path) — bulk-fetches all records in parallel, same as before.
- `for await (const v of kvStore.values())` (iterator path) — now fetches records sequentially, one at a time, and no longer triggers the bulk fetch.

Previously, the bulk fetch fired immediately via an IIFE regardless of how the result was consumed, so iterating still paid the cost of fetching the entire first page upfront.

The new `createLazyIterablePromise` utility in `utils.ts` returns an object that acts as both a Promise (the bulk fetch is triggered only when `.then` is called or the object is awaited) and an AsyncIterable (records are streamed lazily).

Files changed: 3 files, +210 / -39 lines

- `key-value-store.ts` — rewired `values()` and `entries()` to use the new utility
- `utils.ts` — added `createLazyIterablePromise`
- `async-iteration.test.ts` — tests for lazy behavior
1 parent 070c258 commit 2658610

File tree

3 files changed

+210
-39
lines changed

3 files changed

+210
-39
lines changed

packages/memory-storage/src/resource-clients/key-value-store.ts

Lines changed: 34 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { DEFAULT_API_PARAM_LIMIT, StorageTypes } from '../consts';
1616
import type { StorageImplementation } from '../fs/common';
1717
import { createKeyValueStorageImplementation } from '../fs/key-value-store';
1818
import type { MemoryStorage } from '../index';
19-
import { createKeyList, createKeyStringList, isBuffer, isStream } from '../utils';
19+
import { createKeyList, createKeyStringList, createLazyIterablePromise, isBuffer, isStream } from '../utils';
2020
import { BaseClient } from './common/base-client';
2121

2222
const DEFAULT_LOCAL_FILE_EXTENSION = 'bin';
@@ -173,33 +173,32 @@ export class KeyValueStoreClient extends BaseClient {
173173

174174
const firstPageKeysPromise = keys(options);
175175

176-
const firstPageValuesPromise = (async () => {
176+
const getFirstPageValues = async () => {
177177
const firstPageKeys = await firstPageKeysPromise;
178178
const keysToFetch = limit !== undefined ? firstPageKeys.items.slice(0, limit) : firstPageKeys.items;
179179
const limiter = pLimit(GET_RECORD_CONCURRENCY);
180-
const results = await Promise.allSettled(keysToFetch.map((item) => limiter(() => getRecord(item.key))));
181-
return results
182-
.filter(
183-
(r): r is PromiseFulfilledResult<storage.KeyValueStoreRecord> =>
184-
r.status === 'fulfilled' && r.value !== undefined,
185-
)
186-
.map((r) => r.value.value);
187-
})();
180+
const results = await Promise.all(keysToFetch.map((item) => limiter(() => getRecord(item.key))));
181+
return results.filter((r) => r !== undefined).map((r) => r.value);
182+
};
188183

189184
async function* asyncGenerator(): AsyncGenerator<unknown> {
190-
// Reuse the already-fetched first page values
191-
const firstPageValues = await firstPageValuesPromise;
185+
const firstPageKeys = await firstPageKeysPromise;
192186
let yielded = 0;
193187

194-
for (const value of firstPageValues) {
188+
for (const item of firstPageKeys.items) {
195189
if (limit !== undefined && yielded >= limit) return;
196-
yield value;
197-
yielded++;
190+
const record = await getRecord(item.key);
191+
if (record) {
192+
yield record.value;
193+
yielded++;
194+
}
198195
}
199196

200-
const firstPageKeys = await firstPageKeysPromise;
201197
if (firstPageKeys.nextExclusiveStartKey && (limit === undefined || yielded < limit)) {
202-
for await (const key of keys({ ...options, exclusiveStartKey: firstPageKeys.nextExclusiveStartKey })) {
198+
for await (const key of keys({
199+
...options,
200+
exclusiveStartKey: firstPageKeys.nextExclusiveStartKey,
201+
})) {
203202
if (limit !== undefined && yielded >= limit) return;
204203
const record = await getRecord(key);
205204
if (record) {
@@ -210,9 +209,7 @@ export class KeyValueStoreClient extends BaseClient {
210209
}
211210
}
212211

213-
return Object.defineProperty(firstPageValuesPromise, Symbol.asyncIterator, {
214-
value: asyncGenerator,
215-
}) as AsyncIterable<unknown> & Promise<unknown[]>;
212+
return createLazyIterablePromise(getFirstPageValues, asyncGenerator);
216213
}
217214

218215
entries(
@@ -224,36 +221,38 @@ export class KeyValueStoreClient extends BaseClient {
224221

225222
const firstPageKeysPromise = keys(options);
226223

227-
const firstPageEntriesPromise = (async () => {
224+
const getFirstPageEntries = async () => {
228225
const firstPageKeys = await firstPageKeysPromise;
229226
const keysToFetch = limit !== undefined ? firstPageKeys.items.slice(0, limit) : firstPageKeys.items;
230227
const limiter = pLimit(GET_RECORD_CONCURRENCY);
231-
const results = await Promise.allSettled(
228+
const results = await Promise.all(
232229
keysToFetch.map((item) =>
233230
limiter(() => getRecord(item.key).then((record) => ({ key: item.key, record }))),
234231
),
235232
);
236233
return results
237-
.filter(
238-
(r): r is PromiseFulfilledResult<{ key: string; record: storage.KeyValueStoreRecord }> =>
239-
r.status === 'fulfilled' && r.value.record !== undefined,
240-
)
241-
.map((r) => [r.value.key, r.value.record.value] as [string, unknown]);
242-
})();
234+
.filter((r) => r.record !== undefined)
235+
.map((r) => [r.key, r.record!.value] as [string, unknown]);
236+
};
243237

244238
async function* asyncGenerator(): AsyncGenerator<[string, unknown]> {
245-
const firstPageEntries = await firstPageEntriesPromise;
239+
const firstPageKeys = await firstPageKeysPromise;
246240
let yielded = 0;
247241

248-
for (const entry of firstPageEntries) {
242+
for (const item of firstPageKeys.items) {
249243
if (limit !== undefined && yielded >= limit) return;
250-
yield entry;
251-
yielded++;
244+
const record = await getRecord(item.key);
245+
if (record) {
246+
yield [item.key, record.value];
247+
yielded++;
248+
}
252249
}
253250

254-
const firstPageKeys = await firstPageKeysPromise;
255251
if (firstPageKeys.nextExclusiveStartKey && (limit === undefined || yielded < limit)) {
256-
for await (const key of keys({ ...options, exclusiveStartKey: firstPageKeys.nextExclusiveStartKey })) {
252+
for await (const key of keys({
253+
...options,
254+
exclusiveStartKey: firstPageKeys.nextExclusiveStartKey,
255+
})) {
257256
if (limit !== undefined && yielded >= limit) return;
258257
const record = await getRecord(key);
259258
if (record) {
@@ -264,9 +263,7 @@ export class KeyValueStoreClient extends BaseClient {
264263
}
265264
}
266265

267-
return Object.defineProperty(firstPageEntriesPromise, Symbol.asyncIterator, {
268-
value: asyncGenerator,
269-
}) as AsyncIterable<[string, unknown]> & Promise<[string, unknown][]>;
266+
return createLazyIterablePromise(getFirstPageEntries, asyncGenerator);
270267
}
271268

272269
private async listKeysPage(

packages/memory-storage/src/utils.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,3 +222,42 @@ export function createPaginatedEntryList<Data>(
222222
value: asyncGenerator,
223223
}) as AsyncIterable<[number, Data]> & Promise<storage.PaginatedList<[number, Data]>>;
224224
}
225+
226+
/**
 * Creates an object that acts as both a lazy Promise and an AsyncIterable.
 * - When awaited (or when `.then` / `.catch` / `.finally` is called), it triggers
 *   `promiseFactory` exactly once; the resulting promise is cached and reused.
 * - When iterated with `for await...of`, it uses `iteratorFactory` and never
 *   touches `promiseFactory`.
 *
 * The returned value is a plain thenable object, not a real `Promise` instance,
 * so `instanceof Promise` is `false`. Anything that resolves thenables (e.g.
 * returning it from an `async` function, `Promise.resolve`, `Promise.all`)
 * calls `.then` and therefore triggers `promiseFactory`.
 *
 * @param promiseFactory Produces the promise-path result. Called at most once,
 * on first promise-style consumption. A synchronous throw is converted into a
 * rejected promise and cached like any other outcome.
 * @param iteratorFactory Produces a fresh async generator for each iteration.
 * @returns An object usable both with `await` and with `for await...of`.
 */
export function createLazyIterablePromise<TPromise, TElement>(
    promiseFactory: () => Promise<TPromise>,
    iteratorFactory: () => AsyncGenerator<TElement>,
): AsyncIterable<TElement> & Promise<TPromise> {
    let cached: Promise<TPromise> | null = null;

    function getOrCreate(): Promise<TPromise> {
        if (cached === null) {
            try {
                cached = promiseFactory();
            } catch (error) {
                // A factory that throws synchronously must still surface as a
                // rejection (never as a throw out of `.then`), and must not be
                // re-invoked on later awaits.
                cached = Promise.reject<TPromise>(error);
            }
        }
        return cached;
    }

    const result = {
        then<TResult1 = TPromise, TResult2 = never>(
            onfulfilled?: ((value: TPromise) => TResult1 | PromiseLike<TResult1>) | null,
            onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | null,
        ): Promise<TResult1 | TResult2> {
            return getOrCreate().then(onfulfilled, onrejected);
        },
        catch<TResult = never>(
            onrejected?: ((reason: any) => TResult | PromiseLike<TResult>) | null,
        ): Promise<TPromise | TResult> {
            return getOrCreate().catch(onrejected);
        },
        finally(onfinally?: (() => void) | null): Promise<TPromise> {
            return getOrCreate().finally(onfinally);
        },
        [Symbol.asyncIterator]: iteratorFactory,
        // Makes Object.prototype.toString report "[object Promise]".
        [Symbol.toStringTag]: 'Promise' as const,
    };

    return result as AsyncIterable<TElement> & Promise<TPromise>;
}

packages/memory-storage/test/async-iteration.test.ts

Lines changed: 137 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import { rm } from 'node:fs/promises';
2-
import { resolve } from 'node:path';
2+
import path from 'node:path';
33

44
import { MemoryStorage } from '@crawlee/memory-storage';
55
import type { DatasetClient, KeyValueStoreClient } from '@crawlee/types';
6+
import { vi } from 'vitest';
7+
8+
import { createLazyIterablePromise } from '../src/utils';
69

710
describe('Async iteration support', () => {
8-
const localDataDirectory = resolve(__dirname, './tmp/async-iteration');
11+
const localDataDirectory = path.resolve(__dirname, './tmp/async-iteration');
912
const storage = new MemoryStorage({
1013
localDataDirectory,
1114
persistStorage: false,
@@ -429,4 +432,136 @@ describe('Async iteration support', () => {
429432
}
430433
});
431434
});
435+
436+
describe('createLazyIterablePromise', () => {
437+
test('promise factory is not called until awaited', async () => {
438+
const promiseFactory = vi.fn(() => Promise.resolve([1, 2, 3]));
439+
async function* iteratorFactory() {
440+
yield 1;
441+
yield 2;
442+
yield 3;
443+
}
444+
445+
const result = createLazyIterablePromise<number[], number>(promiseFactory, iteratorFactory);
446+
447+
// Factory should not be called yet
448+
expect(promiseFactory).not.toHaveBeenCalled();
449+
450+
// Now await it
451+
const values = await result;
452+
expect(promiseFactory).toHaveBeenCalledTimes(1);
453+
expect(values).toStrictEqual([1, 2, 3]);
454+
});
455+
456+
test('iterating does not trigger the promise factory', async () => {
457+
const promiseFactory = vi.fn(() => Promise.resolve([1, 2, 3]));
458+
async function* iteratorFactory() {
459+
yield 10;
460+
yield 20;
461+
yield 30;
462+
}
463+
464+
const result = createLazyIterablePromise<number[], number>(promiseFactory, iteratorFactory);
465+
466+
const items: number[] = [];
467+
for await (const item of result) {
468+
items.push(item);
469+
}
470+
471+
expect(items).toStrictEqual([10, 20, 30]);
472+
expect(promiseFactory).not.toHaveBeenCalled();
473+
});
474+
475+
test('promise factory result is cached across multiple awaits', async () => {
476+
const promiseFactory = vi.fn(() => Promise.resolve([1, 2, 3]));
477+
async function* iteratorFactory() {
478+
yield 1;
479+
}
480+
481+
const result = createLazyIterablePromise<number[], number>(promiseFactory, iteratorFactory);
482+
483+
await result;
484+
await result;
485+
await result;
486+
487+
expect(promiseFactory).toHaveBeenCalledTimes(1);
488+
});
489+
});
490+
491+
describe('KeyValueStore.values lazy promise behavior', () => {
492+
let kvStore: KeyValueStoreClient;
493+
494+
beforeAll(async () => {
495+
const { id } = await storage.keyValueStores().getOrCreate('lazy-test-kvs-values');
496+
kvStore = storage.keyValueStore(id);
497+
498+
for (let i = 0; i < 5; i++) {
499+
await kvStore.setRecord({ key: `key-${i}`, value: { data: i } });
500+
}
501+
});
502+
503+
test('calling values() does not immediately fetch records', async () => {
504+
const getRecordSpy = vi.spyOn(kvStore, 'getRecord');
505+
506+
// Call values() but do not await or iterate
507+
const result = kvStore.values();
508+
509+
// getRecord should not have been called yet (lazy)
510+
// Note: keys may be fetched eagerly, but record values should not
511+
// We need to wait a tick to ensure no async work triggered getRecord
512+
await new Promise((resolve) => setTimeout(resolve, 50));
513+
expect(getRecordSpy).not.toHaveBeenCalled();
514+
515+
// Clean up: consume the result to avoid dangling promises
516+
await result;
517+
getRecordSpy.mockRestore();
518+
});
519+
520+
test('iterating and awaiting produce the same values', async () => {
521+
const awaited = await kvStore.values();
522+
523+
const iterated: unknown[] = [];
524+
for await (const value of kvStore.values()) {
525+
iterated.push(value);
526+
}
527+
528+
expect(awaited).toStrictEqual(iterated);
529+
});
530+
});
531+
532+
describe('KeyValueStore.entries lazy promise behavior', () => {
533+
let kvStore: KeyValueStoreClient;
534+
535+
beforeAll(async () => {
536+
const { id } = await storage.keyValueStores().getOrCreate('lazy-test-kvs-entries');
537+
kvStore = storage.keyValueStore(id);
538+
539+
for (let i = 0; i < 5; i++) {
540+
await kvStore.setRecord({ key: `key-${i}`, value: { data: i } });
541+
}
542+
});
543+
544+
test('calling entries() does not immediately fetch records', async () => {
545+
const getRecordSpy = vi.spyOn(kvStore, 'getRecord');
546+
547+
const result = kvStore.entries();
548+
549+
await new Promise((resolve) => setTimeout(resolve, 50));
550+
expect(getRecordSpy).not.toHaveBeenCalled();
551+
552+
await result;
553+
getRecordSpy.mockRestore();
554+
});
555+
556+
test('iterating and awaiting produce the same entries', async () => {
557+
const awaited = await kvStore.entries();
558+
559+
const iterated: [string, unknown][] = [];
560+
for await (const entry of kvStore.entries()) {
561+
iterated.push(entry);
562+
}
563+
564+
expect(awaited).toStrictEqual(iterated);
565+
});
566+
});
432567
});

0 commit comments

Comments
 (0)