Skip to content
10 changes: 9 additions & 1 deletion src/mcp/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class ActorsMcpServer {
capabilities: {
tools: { listChanged: true },
prompts: { },
logging: {},
// logging: {}, // Because of error in inspector `Server declares logging capability but doesn't implement method: "logging/setLevel"`
},
},
);
Expand Down Expand Up @@ -554,7 +554,9 @@ export class ActorsMcpServer {
apifyToken as string,
callOptions,
progressTracker,
extra.signal,
);

const content = [
{ type: 'text', text: `Actor finished with runId: ${runId}, datasetId ${datasetId}` },
];
Expand All @@ -564,6 +566,12 @@ export class ActorsMcpServer {
});
content.push(...itemContents);
return { content };
} catch (error) {
if (error instanceof Error && error.message === 'Operation cancelled') {
// Receivers of cancellation notifications SHOULD NOT send a response for the cancelled request
// https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/cancellation#behavior-requirements
return { };
}
} finally {
if (progressTracker) {
progressTracker.stop();
Expand Down
37 changes: 33 additions & 4 deletions src/tools/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export type CallActorGetDatasetResult = {
* @param {unknown} input - The input to pass to the actor.
* @param {string} apifyToken - The Apify token to use for authentication.
* @param {ProgressTracker} progressTracker - Optional progress tracker for real-time updates.
* @returns {Promise<{ actorRun: any, items: object[] }>} - A promise that resolves to an object containing the actor run and dataset items.
* @returns {Promise<CallActorGetDatasetResult>} - A promise that resolves to an object containing the actor run and dataset items.
* @throws {Error} - Throws an error if the `APIFY_TOKEN` is not set
*/
export async function callActorGetDataset(
Expand All @@ -58,22 +58,46 @@ export async function callActorGetDataset(
apifyToken: string,
callOptions: ActorCallOptions | undefined = undefined,
progressTracker?: ProgressTracker | null,
abortSignal?: AbortSignal,
): Promise<CallActorGetDatasetResult> {
try {
const client = new ApifyClient({ token: apifyToken });
const actorClient = client.actor(actorName);

// Start the actor run but don't wait for completion
// Check if already aborted
if (abortSignal?.aborted) {
throw new Error('Operation cancelled');
}

// Start the actor run
const actorRun: ActorRun = await actorClient.start(input, callOptions);

// Start progress tracking if tracker is provided
if (progressTracker) {
progressTracker.startActorRunUpdates(actorRun.id, apifyToken, actorName);
}

// Wait for the actor to complete
const completedRun = await client.run(actorRun.id).waitForFinish();
// Create abort promise that handles both API abort and race rejection
const abortPromise = async () => new Promise<never>((_, reject) => {
abortSignal?.addEventListener('abort', async () => {
// Abort the actor run via API
try {
await client.run(actorRun.id).abort({ gracefully: true });
} catch (e) {
log.debug('Error aborting Actor run', { error: e, runId: actorRun.id });
}
// Reject to stop waiting
reject(new Error('Operation cancelled'));
}, { once: true });
});

// Wait for completion or cancellation
const completedRun = await Promise.race([
client.run(actorRun.id).waitForFinish(),
...(abortSignal ? [abortPromise()] : []),
]);

// Process the completed run
const dataset = client.dataset(completedRun.defaultDatasetId);
const [items, defaultBuild] = await Promise.all([
dataset.listItems(),
Expand Down Expand Up @@ -330,6 +354,11 @@ export const callActor: ToolEntry = {
})),
};
} catch (error) {
if (error instanceof Error && error.message === 'Operation cancelled') {
// Receivers of cancellation notifications SHOULD NOT send a response for the cancelled request
// https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/cancellation#behavior-requirements
return { };
}
log.error('Error calling Actor', { error });
return {
content: [
Expand Down
52 changes: 50 additions & 2 deletions tests/integration/suite.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { Client } from '@modelcontextprotocol/sdk/client/index.js';
import type { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
import { ToolListChangedNotificationSchema } from '@modelcontextprotocol/sdk/types.js';
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest';
import { CallToolResultSchema, ToolListChangedNotificationSchema } from '@modelcontextprotocol/sdk/types.js';
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest';

import { ApifyClient } from '../../src/apify-client.js';
import { defaults, HelperTools } from '../../src/const.js';
import { latestNewsOnTopicPrompt } from '../../src/prompts/latest-news-on-topic.js';
import { addRemoveTools, defaultTools, toolCategories, toolCategoriesEnabledByDefault } from '../../src/tools/index.js';
Expand Down Expand Up @@ -493,5 +494,52 @@ export function createIntegrationTestsSuite(
await (client.transport as StreamableHTTPClientTransport).terminateSession();
await client.close();
});

// Cancellation test: start a long-running actor and cancel immediately, then verify it was aborted
it('should abort actor run when request is cancelled', async () => {
const ACTOR_NAME = 'michal.kalita/test-timeout';
const selectedToolName = actorNameToToolName(ACTOR_NAME);
const client = await createClientFn({ enableAddingActors: true });

// Add actor as tool
await addActor(client, ACTOR_NAME);

// Build request and cancel immediately via AbortController
const controller = new AbortController();

const requestPromise = client.request({
method: 'tools/call' as const,
params: {
name: selectedToolName,
arguments: { timeout: 30 },
},
}, CallToolResultSchema, { signal: controller.signal })
// Ignores error "AbortError: This operation was aborted"
.catch(() => undefined);

// Abort right away
setTimeout(() => controller.abort(), 1000);

// Ensure the request completes/cancels before proceeding
await requestPromise;

// Verify via Apify API that a recent run for this actor was aborted
const api = new ApifyClient({ token: process.env.APIFY_TOKEN as string });
const actor = await api.actor(ACTOR_NAME).get();
expect(actor).toBeDefined();
const actId = actor!.id as string;

// Poll up to 30s for the latest run for this actor to reach ABORTED/ABORTING
await vi.waitUntil(async () => {
const runsList = await api.runs().list({ limit: 5, desc: true });
const run = runsList.items.find((r) => r.actId === actId);
if (run) {
return run.status === 'ABORTED' || run.status === 'ABORTING';
}
return false;
}, { timeout: 30000, interval: 1000 });

await client.close();
});
});
}