Skip to content
17 changes: 16 additions & 1 deletion src/mcp/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -524,13 +524,22 @@ export class ActorsMcpServer {

try {
log.info('Calling Actor', { actorName: actorTool.actorFullName, input: args });
const { runId, datasetId, items } = await callActorGetDataset(
const result = await callActorGetDataset(
actorTool.actorFullName,
args,
apifyToken as string,
callOptions,
progressTracker,
extra.signal,
);

if (!result) {
// If the actor was aborted by the client, we don't want to return anything
return { };
}

const { runId, datasetId, items } = result;

const content = [
{ type: 'text', text: `Actor finished with runId: ${runId}, datasetId ${datasetId}` },
];
Expand All @@ -540,6 +549,12 @@ export class ActorsMcpServer {
});
content.push(...itemContents);
return { content };
} catch (error) {
if (error instanceof Error && error.message === 'Operation cancelled') {
// Receivers of cancellation notifications SHOULD NOT send a response for the cancelled request
// https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/cancellation#behavior-requirements
return { };
}
} finally {
if (progressTracker) {
progressTracker.stop();
Expand Down
57 changes: 52 additions & 5 deletions src/tools/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,73 @@ export type CallActorGetDatasetResult = {
* @param {unknown} input - The input to pass to the actor.
* @param {string} apifyToken - The Apify token to use for authentication.
* @param {ProgressTracker} progressTracker - Optional progress tracker for real-time updates.
* @returns {Promise<{ actorRun: any, items: object[] }>} - A promise that resolves to an object containing the actor run and dataset items.
* @param {AbortSignal} abortSignal - Optional abort signal to cancel the actor run.
* @returns {Promise<CallActorGetDatasetResult | null>} - A promise that resolves to an object containing the actor run and dataset items.
* @throws {Error} - Throws an error if the `APIFY_TOKEN` is not set
*/
export async function callActorGetDataset(
actorName: string,
input: unknown,
apifyToken: string,
callOptions: ActorCallOptions | undefined,
progressTracker?: ProgressTracker | null,
): Promise<CallActorGetDatasetResult>; // Without abort signal Result or Error is returned
export async function callActorGetDataset(
actorName: string,
input: unknown,
apifyToken: string,
callOptions: ActorCallOptions | undefined,
progressTracker: ProgressTracker | null,
abortSignal: AbortSignal,
): Promise<CallActorGetDatasetResult | null>; // With abort signal, null is returned if the actor was aborted by the client
export async function callActorGetDataset(
actorName: string,
input: unknown,
apifyToken: string,
callOptions: ActorCallOptions | undefined = undefined,
progressTracker?: ProgressTracker | null,
): Promise<CallActorGetDatasetResult> {
abortSignal?: AbortSignal,
): Promise<CallActorGetDatasetResult | null> {
const CLIENT_ABORT = Symbol('CLIENT_ABORT'); // Just internal symbol to identify client abort
try {
const client = new ApifyClient({ token: apifyToken });
const actorClient = client.actor(actorName);

// Start the actor run but don't wait for completion
// Start the actor run
const actorRun: ActorRun = await actorClient.start(input, callOptions);

// Start progress tracking if tracker is provided
if (progressTracker) {
progressTracker.startActorRunUpdates(actorRun.id, apifyToken, actorName);
}

// Wait for the actor to complete
const completedRun = await client.run(actorRun.id).waitForFinish();
// Create abort promise that handles both API abort and race rejection
const abortPromise = async () => new Promise<typeof CLIENT_ABORT>((resolve) => {
abortSignal?.addEventListener('abort', async () => {
// Abort the actor run via API
try {
await client.run(actorRun.id).abort({ gracefully: true });
} catch (e) {
log.error('Error aborting Actor run', { error: e, runId: actorRun.id });
}
// Reject to stop waiting
resolve(CLIENT_ABORT);
}, { once: true });
});

// Wait for completion or cancellation
const potentialAbortedRun = await Promise.race([
client.run(actorRun.id).waitForFinish(),
...(abortSignal ? [abortPromise()] : []),
]);

if (potentialAbortedRun === CLIENT_ABORT) {
log.info('Actor run aborted by client', { actorName, input });
return null;
}
const completedRun = potentialAbortedRun as ActorRun;

// Process the completed run
const dataset = client.dataset(completedRun.defaultDatasetId);
const [items, defaultBuild] = await Promise.all([
dataset.listItems(),
Expand Down Expand Up @@ -362,6 +404,11 @@ The step parameter enforces this workflow - you cannot call an Actor without fir

return { content };
} catch (error) {
if (error instanceof Error && error.message === 'Operation cancelled') {
// Receivers of cancellation notifications SHOULD NOT send a response for the cancelled request
// https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/cancellation#behavior-requirements
return { };
}
log.error('Error with Actor operation', { error, actorName, step });
return {
content: [
Expand Down
50 changes: 48 additions & 2 deletions tests/integration/suite.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { Client } from '@modelcontextprotocol/sdk/client/index.js';
import type { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
import { ToolListChangedNotificationSchema } from '@modelcontextprotocol/sdk/types.js';
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest';
import { CallToolResultSchema, ToolListChangedNotificationSchema } from '@modelcontextprotocol/sdk/types.js';
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest';

import { ApifyClient } from '../../src/apify-client.js';
import { defaults, HelperTools } from '../../src/const.js';
import { addTool } from '../../src/tools/helpers.js';
import { defaultTools, toolCategories } from '../../src/tools/index.js';
Expand Down Expand Up @@ -629,6 +630,51 @@ export function createIntegrationTestsSuite(
await client.close();
});

// Cancellation test: start a long-running actor and cancel immediately, then verify it was aborted
it('should abort actor run when request is cancelled', async () => {
const ACTOR_NAME = 'michal.kalita/test-timeout';
const selectedToolName = actorNameToToolName(ACTOR_NAME);
const client = await createClientFn({ enableAddingActors: true });

// Add actor as tool
await addActor(client, ACTOR_NAME);

// Build request and cancel immediately via AbortController
const controller = new AbortController();

const requestPromise = client.request({
method: 'tools/call' as const,
params: {
name: selectedToolName,
arguments: { timeout: 30 },
},
}, CallToolResultSchema, { signal: controller.signal })
// Ignores error "AbortError: This operation was aborted"
.catch(() => undefined);

// Abort right away
setTimeout(() => controller.abort(), 1000);

// Ensure the request completes/cancels before proceeding
await requestPromise;

// Verify via Apify API that a recent run for this actor was aborted
const api = new ApifyClient({ token: process.env.APIFY_TOKEN as string });
const actor = await api.actor(ACTOR_NAME).get();
expect(actor).toBeDefined();
const actId = actor!.id as string;

// Poll up to 30s for the latest run for this actor to reach ABORTED/ABORTING
await vi.waitUntil(async () => {
const runsList = await api.runs().list({ limit: 5, desc: true });
const run = runsList.items.find((r) => r.actId === actId);
if (run) {
return run.status === 'ABORTED' || run.status === 'ABORTING';
}
return false;
}, { timeout: 30000, interval: 1000 });
});

// Environment variable tests - only applicable to stdio transport
it.runIf(options.transport === 'stdio')('should load actors from ACTORS environment variable', async () => {
const actors = ['apify/python-example', 'apify/rag-web-browser'];
Expand Down