diff --git a/src/clients/dataset-client.ts b/src/clients/dataset-client.ts index 6ddb066..ae5fd0f 100644 --- a/src/clients/dataset-client.ts +++ b/src/clients/dataset-client.ts @@ -1,4 +1,4 @@ -import { ActorRun, Dataset, DatasetClient } from 'apify-client'; +import { DatasetClient } from 'apify-client'; import { RunsTracker } from '../tracker.js'; import { DatasetItem, GreedyIterateOptions, ExtendedDatasetClient, IterateOptions } from '../types.js'; @@ -26,82 +26,74 @@ export class ExtDatasetClient extends DatasetClient im const { pageSize, ...listItemOptions } = options; this.customLogger.info('Iterating Dataset', { pageSize }, { url: this.url }); - let totalItems = 0; + const initialOffset = listItemOptions.offset ?? 0; + let readItems = 0; if (pageSize) { - let offset = 0; - let currentPage = await this.superClient.listItems({ ...listItemOptions, offset, limit: pageSize }); + let currentPage = await this.superClient.listItems({ + ...listItemOptions, + limit: pageSize, + }); while (currentPage.items.length > 0) { - totalItems += currentPage.items.length; + readItems += currentPage.items.length; for (const item of currentPage.items) { yield item; } - offset += pageSize; - currentPage = await this.superClient.listItems({ offset, limit: pageSize }); + currentPage = await this.superClient.listItems({ + ...listItemOptions, + offset: initialOffset + readItems, + limit: pageSize, + }); } } else { const itemList = await this.superClient.listItems(listItemOptions); - totalItems += itemList.items.length; + readItems += itemList.items.length; for (const item of itemList.items) { yield item; } } - this.customLogger.info('Finished reading dataset', { totalItems }, { url: this.url }); + this.customLogger.info('Finished reading dataset', { initialOffset, readItems }, { url: this.url }); } async* greedyIterate(options: GreedyIterateOptions = {}): AsyncGenerator { - const { pageSize = 100, itemsThreshold = 100, pollIntervalSecs = 10, ...listItemOptions } = options; - this.customLogger.info('Greedily iterating Dataset', { pageSize }, { url: this.url }); + const { pollIntervalSecs = 10, ...iterateOptions } = options; + this.customLogger.info('Greedily iterating Dataset', { pageSize: iterateOptions.pageSize }, { url: this.url }); - let readItemsCount = 0; + let currentOffset = iterateOptions.offset ?? 0; - let dataset: Dataset | undefined; - let run: ActorRun | undefined; + const runId = (await this.get())?.actRunId; - // TODO: breaking change - remove itemsThreshold and just listItems at every iteration - while (true) { - dataset = await this.get(); - if (!dataset || !dataset.actRunId) { - this.customLogger.error('Error getting Dataset while iterating greedily', { id: this.id }); - return; - } - - run = await this.apifyClient.run(dataset.actRunId).get(); - if (!run) { - this.customLogger.error('Error getting Run while iterating Dataset greedily', { id: this.id }); - return; - } + let runStatus = runId ? (await this.apifyClient.run(runId).get())?.status : undefined; - if (run.status !== 'READY' && run.status !== 'RUNNING') { - break; - } - - if (dataset.itemCount >= readItemsCount + itemsThreshold) { - const itemList = await this.superClient.listItems({ ...listItemOptions, offset: readItemsCount, limit: pageSize }); - readItemsCount += itemList.count; - for (const item of itemList.items) { + if (runId) { + while (runStatus && ['READY', 'RUNNING'].includes(runStatus)) { + const datasetIterator = this.iterate({ ...iterateOptions, offset: currentOffset }); + for await (const item of datasetIterator) { + currentOffset++; yield item; } - } - await new Promise((resolve) => setTimeout(resolve, pollIntervalSecs * 1000)); + await new Promise((resolve) => setTimeout(resolve, pollIntervalSecs * 1000)); + + runStatus = (await this.apifyClient.run(runId).get())?.status; + } + } else { + this.customLogger.error( + 'Greedy iterate: error getting Dataset or associated run\'s ID; trying to read the remaining items.', + ); } - dataset = await this.get(); - if (!dataset || !dataset.actRunId) { - this.customLogger.error('Error getting Dataset while iterating greedily', { id: this.id }); - return; + if (runId && !runStatus) { + this.customLogger.error( + 'Greedy iterate: error getting associated run\'s status: trying to read the remaining items.', + ); } - while (readItemsCount < dataset.itemCount) { - const itemList = await this.superClient.listItems({ ...listItemOptions, offset: readItemsCount, limit: pageSize }); - if (itemList.count === 0) { break; } - readItemsCount += itemList.count; - for (const item of itemList.items) { - yield item; - } + const datasetIterator = this.iterate({ ...iterateOptions, offset: currentOffset }); + for await (const item of datasetIterator) { + yield item; } } } diff --git a/src/types.ts b/src/types.ts index c7838da..26c8976 100644 --- a/src/types.ts +++ b/src/types.ts @@ -387,13 +387,6 @@ export type IterateOptions = DatasetClientListItemOptions & { } export type GreedyIterateOptions = IterateOptions & { - /** - * Download new items when they are more than the specified threshold, or when the Run terminates.\ - * If zero, the new items are downloaded as soon as they are detected. - * - * @default 100 - */ - itemsThreshold?: number /** * Check the run's status regularly at the specified interval, in seconds. * diff --git a/test/clients/dataset-client.test.ts b/test/clients/dataset-client.test.ts index 7ac1f4e..4093f50 100644 --- a/test/clients/dataset-client.test.ts +++ b/test/clients/dataset-client.test.ts @@ -76,7 +76,7 @@ describe('dataset-client', () => { expect(listItemsSpy).toHaveBeenCalledWith({}); }); - it('iterates the items from the dataset, using pagination', async () => { + it('iterates the items from the dataset, using pagination, passing the given options', async () => { const listItemsSpy = vi.spyOn(DatasetClient.prototype, 'listItems') .mockImplementationOnce(async () => ({ count: 2, @@ -102,7 +102,7 @@ describe('dataset-client', () => { limit: 2, desc: true, })); - const datasetIterator = datasetClient.iterate({ pageSize: 2 }); + const datasetIterator = datasetClient.iterate({ pageSize: 2, fields: ['title'] }); let index = 0; for await (const item of datasetIterator) { expect(item).toEqual(testItems[index]); @@ -110,18 +110,53 @@ describe('dataset-client', () => { } expect(index).toBe(3); expect(listItemsSpy).toHaveBeenCalledTimes(3); - expect(listItemsSpy).toHaveBeenNthCalledWith(1, { offset: 0, limit: 2 }); - expect(listItemsSpy).toHaveBeenNthCalledWith(2, { offset: 2, limit: 2 }); - expect(listItemsSpy).toHaveBeenNthCalledWith(3, { offset: 4, limit: 2 }); + expect(listItemsSpy).toHaveBeenNthCalledWith(1, { offset: 0, limit: 2, fields: ['title'] }); + expect(listItemsSpy).toHaveBeenNthCalledWith(2, { offset: 2, limit: 2, fields: ['title'] }); + expect(listItemsSpy).toHaveBeenNthCalledWith(3, { offset: 4, limit: 2, fields: ['title'] }); }); - }); - describe('greedyIterate', () => { - it('iterates the items from the dataset as soon as one batch is available, using pagination', () => { - // TODO: test + it('iterates the items using pagination and starting from the desired offset', async () => { + const listItemsSpy = vi.spyOn(DatasetClient.prototype, 'listItems') + .mockImplementationOnce(async () => ({ + count: 2, + items: testItems.slice(0, 2), + total: 8, + offset: 5, + limit: 2, + desc: true, + })) + .mockImplementationOnce(async () => ({ + count: 1, + items: testItems.slice(2, 3), + total: 8, + offset: 7, + limit: 2, + desc: true, + })) + .mockImplementationOnce(async () => ({ + count: 0, + items: [], + total: 8, + offset: 9, + limit: 2, + desc: true, + })); + const datasetIterator = datasetClient.iterate({ pageSize: 2, offset: 5 }); + let index = 0; + for await (const item of datasetIterator) { + expect(item).toEqual(testItems[index]); + index++; + } + expect(index).toBe(3); + expect(listItemsSpy).toHaveBeenCalledTimes(3); + expect(listItemsSpy).toHaveBeenNthCalledWith(1, { offset: 5, limit: 2 }); + expect(listItemsSpy).toHaveBeenNthCalledWith(2, { offset: 7, limit: 2 }); + expect(listItemsSpy).toHaveBeenNthCalledWith(3, { offset: 9, limit: 2 }); }); + }); - it('iterates the items from the dataset as soon as new items are available, setting pageSize to 0', () => { + describe('greedyIterate', () => { + it('iterates the items from the dataset until the run has finished', () => { // TODO: test }); });