Skip to content

Commit e2fa7a8

Browse files
authored
feat: add polling for run-task and URL scraper (#53)
* feat: add polling for run-task and URL scraper
* test: add test for run-task and URL scraper polling
* refactor: extract polling logic to a separate fn
1 parent 196a283 commit e2fa7a8

File tree

9 files changed

+156
-61
lines changed

9 files changed

+156
-61
lines changed

nodes/Apify/__tests__/Apify.node.spec.ts

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,12 @@ describe('Apify Node', () => {
9696

9797
describe('actor-tasks', () => {
9898
describe('run-task', () => {
99-
it('should run the run-task workflow', async () => {
100-
const mockRunTask = fixtures.getRunTaskResult();
99+
it('should run the run-task workflow (waitForFinish: false)', async () => {
100+
const mockRunTask = fixtures.runActorResult();
101101

102102
const scope = nock('https://api.apify.com')
103103
.post('/v2/actor-tasks/PwUDLcG3zMyT8E4vq/runs')
104-
.query(true)
104+
.query({ waitForFinish: 0, memory: 1024 })
105105
.reply(200, mockRunTask);
106106

107107
const runTaskWorkflow = require('./workflows/actor-tasks/run-task.workflow.json');
@@ -121,6 +121,37 @@ describe('Apify Node', () => {
121121

122122
expect(scope.isDone()).toBe(true);
123123
});
124+
125+
it('should run the run-task workflow and wait for finish (waitForFinish: true)', async () => {
126+
const mockRunTask = fixtures.runActorResult();
127+
const mockFinishedRun = fixtures.getRunTaskResult();
128+
129+
const scope = nock('https://api.apify.com')
130+
.post('/v2/actor-tasks/PwUDLcG3zMyT8E4vq/runs')
131+
.query({ waitForFinish: 0, memory: 1024 })
132+
.reply(200, mockRunTask)
133+
.get(`/v2/actor-runs/${mockRunTask.data.id}`)
134+
.reply(200, mockFinishedRun);
135+
136+
const runTaskWorkflow = require('./workflows/actor-tasks/run-task-wait-for-finish.workflow.json');
137+
const { waitPromise } = await executeWorkflow({
138+
credentialsHelper,
139+
workflow: runTaskWorkflow,
140+
});
141+
const result = await waitPromise.promise();
142+
143+
const nodeResults = getRunTaskDataByNodeName(result, 'Run task');
144+
expect(nodeResults.length).toBe(1);
145+
const [nodeResult] = nodeResults;
146+
expect(nodeResult.executionStatus).toBe('success');
147+
148+
const data = getTaskData(nodeResult);
149+
// expect polled terminal run as result
150+
expect(data).not.toEqual(mockRunTask.data);
151+
expect(data).toEqual(mockFinishedRun.data);
152+
153+
expect(scope.isDone()).toBe(true);
154+
});
124155
});
125156
});
126157

@@ -187,7 +218,7 @@ describe('Apify Node', () => {
187218
it('should run the run-actor workflow and wait for finish', async () => {
188219
const mockRunActor = fixtures.runActorResult();
189220
const mockBuild = fixtures.getBuildResult();
190-
const mockFinishedRun = fixtures.getRunResult();
221+
const mockFinishedRun = fixtures.getSuccessRunResult();
191222

192223
const scope = nock('https://api.apify.com')
193224
.get('/v2/acts/nFJndFXA5zjCTuudP')
@@ -222,15 +253,18 @@ describe('Apify Node', () => {
222253
});
223254
describe('scrape-single-url', () => {
224255
it('should run the scrape-single-url workflow', async () => {
225-
const mockRunResponse = fixtures.runScrapeSingleUrlActorResult();
256+
const mockRunActor = fixtures.runActorResult();
257+
const mockFinishedRun = fixtures.getSuccessRunResult();
226258
const mockItems = fixtures.getScrapeSingleUrlItemsResult();
227259

228-
const datasetId = mockRunResponse.data.defaultDatasetId;
260+
const datasetId = mockFinishedRun.data.defaultDatasetId;
229261

230262
const scope = nock('https://api.apify.com')
231263
.post(`/v2/acts/${helpers.consts.WEB_CONTENT_SCRAPER_ACTOR_ID}/runs`)
232-
.query({ waitForFinish: 60 })
233-
.reply(200, mockRunResponse)
264+
.query({ waitForFinish: 0 })
265+
.reply(200, mockRunActor)
266+
.get(`/v2/actor-runs/${mockRunActor.data.id}`)
267+
.reply(200, mockFinishedRun)
234268
.get(`/v2/datasets/${datasetId}/items`)
235269
.query({ format: 'json' })
236270
.reply(200, mockItems);

nodes/Apify/__tests__/utils/fixtures.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ export const getSuccessRunResult = () => {
370370
buildId: 'DgGC7ZxWmZ0cnuNIy',
371371
exitCode: 0,
372372
defaultKeyValueStoreId: '4aoPkdUdfoRpf9w7E',
373-
defaultDatasetId: 'U0hEU6N57UfDgqI98',
373+
defaultDatasetId: '63kMAihbWVgBvEAZ2',
374374
defaultRequestQueueId: 'qNGmAgPf3Pb50PhqC',
375375
pricingInfo: {
376376
pricingModel: 'PAY_PER_EVENT',
nodes/Apify/__tests__/workflows/actor-tasks/run-task-wait-for-finish.workflow.json

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
{
2+
"name": "Run task and wait for finish workflow",
3+
"nodes": [
4+
{
5+
"parameters": {
6+
"resource": "Actor tasks",
7+
"operation": "Run task",
8+
"actorTaskId": {
9+
"__rl": true,
10+
"value": "PwUDLcG3zMyT8E4vq",
11+
"mode": "list",
12+
"cachedResultName": "Google Search Results Scraper (Task)",
13+
"cachedResultUrl": "https://console.apify.com/actors/tasks/PwUDLcG3zMyT8E4vq/input"
14+
},
15+
"waitForFinish": true
16+
},
17+
"id": "2ada004c-bc23-435d-a65c-371ebcafb4f5",
18+
"name": "Run task",
19+
"type": "n8n-nodes-apify.apify",
20+
"typeVersion": 1,
21+
"position": [920, 460],
22+
"credentials": {
23+
"apifyApi": {
24+
"id": "dJAynKkN2pRqy3Ko",
25+
"name": "Apify account"
26+
}
27+
}
28+
}
29+
],
30+
"pinData": {},
31+
"connections": {},
32+
"active": false,
33+
"settings": {
34+
"executionOrder": "v1"
35+
},
36+
"versionId": "29897fc5-8618-4467-9665-358072641546",
37+
"meta": {
38+
"templateCredsSetupCompleted": true,
39+
"instanceId": "3f65ba173ae28613be507e94c0a98de4375527c55e31b7fc173a4edee4e2ded3"
40+
},
41+
"id": "OFLCq8UoIVInQv2Y",
42+
"tags": []
43+
}

nodes/Apify/__tests__/workflows/actor-tasks/run-task.workflow.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"name": "Run task workflow",
2+
"name": "Run task no wait workflow",
33
"nodes": [
44
{
55
"parameters": {
@@ -12,7 +12,7 @@
1212
"cachedResultName": "Google Search Results Scraper (Task)",
1313
"cachedResultUrl": "https://console.apify.com/actors/tasks/PwUDLcG3zMyT8E4vq/input"
1414
},
15-
"waitForFinish": 60
15+
"waitForFinish": false
1616
},
1717
"id": "2ada004c-bc23-435d-a65c-371ebcafb4f5",
1818
"name": "Run task",

nodes/Apify/resources/actor-tasks/run-task/execute.ts

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import {
44
NodeApiError,
55
NodeOperationError,
66
} from 'n8n-workflow';
7-
import { apiRequest } from '../../../resources/genericFunctions';
7+
import { apiRequest, pollRunStatus } from '../../../resources/genericFunctions';
88

99
export async function runTask(this: IExecuteFunctions, i: number): Promise<INodeExecutionData> {
1010
const actorTaskId = this.getNodeParameter('actorTaskId', i, undefined, {
1111
extractValue: true,
1212
}) as string;
1313
const input = this.getNodeParameter('customBody', i, {}) as object;
14-
const waitForFinish = this.getNodeParameter('waitForFinish', i, null) as number | null;
14+
const waitForFinish = this.getNodeParameter('waitForFinish', i) as boolean;
1515
const timeout = this.getNodeParameter('timeout', i, null) as number | null;
1616
const memory = this.getNodeParameter('memory', i, null) as number | null;
1717
const build = this.getNodeParameter('build', i, '') as string;
@@ -21,27 +21,29 @@ export async function runTask(this: IExecuteFunctions, i: number): Promise<INode
2121
}
2222

2323
const qs: Record<string, any> = {};
24-
if (waitForFinish != null) qs.waitForFinish = waitForFinish;
2524
if (timeout != null) qs.timeout = timeout;
2625
if (memory != null) qs.memory = memory;
2726
if (build) qs.build = build;
27+
qs.waitForFinish = 0; // always start run without waiting
2828

29-
try {
30-
const apiResult = await apiRequest.call(this, {
31-
method: 'POST',
32-
uri: `/v2/actor-tasks/${actorTaskId}/runs`,
33-
body: input,
34-
qs,
35-
});
29+
const apiResult = await apiRequest.call(this, {
30+
method: 'POST',
31+
uri: `/v2/actor-tasks/${actorTaskId}/runs`,
32+
body: input,
33+
qs,
34+
});
3635

37-
if (!apiResult) {
38-
throw new NodeApiError(this.getNode(), {
39-
message: `Task run for ${actorTaskId} not found`,
40-
});
41-
}
36+
if (!apiResult?.data?.id) {
37+
throw new NodeApiError(this.getNode(), {
38+
message: `Run ID not found after running the task`,
39+
});
40+
}
4241

42+
if (!waitForFinish) {
4343
return { json: { ...apiResult.data } };
44-
} catch (error) {
45-
throw new NodeApiError(this.getNode(), error);
4644
}
45+
46+
const runId = apiResult.data.id;
47+
const lastRunData = await pollRunStatus.call(this, runId);
48+
return { json: { ...lastRunData } };
4749
}

nodes/Apify/resources/actor-tasks/run-task/properties.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,9 @@ export const properties: INodeProperties[] = [
4747
displayName: 'Wait for Finish',
4848
name: 'waitForFinish',
4949
description:
50-
"The maximum number of seconds the server waits for the run to finish. By default, the server doesn't wait for the run to finish and returns immediately. The maximum value is 60 seconds.",
51-
default: null,
52-
type: 'number',
53-
typeOptions: {
54-
maxValue: 60,
55-
},
50+
'Whether to wait for the run to finish. If true, the node will poll the run status until it reaches a terminal state (SUCCEEDED, FAILED, TIMED-OUT, ABORTED) or 5 minutes have passed. If false, the node will return immediately after starting the run.',
51+
default: true,
52+
type: 'boolean',
5653
displayOptions: {
5754
show: {
5855
resource: ['Actor tasks'],

nodes/Apify/resources/actors/run-actor/execute.ts

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ import {
44
NodeApiError,
55
NodeOperationError,
66
} from 'n8n-workflow';
7-
import { apiRequest } from '../../genericFunctions';
8-
import * as helpers from '../../../helpers';
7+
import { apiRequest, pollRunStatus } from '../../genericFunctions';
98

109
export async function runActor(this: IExecuteFunctions, i: number): Promise<INodeExecutionData> {
1110
const actorId = this.getNodeParameter('actorId', i, undefined, {
@@ -83,27 +82,7 @@ export async function runActor(this: IExecuteFunctions, i: number): Promise<INod
8382
// This loop is infinite and will only stop when a terminal status is reached,
8483
// or when the workflow maximum timeout is hit, as set in your n8n configuration.
8584
const runId = run.data.id;
86-
let lastRunData = run.data;
87-
while (true) {
88-
try {
89-
const pollResult = await apiRequest.call(this, {
90-
method: 'GET',
91-
uri: `/v2/actor-runs/${runId}`,
92-
});
93-
const status = pollResult?.data?.status;
94-
lastRunData = pollResult?.data;
95-
if (helpers.consts.TERMINAL_RUN_STATUSES.includes(status)) {
96-
break;
97-
}
98-
} catch (err) {
99-
throw new NodeApiError(this.getNode(), {
100-
message: `Error polling run status: ${err}`,
101-
});
102-
}
103-
await new Promise((resolve) =>
104-
setTimeout(resolve, helpers.consts.WAIT_FOR_FINISH_POLL_INTERVAL),
105-
);
106-
}
85+
const lastRunData = await pollRunStatus.call(this, runId);
10786
return {
10887
json: { ...lastRunData },
10988
};

nodes/Apify/resources/actors/scrape-single-url/execute.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { IExecuteFunctions, INodeExecutionData, NodeApiError } from 'n8n-workflow';
2-
import { apiRequest } from '../../../resources/genericFunctions';
2+
import { apiRequest, pollRunStatus } from '../../../resources/genericFunctions';
33
import { consts } from '../../../helpers';
44

55
export async function scrapeSingleUrl(
@@ -24,15 +24,27 @@ export async function scrapeSingleUrl(
2424
saveMarkdown: true,
2525
};
2626

27-
// Run the actor and wait for it to finish
27+
// Run the actor and do not wait for finish
28+
2829
const run = await apiRequest.call(this, {
2930
method: 'POST',
3031
uri: `/v2/acts/${consts.WEB_CONTENT_SCRAPER_ACTOR_ID}/runs`,
3132
body: input,
32-
qs: { waitForFinish: 60 },
33+
qs: { waitForFinish: 0 },
3334
});
3435

35-
const defaultDatasetId = run?.data?.defaultDatasetId || run?.defaultDatasetId;
36+
const runId = run?.data?.id || run?.id;
37+
38+
if (!runId) {
39+
throw new NodeApiError(this.getNode(), {
40+
message: 'No run ID returned from actor run',
41+
});
42+
}
43+
44+
// Poll for terminal status
45+
const lastRunData = await pollRunStatus.call(this, runId);
46+
47+
const defaultDatasetId = lastRunData?.defaultDatasetId;
3648

3749
if (!defaultDatasetId) {
3850
throw new NodeApiError(this.getNode(), {

nodes/Apify/resources/genericFunctions.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,34 @@ export async function apiRequestAllItems(
118118
return combinedData;
119119
}
120120

121+
/**
 * Polls an Actor run until it reports a terminal status and returns the run
 * object from the final poll.
 *
 * Each iteration issues `GET /v2/actor-runs/{runId}` through `apiRequest` and
 * then waits one second before the next attempt.
 *
 * NOTE(review): no overall time limit is enforced here. The run-task
 * "Wait for Finish" description mentions a 5-minute cap — confirm which of the
 * two is intended.
 *
 * @param runId - ID of the run to poll.
 * @returns The `data` payload of the poll response that carried a terminal status.
 * @throws NodeApiError when a poll request fails, or when a poll response
 *   contains no run data.
 */
export async function pollRunStatus(
  this: IHookFunctions | IExecuteFunctions | ILoadOptionsFunctions,
  runId: string,
): Promise<any> {
  // Statuses after which the run can no longer change state.
  const terminalStatuses = ['SUCCEEDED', 'FAILED', 'TIMED-OUT', 'ABORTED'];
  const pollIntervalMs = 1000; // 1 second polling interval

  while (true) {
    let pollResult: any;
    try {
      pollResult = await apiRequest.call(this, {
        method: 'GET',
        uri: `/v2/actor-runs/${runId}`,
      });
    } catch (err) {
      throw new NodeApiError(this.getNode(), {
        message: `Error polling run status: ${err}`,
      });
    }

    const runData = pollResult?.data;
    if (!runData) {
      // Without run data the status can never become terminal, so the loop
      // would spin forever; fail loudly instead.
      throw new NodeApiError(this.getNode(), {
        message: `Run ${runId} returned no data while polling run status`,
      });
    }

    if (terminalStatuses.includes(runData.status)) {
      return runData;
    }

    await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
  }
}
148+
121149
export function getActorOrTaskId(this: IHookFunctions): string {
122150
const resource = this.getNodeParameter('resource', '') as string;
123151
const actorId = this.getNodeParameter('actorId', '') as { value: string };

0 commit comments

Comments
 (0)