Skip to content

Commit adf3dae

Browse files
authored
fix: change sequential approach to parallel for Iterator first page (#3402)
The `values()` and `entries()` methods on `KeyValueStoreClient` in `@crawlee/memory-storage` previously fetched records sequentially in a loop when resolving the first page as a Promise. This PR replaces the sequential loop with concurrent fetching using `p-limit` (concurrency cap of 25) and `Promise.allSettled`, reducing total wait time for the bulk closes #3395
1 parent 136d368 commit adf3dae

File tree

3 files changed

+26
-18
lines changed

3 files changed

+26
-18
lines changed

packages/memory-storage/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
"fs-extra": "^11.0.0",
5757
"json5": "^2.2.3",
5858
"mime-types": "^2.1.35",
59+
"p-limit": "^3.1.0",
5960
"proper-lockfile": "^4.1.2",
6061
"tslib": "^2.4.0"
6162
}

packages/memory-storage/src/resource-clients/key-value-store.ts

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type * as storage from '@crawlee/types';
77
import { s } from '@sapphire/shapeshift';
88
import { move } from 'fs-extra';
99
import mime from 'mime-types';
10+
import pLimit from 'p-limit';
1011

1112
import { scheduleBackgroundTask } from '../background-handler';
1213
import { maybeParseBody } from '../body-parser';
@@ -19,6 +20,7 @@ import { createKeyList, createKeyStringList, isBuffer, isStream } from '../utils
1920
import { BaseClient } from './common/base-client';
2021

2122
const DEFAULT_LOCAL_FILE_EXTENSION = 'bin';
23+
const GET_RECORD_CONCURRENCY = 25;
2224

2325
export interface KeyValueStoreClientOptions {
2426
name?: string;
@@ -173,15 +175,15 @@ export class KeyValueStoreClient extends BaseClient {
173175

174176
const firstPageValuesPromise = (async () => {
175177
const firstPageKeys = await firstPageKeysPromise;
176-
const values: unknown[] = [];
177-
for (const item of firstPageKeys.items) {
178-
if (limit !== undefined && values.length >= limit) break;
179-
const record = await getRecord(item.key);
180-
if (record) {
181-
values.push(record.value);
182-
}
183-
}
184-
return values;
178+
const keysToFetch = limit !== undefined ? firstPageKeys.items.slice(0, limit) : firstPageKeys.items;
179+
const limiter = pLimit(GET_RECORD_CONCURRENCY);
180+
const results = await Promise.allSettled(keysToFetch.map((item) => limiter(() => getRecord(item.key))));
181+
return results
182+
.filter(
183+
(r): r is PromiseFulfilledResult<storage.KeyValueStoreRecord> =>
184+
r.status === 'fulfilled' && r.value !== undefined,
185+
)
186+
.map((r) => r.value.value);
185187
})();
186188

187189
async function* asyncGenerator(): AsyncGenerator<unknown> {
@@ -224,15 +226,19 @@ export class KeyValueStoreClient extends BaseClient {
224226

225227
const firstPageEntriesPromise = (async () => {
226228
const firstPageKeys = await firstPageKeysPromise;
227-
const entries: [string, unknown][] = [];
228-
for (const item of firstPageKeys.items) {
229-
if (limit !== undefined && entries.length >= limit) break;
230-
const record = await getRecord(item.key);
231-
if (record) {
232-
entries.push([item.key, record.value]);
233-
}
234-
}
235-
return entries;
229+
const keysToFetch = limit !== undefined ? firstPageKeys.items.slice(0, limit) : firstPageKeys.items;
230+
const limiter = pLimit(GET_RECORD_CONCURRENCY);
231+
const results = await Promise.allSettled(
232+
keysToFetch.map((item) =>
233+
limiter(() => getRecord(item.key).then((record) => ({ key: item.key, record }))),
234+
),
235+
);
236+
return results
237+
.filter(
238+
(r): r is PromiseFulfilledResult<{ key: string; record: storage.KeyValueStoreRecord }> =>
239+
r.status === 'fulfilled' && r.value.record !== undefined,
240+
)
241+
.map((r) => [r.value.key, r.value.record.value] as [string, unknown]);
236242
})();
237243

238244
async function* asyncGenerator(): AsyncGenerator<[string, unknown]> {

yarn.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -991,6 +991,7 @@ __metadata:
991991
fs-extra: "npm:^11.0.0"
992992
json5: "npm:^2.2.3"
993993
mime-types: "npm:^2.1.35"
994+
p-limit: "npm:^3.1.0"
994995
proper-lockfile: "npm:^4.1.2"
995996
tslib: "npm:^2.4.0"
996997
languageName: unknown

0 commit comments

Comments
 (0)