Skip to content

Commit a562f64

Browse files
authored
feat: progress notification (#173)
* feat: progress notification * fix: remove optional parameter * fix: increment progress number, ignore total * fix: start tracker for call-actor internal tool
1 parent 9a5c16a commit a562f64

File tree

7 files changed

+211
-19
lines changed

7 files changed

+211
-19
lines changed

src/const.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,5 @@ export const ALGOLIA = {
8080
apiKey: 'e97714a64e2b4b8b8fe0b01cd8592870', // search only (public) API key
8181
indexName: 'test_test_apify_sdk',
8282
};
83+
84+
export const PROGRESS_NOTIFICATION_INTERVAL_MS = 5_000; // 5 seconds

src/main.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ if (STANDBY_MODE) {
4747
log.info('Actor is running in the STANDBY mode.');
4848

4949
app.listen(PORT, () => {
50-
log.info(`The Actor web server is listening for user requests at ${HOST}`);
50+
log.info(`The Actor web server is listening for user requests at ${HOST}:${PORT}`);
5151
});
5252
} else {
5353
log.info('Actor is not designed to run in the NORMAL model (use this mode only for debugging purposes)');

src/mcp/server.ts

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {
2626
import { addRemoveTools, callActorGetDataset, defaultTools, getActorsAsTools, toolCategories } from '../tools/index.js';
2727
import { actorNameToToolName, decodeDotPropertyNames } from '../tools/utils.js';
2828
import type { ActorMcpTool, ActorTool, HelperTool, ToolEntry } from '../types.js';
29+
import { createProgressTracker } from '../utils/progress.js';
2930
import { connectMCPClient } from './client.js';
3031
import { EXTERNAL_TOOL_CALL_TIMEOUT_MSEC } from './const.js';
3132
import { processParamsGetTools } from './utils.js';
@@ -423,15 +424,26 @@ export class ActorsMcpServer {
423424
// Handle internal tool
424425
if (tool.type === 'internal') {
425426
const internalTool = tool.tool as HelperTool;
427+
428+
// Only create progress tracker for call-actor tool
429+
const progressTracker = internalTool.name === 'call-actor'
430+
? createProgressTracker(progressToken, extra.sendNotification)
431+
: null;
432+
426433
const res = await internalTool.call({
427434
args,
428435
extra,
429436
apifyMcpServer: this,
430437
mcpServer: this.server,
431438
apifyToken,
432439
userRentedActorIds,
440+
progressTracker,
433441
}) as object;
434442

443+
if (progressTracker) {
444+
progressTracker.stop();
445+
}
446+
435447
return { ...res };
436448
}
437449

@@ -477,21 +489,33 @@ export class ActorsMcpServer {
477489
if (tool.type === 'actor') {
478490
const actorTool = tool.tool as ActorTool;
479491

492+
// Create progress tracker if progressToken is available
493+
const progressTracker = createProgressTracker(progressToken, extra.sendNotification);
494+
480495
const callOptions: ActorCallOptions = { memory: actorTool.memoryMbytes };
481-
const { items } = await callActorGetDataset(
482-
actorTool.actorFullName,
483-
args,
484-
apifyToken as string,
485-
callOptions,
486-
);
487-
return {
488-
content: items.items.map((item: Record<string, unknown>) => {
489-
return {
490-
type: 'text',
491-
text: JSON.stringify(item),
492-
};
493-
}),
494-
};
496+
497+
try {
498+
const { items } = await callActorGetDataset(
499+
actorTool.actorFullName,
500+
args,
501+
apifyToken as string,
502+
callOptions,
503+
progressTracker,
504+
);
505+
506+
return {
507+
content: items.items.map((item: Record<string, unknown>) => {
508+
return {
509+
type: 'text',
510+
text: JSON.stringify(item),
511+
};
512+
}),
513+
};
514+
} finally {
515+
if (progressTracker) {
516+
progressTracker.stop();
517+
}
518+
}
495519
}
496520
} catch (error) {
497521
if (error instanceof ApifyApiError) {

src/tools/actor.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { actorDefinitionPrunedCache } from '../state.js';
1919
import type { ActorDefinitionStorage, ActorInfo, InternalTool, ToolEntry } from '../types.js';
2020
import { getActorDefinitionStorageFieldNames } from '../utils/actor.js';
2121
import { getValuesByDotKeys } from '../utils/generic.js';
22+
import type { ProgressTracker } from '../utils/progress.js';
2223
import { getActorDefinition } from './build.js';
2324
import {
2425
actorNameToToolName,
@@ -50,6 +51,7 @@ export type CallActorGetDatasetResult = {
5051
* @param {ActorCallOptions} callOptions - The options to pass to the actor.
5152
* @param {unknown} input - The input to pass to the actor.
5253
* @param {string} apifyToken - The Apify token to use for authentication.
54+
* @param {ProgressTracker} progressTracker - Optional progress tracker for real-time updates.
5355
* @returns {Promise<{ actorRun: any, items: object[] }>} - A promise that resolves to an object containing the actor run and dataset items.
5456
* @throws {Error} - Throws an error if the `APIFY_TOKEN` is not set
5557
*/
@@ -58,16 +60,26 @@ export async function callActorGetDataset(
5860
input: unknown,
5961
apifyToken: string,
6062
callOptions: ActorCallOptions | undefined = undefined,
63+
progressTracker?: ProgressTracker | null,
6164
): Promise<CallActorGetDatasetResult> {
6265
try {
6366
log.info(`Calling Actor ${actorName} with input: ${JSON.stringify(input)}`);
6467

6568
const client = new ApifyClient({ token: apifyToken });
6669
const actorClient = client.actor(actorName);
6770

68-
const actorRun: ActorRun = await actorClient.call(input, callOptions);
69-
const dataset = client.dataset(actorRun.defaultDatasetId);
70-
// const dataset = client.dataset('Ehtn0Y4wIKviFT2WB');
71+
// Start the actor run but don't wait for completion
72+
const actorRun: ActorRun = await actorClient.start(input, callOptions);
73+
74+
// Start progress tracking if tracker is provided
75+
if (progressTracker) {
76+
progressTracker.startActorRunUpdates(actorRun.id, apifyToken, actorName);
77+
}
78+
79+
// Wait for the actor to complete
80+
const completedRun = await client.run(actorRun.id).waitForFinish();
81+
82+
const dataset = client.dataset(completedRun.defaultDatasetId);
7183
const [items, defaultBuild] = await Promise.all([
7284
dataset.listItems(),
7385
(await actorClient.defaultBuild()).get(),
@@ -301,7 +313,7 @@ export const callActor: ToolEntry = {
301313
inputSchema: zodToJsonSchema(callActorArgs),
302314
ajvValidate: ajv.compile(zodToJsonSchema(callActorArgs)),
303315
call: async (toolArgs) => {
304-
const { apifyMcpServer, args, apifyToken } = toolArgs;
316+
const { apifyMcpServer, args, apifyToken, progressTracker } = toolArgs;
305317
const { actor: actorName, input, callOptions } = callActorArgs.parse(args);
306318

307319
const actors = apifyMcpServer.listActorToolNames();
@@ -356,6 +368,7 @@ You can only use actors that are included in the list; actors not in the list ca
356368
input,
357369
apifyToken,
358370
callOptions,
371+
progressTracker,
359372
);
360373

361374
return {

src/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type { ActorDefaultRunOptions, ActorDefinition, ActorStoreList, PricingIn
77
import type { ACTOR_PRICING_MODEL } from './const.js';
88
import type { ActorsMcpServer } from './mcp/server.js';
99
import type { toolCategories } from './tools/index.js';
10+
import type { ProgressTracker } from './utils/progress.js';
1011

1112
export interface ISchemaProperties {
1213
type: string;
@@ -105,6 +106,8 @@ export type InternalToolArgs = {
105106
apifyToken: string;
106107
/** List of Actor IDs that the user has rented */
107108
userRentedActorIds?: string[];
109+
/** Optional progress tracker for long running internal tools, like call-actor */
110+
progressTracker?: ProgressTracker | null;
108111
}
109112

110113
/**

src/utils/progress.ts

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import type { ProgressNotification } from '@modelcontextprotocol/sdk/types.js';
2+
3+
import { ApifyClient } from '../apify-client.js';
4+
import { PROGRESS_NOTIFICATION_INTERVAL_MS } from '../const.js';
5+
6+
export class ProgressTracker {
7+
private progressToken: string | number;
8+
private sendNotification: (notification: ProgressNotification) => Promise<void>;
9+
private currentProgress = 0;
10+
private intervalId?: NodeJS.Timeout;
11+
12+
constructor(
13+
progressToken: string | number,
14+
sendNotification: (notification: ProgressNotification) => Promise<void>,
15+
) {
16+
this.progressToken = progressToken;
17+
this.sendNotification = sendNotification;
18+
}
19+
20+
async updateProgress(message?: string): Promise<void> {
21+
this.currentProgress += 1;
22+
23+
try {
24+
const notification: ProgressNotification = {
25+
method: 'notifications/progress' as const,
26+
params: {
27+
progressToken: this.progressToken,
28+
progress: this.currentProgress,
29+
...(message && { message }),
30+
},
31+
};
32+
33+
await this.sendNotification(notification);
34+
} catch {
35+
// Silent fail - don't break execution
36+
}
37+
}
38+
39+
startActorRunUpdates(runId: string, apifyToken: string, actorName: string): void {
40+
this.stop();
41+
const client = new ApifyClient({ token: apifyToken });
42+
let lastStatus = '';
43+
let lastStatusMessage = '';
44+
45+
this.intervalId = setInterval(async () => {
46+
try {
47+
const run = await client.run(runId).get();
48+
if (!run) return;
49+
50+
const { status, statusMessage } = run;
51+
52+
// Only send notification if status or statusMessage changed
53+
if (status !== lastStatus || statusMessage !== lastStatusMessage) {
54+
lastStatus = status;
55+
lastStatusMessage = statusMessage || '';
56+
57+
const message = statusMessage
58+
? `${actorName}: ${statusMessage}`
59+
: `${actorName}: ${status}`;
60+
61+
await this.updateProgress(message);
62+
63+
// Stop polling if actor finished
64+
if (status === 'SUCCEEDED' || status === 'FAILED' || status === 'ABORTED' || status === 'TIMED-OUT') {
65+
this.stop();
66+
}
67+
}
68+
} catch {
69+
// Silent fail - continue polling
70+
}
71+
}, PROGRESS_NOTIFICATION_INTERVAL_MS);
72+
}
73+
74+
stop(): void {
75+
if (this.intervalId) {
76+
clearInterval(this.intervalId);
77+
this.intervalId = undefined;
78+
}
79+
}
80+
}
81+
82+
export function createProgressTracker(
83+
progressToken: string | number | undefined,
84+
sendNotification: ((notification: ProgressNotification) => Promise<void>) | undefined,
85+
): ProgressTracker | null {
86+
if (!progressToken || !sendNotification) {
87+
return null;
88+
}
89+
90+
return new ProgressTracker(progressToken, sendNotification);
91+
}

tests/unit/utils.progress.test.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { describe, expect, it, vi } from 'vitest';
2+
3+
import { ProgressTracker } from '../../src/utils/progress.js';
4+
5+
describe('ProgressTracker', () => {
6+
it('should send progress notifications correctly', async () => {
7+
const mockSendNotification = vi.fn();
8+
const progressToken = 'test-token-123';
9+
const tracker = new ProgressTracker(progressToken, mockSendNotification);
10+
11+
await tracker.updateProgress('Quarter done');
12+
13+
expect(mockSendNotification).toHaveBeenCalledWith({
14+
method: 'notifications/progress',
15+
params: {
16+
progressToken,
17+
progress: 1,
18+
message: 'Quarter done',
19+
},
20+
});
21+
});
22+
23+
it('should track actor run status updates', async () => {
24+
const mockSendNotification = vi.fn();
25+
const tracker = new ProgressTracker('test-token', mockSendNotification);
26+
27+
// Test with a simple manual update instead of mocking the full actor run flow
28+
await tracker.updateProgress('test-actor: READY');
29+
await tracker.updateProgress('test-actor: RUNNING');
30+
await tracker.updateProgress('test-actor: SUCCEEDED');
31+
32+
expect(mockSendNotification).toHaveBeenCalledTimes(3);
33+
expect(mockSendNotification).toHaveBeenNthCalledWith(1, {
34+
method: 'notifications/progress',
35+
params: {
36+
progressToken: 'test-token',
37+
progress: 1,
38+
message: 'test-actor: READY',
39+
},
40+
});
41+
expect(mockSendNotification).toHaveBeenNthCalledWith(3, {
42+
method: 'notifications/progress',
43+
params: {
44+
progressToken: 'test-token',
45+
progress: 3,
46+
message: 'test-actor: SUCCEEDED',
47+
},
48+
});
49+
});
50+
51+
it('should handle notification send errors gracefully', async () => {
52+
const mockSendNotification = vi.fn().mockRejectedValue(new Error('Network error'));
53+
const tracker = new ProgressTracker('test-token', mockSendNotification);
54+
55+
// Should not throw
56+
await expect(tracker.updateProgress('Test')).resolves.toBeUndefined();
57+
expect(mockSendNotification).toHaveBeenCalled();
58+
});
59+
});

0 commit comments

Comments
 (0)