Skip to content

Commit c58ce6f

Browse files
authored
feat: Make storage clients list methods return value also be asyncIterator of relevant data (#803)
### Description

- Extends the return value of the storage list methods with an `asyncIterator` that can be used to iterate over individual items:
  - `DatasetClient.listItems`
  - `KeyValueStoreClient.listKeys`
  - `RequestQueueClient.listRequests`
- Generalizes the `asyncIterator` pagination tests so that each method is tested with the options it supports
- Adds stricter runtime checks on `limit` and `offset` (invalid values would cause API errors anyway)
- Adds a comment about the undocumented behavior of `exclusiveStartId`
- Updates the example code in the docs

### Example usage

The methods can still be used the same way as before; additionally, they can now be used like this:

```ts
...
for await (const item of datasetClient.listItems({ limit, offset, chunkSize })) {
    allItems.push(item);
}
console.log(`Overall fetched ${allItems.length} items`);
```

### Issues

- Closes: #777
1 parent bbc42f5 commit c58ce6f

22 files changed

+585
-242
lines changed

docs/getting-started.md

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -260,28 +260,15 @@ const client = new ApifyClient({ token: 'MY-APIFY-TOKEN' });
260260
// Resource clients accept an ID of the resource.
261261
const datasetClient = client.dataset('dataset-id');
262262

263-
// Number of items per page
263+
// Maximum number of items to fetch in total
264264
const limit = 1000;
265+
// Maximum number of items to fetch in one API call
266+
const chunkSize = 100;
265267
// Initial offset
266-
let offset = 0;
267-
// Array to store all items
268-
let allItems = [];
268+
const offset = 0;
269269

270-
while (true) {
271-
const { items, total } = await datasetClient.listItems({ limit, offset });
272-
273-
console.log(`Fetched ${items.length} items`);
274-
275-
// Merge new items with other already loaded items
276-
allItems.push(...items);
277-
278-
// If there are no more items to fetch, exit the loading
279-
if (offset + limit >= total) {
280-
break;
281-
}
282-
283-
offset += limit;
270+
for await (const item of datasetClient.listItems({ limit, offset, chunkSize })) {
271+
// Process individual item
272+
console.log(item);
284273
}
285-
286-
console.log(`Overall fetched ${allItems.length} items`);
287274
```

src/base/api_client.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { ApifyClient } from '../apify_client';
22
import type { HttpClient } from '../http_client';
3+
import type { PaginatedResponse, PaginationOptions } from '../utils';
34

45
/** @private */
56
export interface ApiClientOptions {
@@ -82,6 +83,57 @@ export abstract class ApiClient {
8283
// The id has the format `username/actor-name`, so we only need to replace the first `/`.
8384
return id.replace('/', '~');
8485
}
86+
87+
/**
88+
* Returns an object that is both an async iterable over all matching items (fetched page by page) and a Promise that can be awaited to get the first page of results.
89+
*/
90+
protected _listPaginatedFromCallback<T extends PaginationOptions, Data, R extends PaginatedResponse<Data>>(
91+
getPaginatedList: (options?: T) => Promise<R>,
92+
options: T = {} as T,
93+
): AsyncIterable<Data> & Promise<R> {
94+
const minForLimitParam = (a: number | undefined, b: number | undefined): number | undefined => {
95+
// API treats 0 as undefined for limit parameter
96+
if (a === 0) a = undefined;
97+
if (b === 0) b = undefined;
98+
if (a === undefined) return b;
99+
if (b === undefined) return a;
100+
return Math.min(a, b);
101+
};
102+
103+
const paginatedListPromise = getPaginatedList({
104+
...options,
105+
limit: minForLimitParam(options.limit, options.chunkSize),
106+
});
107+
108+
async function* asyncGenerator() {
109+
let currentPage = await paginatedListPromise;
110+
yield* currentPage.items;
111+
const offset = options.offset ?? 0;
112+
const limit = Math.min(options.limit || currentPage.total, currentPage.total);
113+
114+
let currentOffset = offset + currentPage.items.length;
115+
let remainingItems = Math.min(currentPage.total - offset, limit) - currentPage.items.length;
116+
117+
while (
118+
currentPage.items.length > 0 && // Continue only if at least some items were returned in the last page.
119+
remainingItems > 0
120+
) {
121+
const newOptions = {
122+
...options,
123+
limit: minForLimitParam(remainingItems, options.chunkSize),
124+
offset: currentOffset,
125+
};
126+
currentPage = await getPaginatedList(newOptions);
127+
yield* currentPage.items;
128+
currentOffset += currentPage.items.length;
129+
remainingItems -= currentPage.items.length;
130+
}
131+
}
132+
133+
return Object.defineProperty(paginatedListPromise, Symbol.asyncIterator, {
134+
value: asyncGenerator,
135+
}) as unknown as AsyncIterable<Data> & Promise<R>;
136+
}
85137
}
86138

87139
export interface BaseOptions {

src/base/resource_collection_client.ts

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,33 +25,7 @@ export class ResourceCollectionClient extends ApiClient {
2525
protected _listPaginated<T extends PaginationOptions, Data, R extends PaginatedResponse<Data>>(
2626
options: T = {} as T,
2727
): AsyncIterable<Data> & Promise<R> {
28-
const getPaginatedList = this._list.bind(this);
29-
const paginatedListPromise = getPaginatedList<T, R>(options);
30-
31-
async function* asyncGenerator() {
32-
let currentPage = await paginatedListPromise;
33-
yield* currentPage.items;
34-
const offset = options.offset ?? 0;
35-
const limit = Math.min(options.limit || currentPage.total, currentPage.total);
36-
37-
let currentOffset = offset + currentPage.items.length;
38-
let remainingItems = Math.min(currentPage.total - offset, limit) - currentPage.items.length;
39-
40-
while (
41-
currentPage.items.length > 0 && // Continue only if at least some items were returned in the last page.
42-
remainingItems > 0
43-
) {
44-
const newOptions = { ...options, limit: remainingItems, offset: currentOffset };
45-
currentPage = await getPaginatedList<T, R>(newOptions);
46-
yield* currentPage.items;
47-
currentOffset += currentPage.items.length;
48-
remainingItems -= currentPage.items.length;
49-
}
50-
}
51-
52-
return Object.defineProperty(paginatedListPromise, Symbol.asyncIterator, {
53-
value: asyncGenerator,
54-
}) as unknown as AsyncIterable<Data> & Promise<R>;
28+
return this._listPaginatedFromCallback(this._list.bind(this)<T, R>, options);
5529
}
5630

5731
protected async _create<D, R>(resource: D): Promise<R> {

src/resource_clients/actor_collection.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ export class ActorCollectionClient extends ResourceCollectionClient {
6565
options,
6666
ow.object.exactShape({
6767
my: ow.optional.boolean,
68-
limit: ow.optional.number,
69-
offset: ow.optional.number,
68+
limit: ow.optional.number.not.negative,
69+
offset: ow.optional.number.not.negative,
7070
desc: ow.optional.boolean,
7171
sortBy: ow.optional.string.oneOf(Object.values(ActorListSortBy)),
7272
}),
@@ -96,6 +96,7 @@ export enum ActorListSortBy {
9696

9797
export interface ActorCollectionListOptions extends PaginationOptions {
9898
my?: boolean;
99+
desc?: boolean;
99100
sortBy?: ActorListSortBy;
100101
}
101102

src/resource_clients/actor_env_var_collection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ export class ActorEnvVarCollectionClient extends ResourceCollectionClient {
6868
ow(
6969
options,
7070
ow.object.exactShape({
71-
limit: ow.optional.number,
72-
offset: ow.optional.number,
71+
limit: ow.optional.number.not.negative,
72+
offset: ow.optional.number.not.negative,
7373
desc: ow.optional.boolean,
7474
}),
7575
);

src/resource_clients/actor_version_collection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ export class ActorVersionCollectionClient extends ResourceCollectionClient {
6666
ow(
6767
options,
6868
ow.object.exactShape({
69-
limit: ow.optional.number,
70-
offset: ow.optional.number,
69+
limit: ow.optional.number.not.negative,
70+
offset: ow.optional.number.not.negative,
7171
desc: ow.optional.boolean,
7272
}),
7373
);

src/resource_clients/build_collection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ export class BuildCollectionClient extends ResourceCollectionClient {
6161
ow(
6262
options,
6363
ow.object.exactShape({
64-
limit: ow.optional.number,
65-
offset: ow.optional.number,
64+
limit: ow.optional.number.not.negative,
65+
offset: ow.optional.number.not.negative,
6666
desc: ow.optional.boolean,
6767
}),
6868
);

src/resource_clients/dataset.ts

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
SMALL_TIMEOUT_MILLIS,
1313
} from '../base/resource_client';
1414
import type { ApifyRequestConfig, ApifyResponse } from '../http_client';
15-
import type { PaginatedList } from '../utils';
15+
import type { PaginatedIterator, PaginatedList, PaginationOptions } from '../utils';
1616
import { applyQueryParamsToUrl, cast, catchNotFoundOrThrow, pluckData } from '../utils';
1717

1818
/**
@@ -98,6 +98,7 @@ export class DatasetClient<
9898
*
9999
* @param options - Options for listing items
100100
* @param options.limit - Maximum number of items to return. Default is all items.
101+
* @param options.chunkSize - Maximum number of items returned in one API response. Mainly relevant when iterating over the result with `for await`, where it sets the size of each fetched page.
101102
* @param options.offset - Number of items to skip from the beginning. Default is 0.
102103
* @param options.desc - If `true`, items are returned in descending order (newest first). Default is `false`.
103104
* @param options.fields - Array of field names to include in the results. Omits all other fields.
@@ -132,7 +133,7 @@ export class DatasetClient<
132133
* });
133134
* ```
134135
*/
135-
async listItems(options: DatasetClientListItemOptions = {}): Promise<PaginatedList<Data>> {
136+
listItems(options: DatasetClientListItemOptions = {}): PaginatedIterator<Data> {
136137
ow(
137138
options,
138139
ow.object.exactShape({
@@ -141,8 +142,9 @@ export class DatasetClient<
141142
flatten: ow.optional.array.ofType(ow.string),
142143
fields: ow.optional.array.ofType(ow.string),
143144
omit: ow.optional.array.ofType(ow.string),
144-
limit: ow.optional.number,
145-
offset: ow.optional.number,
145+
limit: ow.optional.number.not.negative,
146+
offset: ow.optional.number.not.negative,
147+
chunkSize: ow.optional.number.positive,
146148
skipEmpty: ow.optional.boolean,
147149
skipHidden: ow.optional.boolean,
148150
unwind: ow.optional.any(ow.string, ow.array.ofType(ow.string)),
@@ -151,14 +153,20 @@ export class DatasetClient<
151153
}),
152154
);
153155

154-
const response = await this.httpClient.call({
155-
url: this._url('items'),
156-
method: 'GET',
157-
params: this._params(options),
158-
timeout: DEFAULT_TIMEOUT_MILLIS,
159-
});
156+
const fetchItems = async (
157+
datasetListOptions: DatasetClientListItemOptions = {},
158+
): Promise<PaginatedList<Data>> => {
159+
const response = await this.httpClient.call({
160+
url: this._url('items'),
161+
method: 'GET',
162+
params: this._params(datasetListOptions),
163+
timeout: DEFAULT_TIMEOUT_MILLIS,
164+
});
165+
166+
return this._createPaginationList(response, datasetListOptions.desc ?? false);
167+
};
160168

161-
return this._createPaginationList(response, options.desc ?? false);
169+
return this._listPaginatedFromCallback(fetchItems, options);
162170
}
163171

164172
/**
@@ -214,8 +222,8 @@ export class DatasetClient<
214222
flatten: ow.optional.array.ofType(ow.string),
215223
fields: ow.optional.array.ofType(ow.string),
216224
omit: ow.optional.array.ofType(ow.string),
217-
limit: ow.optional.number,
218-
offset: ow.optional.number,
225+
limit: ow.optional.number.not.negative,
226+
offset: ow.optional.number.not.negative,
219227
skipEmpty: ow.optional.boolean,
220228
skipHeaderRow: ow.optional.boolean,
221229
skipHidden: ow.optional.boolean,
@@ -354,8 +362,8 @@ export class DatasetClient<
354362
flatten: ow.optional.array.ofType(ow.string),
355363
fields: ow.optional.array.ofType(ow.string),
356364
omit: ow.optional.array.ofType(ow.string),
357-
limit: ow.optional.number,
358-
offset: ow.optional.number,
365+
limit: ow.optional.number.not.negative,
366+
offset: ow.optional.number.not.negative,
359367
skipEmpty: ow.optional.boolean,
360368
skipHidden: ow.optional.boolean,
361369
unwind: ow.optional.any(ow.string, ow.array.ofType(ow.string)),
@@ -448,14 +456,12 @@ export interface DatasetClientUpdateOptions {
448456
* Provides various filtering, pagination, and transformation options to customize
449457
* the output format and content of retrieved items.
450458
*/
451-
export interface DatasetClientListItemOptions {
459+
export interface DatasetClientListItemOptions extends PaginationOptions {
452460
clean?: boolean;
453461
desc?: boolean;
454462
flatten?: string[];
455463
fields?: string[];
456464
omit?: string[];
457-
limit?: number;
458-
offset?: number;
459465
skipEmpty?: boolean;
460466
skipHidden?: boolean;
461467
unwind?: string | string[]; // TODO: when doing a breaking change release, change to string[] only

src/resource_clients/dataset_collection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ export class DatasetCollectionClient extends ResourceCollectionClient {
6363
options,
6464
ow.object.exactShape({
6565
unnamed: ow.optional.boolean,
66-
limit: ow.optional.number,
67-
offset: ow.optional.number,
66+
limit: ow.optional.number.not.negative,
67+
offset: ow.optional.number.not.negative,
6868
desc: ow.optional.boolean,
6969
}),
7070
);

src/resource_clients/key_value_store.ts

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,26 +136,61 @@ export class KeyValueStoreClient extends ResourceClient {
136136
* } while (result.isTruncated);
137137
* ```
138138
*/
139-
async listKeys(options: KeyValueClientListKeysOptions = {}): Promise<KeyValueClientListKeysResult> {
139+
listKeys(
140+
options: KeyValueClientListKeysOptions = {},
141+
): Promise<KeyValueClientListKeysResult> & AsyncIterable<KeyValueListItem> {
140142
ow(
141143
options,
142144
ow.object.exactShape({
143-
limit: ow.optional.number,
145+
limit: ow.optional.number.not.negative,
144146
exclusiveStartKey: ow.optional.string,
145147
collection: ow.optional.string,
146148
prefix: ow.optional.string,
147149
signature: ow.optional.string,
148150
}),
149151
);
150152

151-
const response = await this.httpClient.call({
152-
url: this._url('keys'),
153-
method: 'GET',
154-
params: this._params(options),
155-
timeout: MEDIUM_TIMEOUT_MILLIS,
156-
});
153+
const getPaginatedList = async (
154+
kvsListOptions: KeyValueClientListKeysOptions = {},
155+
): Promise<KeyValueClientListKeysResult> => {
156+
const response = await this.httpClient.call({
157+
url: this._url('keys'),
158+
method: 'GET',
159+
params: this._params(kvsListOptions),
160+
timeout: MEDIUM_TIMEOUT_MILLIS,
161+
});
162+
163+
return cast(parseDateFields(pluckData(response.data)));
164+
};
165+
166+
const paginatedListPromise = getPaginatedList(options);
167+
async function* asyncGenerator() {
168+
let currentPage = await paginatedListPromise;
169+
yield* currentPage.items;
170+
171+
let remainingItems = options.limit ? options.limit - currentPage.items.length : undefined;
172+
173+
while (
174+
currentPage.items.length > 0 && // Continue only if at least some items were returned in the last page.
175+
currentPage.nextExclusiveStartKey !== null && // Continue only if there is some next key.
176+
(remainingItems === undefined || remainingItems > 0) // Continue only if the limit was not exceeded.
177+
) {
178+
const newOptions = {
179+
...options,
180+
limit: remainingItems,
181+
exclusiveStartKey: currentPage.nextExclusiveStartKey,
182+
};
183+
currentPage = await getPaginatedList(newOptions);
184+
yield* currentPage.items;
185+
if (remainingItems) {
186+
remainingItems -= currentPage.items.length;
187+
}
188+
}
189+
}
157190

158-
return cast(parseDateFields(pluckData(response.data)));
191+
return Object.defineProperty(paginatedListPromise, Symbol.asyncIterator, {
192+
value: asyncGenerator,
193+
}) as unknown as AsyncIterable<KeyValueListItem> & Promise<KeyValueClientListKeysResult>;
159194
}
160195

161196
/**
@@ -216,7 +251,7 @@ export class KeyValueStoreClient extends ResourceClient {
216251
ow(
217252
options,
218253
ow.object.exactShape({
219-
limit: ow.optional.number,
254+
limit: ow.optional.number.not.negative,
220255
exclusiveStartKey: ow.optional.string,
221256
collection: ow.optional.string,
222257
prefix: ow.optional.string,

0 commit comments

Comments
 (0)