Skip to content

Commit d6aa264

Browse files
committed
feat: progress notification
1 parent d993b10 commit d6aa264

File tree

5 files changed

+246
-18
lines changed

5 files changed

+246
-18
lines changed

src/main.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ if (STANDBY_MODE) {
5656
await Actor.fail('If you need to debug a specific Actor, please provide the debugActor and debugActorInput fields in the input');
5757
}
5858
const options = { memory: input.maxActorMemoryBytes } as ActorCallOptions;
59-
const { items } = await callActorGetDataset(input.debugActor!, input.debugActorInput!, process.env.APIFY_TOKEN, options);
59+
const { items } = await callActorGetDataset(input.debugActor!, input.debugActorInput!, process.env.APIFY_TOKEN, options, null);
6060

6161
await Actor.pushData(items);
6262
log.info(`Pushed ${items.count} items to the dataset`);

src/mcp/server.ts

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {
2626
import { addRemoveTools, betaTools, callActorGetDataset, defaultTools, getActorsAsTools } from '../tools/index.js';
2727
import { actorNameToToolName, decodeDotPropertyNames } from '../tools/utils.js';
2828
import type { ActorMcpTool, ActorTool, HelperTool, ToolEntry } from '../types.js';
29+
import { createProgressTracker } from '../utils/progress.js';
2930
import { connectMCPClient } from './client.js';
3031
import { EXTERNAL_TOOL_CALL_TIMEOUT_MSEC } from './const.js';
3132
import { processParamsGetTools } from './utils.js';
@@ -425,6 +426,16 @@ export class ActorsMcpServer {
425426
// Handle internal tool
426427
if (tool.type === 'internal') {
427428
const internalTool = tool.tool as HelperTool;
429+
430+
// Only create progress tracker for call-actor tool
431+
const progressTracker = internalTool.name === 'call-actor'
432+
? createProgressTracker(progressToken, extra.sendNotification)
433+
: null;
434+
435+
if (progressTracker) {
436+
await progressTracker.updateProgress(0, `Starting: ${internalTool.name}`);
437+
}
438+
428439
const res = await internalTool.call({
429440
args,
430441
extra,
@@ -434,6 +445,10 @@ export class ActorsMcpServer {
434445
userRentedActorIds,
435446
}) as object;
436447

448+
if (progressTracker) {
449+
await progressTracker.complete(`Completed: ${internalTool.name}`);
450+
}
451+
437452
return { ...res };
438453
}
439454

@@ -479,21 +494,37 @@ export class ActorsMcpServer {
479494
if (tool.type === 'actor') {
480495
const actorTool = tool.tool as ActorTool;
481496

497+
// Create progress tracker if progressToken is available
498+
const progressTracker = createProgressTracker(progressToken, extra.sendNotification);
499+
482500
const callOptions: ActorCallOptions = { memory: actorTool.memoryMbytes };
483-
const { items } = await callActorGetDataset(
484-
actorTool.actorFullName,
485-
args,
486-
apifyToken as string,
487-
callOptions,
488-
);
489-
return {
490-
content: items.items.map((item: Record<string, unknown>) => {
491-
return {
492-
type: 'text',
493-
text: JSON.stringify(item),
494-
};
495-
}),
496-
};
501+
502+
try {
503+
const { items } = await callActorGetDataset(
504+
actorTool.actorFullName,
505+
args,
506+
apifyToken as string,
507+
callOptions,
508+
progressTracker,
509+
);
510+
511+
if (progressTracker) {
512+
await progressTracker.complete(`Completed: ${actorTool.actorFullName}`);
513+
}
514+
515+
return {
516+
content: items.items.map((item: Record<string, unknown>) => {
517+
return {
518+
type: 'text',
519+
text: JSON.stringify(item),
520+
};
521+
}),
522+
};
523+
} finally {
524+
if (progressTracker) {
525+
progressTracker.stopPeriodicUpdates();
526+
}
527+
}
497528
}
498529
} catch (error) {
499530
if (error instanceof ApifyApiError) {

src/tools/actor.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { actorDefinitionPrunedCache } from '../state.js';
1919
import type { ActorDefinitionStorage, ActorInfo, InternalTool, ToolEntry } from '../types.js';
2020
import { getActorDefinitionStorageFieldNames } from '../utils/actor.js';
2121
import { getValuesByDotKeys } from '../utils/generic.js';
22+
import type { ProgressTracker } from '../utils/progress.js';
2223
import { getActorDefinition } from './build.js';
2324
import {
2425
actorNameToToolName,
@@ -50,6 +51,7 @@ export type CallActorGetDatasetResult = {
5051
* @param {ActorCallOptions} callOptions - The options to pass to the actor.
5152
* @param {unknown} input - The input to pass to the actor.
5253
* @param {string} apifyToken - The Apify token to use for authentication.
54+
* @param {ProgressTracker} progressTracker - Optional progress tracker for real-time updates.
5355
* @returns {Promise<{ actorRun: any, items: object[] }>} - A promise that resolves to an object containing the actor run and dataset items.
5456
* @throws {Error} - Throws an error if the `APIFY_TOKEN` is not set
5557
*/
@@ -58,16 +60,26 @@ export async function callActorGetDataset(
5860
input: unknown,
5961
apifyToken: string,
6062
callOptions: ActorCallOptions | undefined = undefined,
63+
progressTracker?: ProgressTracker | null,
6164
): Promise<CallActorGetDatasetResult> {
6265
try {
6366
log.info(`Calling Actor ${actorName} with input: ${JSON.stringify(input)}`);
6467

6568
const client = new ApifyClient({ token: apifyToken });
6669
const actorClient = client.actor(actorName);
6770

68-
const actorRun: ActorRun = await actorClient.call(input, callOptions);
69-
const dataset = client.dataset(actorRun.defaultDatasetId);
70-
// const dataset = client.dataset('Ehtn0Y4wIKviFT2WB');
71+
// Start the actor run but don't wait for completion
72+
const actorRun: ActorRun = await actorClient.start(input, callOptions);
73+
74+
// Start progress tracking if tracker is provided
75+
if (progressTracker) {
76+
progressTracker.startActorRunUpdates(actorRun.id, apifyToken, actorName);
77+
}
78+
79+
// Wait for the actor to complete
80+
const completedRun = await client.run(actorRun.id).waitForFinish();
81+
82+
const dataset = client.dataset(completedRun.defaultDatasetId);
7183
const [items, defaultBuild] = await Promise.all([
7284
dataset.listItems(),
7385
(await actorClient.defaultBuild()).get(),
@@ -356,6 +368,7 @@ You can only use actors that are included in the list; actors not in the list ca
356368
input,
357369
apifyToken,
358370
callOptions,
371+
null,
359372
);
360373

361374
return {

src/utils/progress.ts

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import type { ProgressNotification } from '@modelcontextprotocol/sdk/types.js';
2+
3+
import { ApifyClient } from '../apify-client.js';
4+
5+
export class ProgressTracker {
6+
private progressToken: string | number;
7+
private sendNotification: (notification: ProgressNotification) => Promise<void>;
8+
private currentProgress = 0;
9+
private total = 100;
10+
private intervalId?: NodeJS.Timeout;
11+
12+
constructor(
13+
progressToken: string | number,
14+
sendNotification: (notification: ProgressNotification) => Promise<void>,
15+
total = 100,
16+
) {
17+
this.progressToken = progressToken;
18+
this.sendNotification = sendNotification;
19+
this.total = total;
20+
}
21+
22+
async updateProgress(progress: number, message?: string): Promise<void> {
23+
this.currentProgress = Math.min(progress, this.total);
24+
25+
try {
26+
const notification: ProgressNotification = {
27+
method: 'notifications/progress' as const,
28+
params: {
29+
progressToken: this.progressToken,
30+
progress: this.currentProgress,
31+
total: this.total,
32+
...(message && { message }),
33+
},
34+
};
35+
36+
await this.sendNotification(notification);
37+
} catch {
38+
// Silent fail - don't break execution
39+
}
40+
}
41+
42+
startActorRunUpdates(runId: string, apifyToken: string, actorName: string): void {
43+
this.stopPeriodicUpdates();
44+
const client = new ApifyClient({ token: apifyToken });
45+
let lastStatus = '';
46+
let lastStatusMessage = '';
47+
48+
this.intervalId = setInterval(async () => {
49+
try {
50+
const run = await client.run(runId).get();
51+
if (!run) return;
52+
53+
const { status, statusMessage } = run;
54+
55+
// Only send notification if status or statusMessage changed
56+
if (status !== lastStatus || statusMessage !== lastStatusMessage) {
57+
lastStatus = status;
58+
lastStatusMessage = statusMessage || '';
59+
60+
// Calculate progress based on status
61+
let progress = 0;
62+
if (status === 'RUNNING') progress = 50;
63+
else if (status === 'SUCCEEDED') progress = 100;
64+
else if (status === 'FAILED') progress = 100;
65+
66+
const message = statusMessage
67+
? `${actorName}: ${statusMessage}`
68+
: `${actorName}: ${status}`;
69+
70+
await this.updateProgress(progress, message);
71+
72+
// Stop polling if actor finished
73+
if (status === 'SUCCEEDED' || status === 'FAILED' || status === 'ABORTED' || status === 'TIMED-OUT') {
74+
this.stopPeriodicUpdates();
75+
}
76+
}
77+
} catch {
78+
// Silent fail - continue polling
79+
}
80+
}, 5_000);
81+
}
82+
83+
stopPeriodicUpdates(): void {
84+
if (this.intervalId) {
85+
clearInterval(this.intervalId);
86+
this.intervalId = undefined;
87+
}
88+
}
89+
90+
async complete(message = 'Completed'): Promise<void> {
91+
this.stopPeriodicUpdates();
92+
await this.updateProgress(this.total, message);
93+
}
94+
}
95+
96+
export function createProgressTracker(
97+
progressToken: string | number | undefined,
98+
sendNotification: ((notification: ProgressNotification) => Promise<void>) | undefined,
99+
): ProgressTracker | null {
100+
if (!progressToken || !sendNotification) {
101+
return null;
102+
}
103+
104+
return new ProgressTracker(progressToken, sendNotification);
105+
}

tests/unit/utils.progress.test.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import { describe, expect, it, vi } from 'vitest';
2+
3+
import { ProgressTracker } from '../../src/utils/progress.js';
4+
5+
describe('ProgressTracker', () => {
6+
it('should send progress notifications correctly', async () => {
7+
const mockSendNotification = vi.fn();
8+
const progressToken = 'test-token-123';
9+
const tracker = new ProgressTracker(progressToken, mockSendNotification, 100);
10+
11+
await tracker.updateProgress(25, 'Quarter done');
12+
13+
expect(mockSendNotification).toHaveBeenCalledWith({
14+
method: 'notifications/progress',
15+
params: {
16+
progressToken,
17+
progress: 25,
18+
total: 100,
19+
message: 'Quarter done',
20+
},
21+
});
22+
});
23+
24+
it('should track actor run status updates', async () => {
25+
const mockSendNotification = vi.fn();
26+
const tracker = new ProgressTracker('test-token', mockSendNotification, 100);
27+
28+
// Test with a simple manual update instead of mocking the full actor run flow
29+
await tracker.updateProgress(0, 'test-actor: READY');
30+
await tracker.updateProgress(50, 'test-actor: RUNNING');
31+
await tracker.updateProgress(100, 'test-actor: SUCCEEDED');
32+
33+
expect(mockSendNotification).toHaveBeenCalledTimes(3);
34+
expect(mockSendNotification).toHaveBeenNthCalledWith(1, {
35+
method: 'notifications/progress',
36+
params: {
37+
progressToken: 'test-token',
38+
progress: 0,
39+
total: 100,
40+
message: 'test-actor: READY',
41+
},
42+
});
43+
expect(mockSendNotification).toHaveBeenNthCalledWith(3, {
44+
method: 'notifications/progress',
45+
params: {
46+
progressToken: 'test-token',
47+
progress: 100,
48+
total: 100,
49+
message: 'test-actor: SUCCEEDED',
50+
},
51+
});
52+
});
53+
54+
it('should complete correctly', async () => {
55+
const mockSendNotification = vi.fn();
56+
const tracker = new ProgressTracker('test-token', mockSendNotification, 100);
57+
58+
await tracker.complete('All done!');
59+
60+
expect(mockSendNotification).toHaveBeenCalledWith({
61+
method: 'notifications/progress',
62+
params: {
63+
progressToken: 'test-token',
64+
progress: 100,
65+
total: 100,
66+
message: 'All done!',
67+
},
68+
});
69+
});
70+
71+
it('should handle notification send errors gracefully', async () => {
72+
const mockSendNotification = vi.fn().mockRejectedValue(new Error('Network error'));
73+
const tracker = new ProgressTracker('test-token', mockSendNotification);
74+
75+
// Should not throw
76+
await expect(tracker.updateProgress(50, 'Test')).resolves.toBeUndefined();
77+
expect(mockSendNotification).toHaveBeenCalled();
78+
});
79+
});

0 commit comments

Comments
 (0)