Skip to content
8 changes: 5 additions & 3 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ if (STANDBY_MODE) {
await Actor.fail('If you need to debug a specific Actor, please provide the debugActor and debugActorInput fields in the input');
}
const options = { memory: input.maxActorMemoryBytes } as ActorCallOptions;
const { items } = await callActorGetDataset(input.debugActor!, input.debugActorInput!, process.env.APIFY_TOKEN, options);
const result = await callActorGetDataset(input.debugActor!, input.debugActorInput!, process.env.APIFY_TOKEN, options);

await Actor.pushData(items);
log.info('Pushed items to dataset', { itemCount: items.count });
if (result && result.items) {
await Actor.pushData(result.items);
log.info('Pushed items to dataset', { itemCount: result.items.count });
}
await Actor.exit();
}

Expand Down
12 changes: 11 additions & 1 deletion src/mcp/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -524,13 +524,23 @@ export class ActorsMcpServer {

try {
log.info('Calling Actor', { actorName: actorTool.actorFullName, input: args });
const { runId, datasetId, items } = await callActorGetDataset(
const result = await callActorGetDataset(
actorTool.actorFullName,
args,
apifyToken as string,
callOptions,
progressTracker,
extra.signal,
);

if (!result) {
// Receivers of cancellation notifications SHOULD NOT send a response for the cancelled request
// https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/cancellation#behavior-requirements
return { };
}

const { runId, datasetId, items } = result;

const content = [
{ type: 'text', text: `Actor finished with runId: ${runId}, datasetId ${datasetId}` },
];
Expand Down
50 changes: 43 additions & 7 deletions src/tools/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ export type CallActorGetDatasetResult = {
* @param {unknown} input - The input to pass to the actor.
* @param {string} apifyToken - The Apify token to use for authentication.
* @param {ProgressTracker} progressTracker - Optional progress tracker for real-time updates.
* @returns {Promise<{ actorRun: any, items: object[] }>} - A promise that resolves to an object containing the actor run and dataset items.
* @param {AbortSignal} abortSignal - Optional abort signal to cancel the actor run.
* @returns {Promise<CallActorGetDatasetResult | null>} - A promise that resolves to an object containing the actor run and dataset items.
* @throws {Error} - Throws an error if the `APIFY_TOKEN` is not set
*/
export async function callActorGetDataset(
Expand All @@ -54,22 +55,48 @@ export async function callActorGetDataset(
apifyToken: string,
callOptions: ActorCallOptions | undefined = undefined,
progressTracker?: ProgressTracker | null,
): Promise<CallActorGetDatasetResult> {
abortSignal?: AbortSignal,
): Promise<CallActorGetDatasetResult | null> {
const CLIENT_ABORT = Symbol('CLIENT_ABORT'); // Just internal symbol to identify client abort
try {
const client = new ApifyClient({ token: apifyToken });
const actorClient = client.actor(actorName);

// Start the actor run but don't wait for completion
// Start the actor run
const actorRun: ActorRun = await actorClient.start(input, callOptions);

// Start progress tracking if tracker is provided
if (progressTracker) {
progressTracker.startActorRunUpdates(actorRun.id, apifyToken, actorName);
}

// Wait for the actor to complete
const completedRun = await client.run(actorRun.id).waitForFinish();
// Create abort promise that handles both API abort and race rejection
const abortPromise = async () => new Promise<typeof CLIENT_ABORT>((resolve) => {
abortSignal?.addEventListener('abort', async () => {
// Abort the actor run via API
try {
await client.run(actorRun.id).abort({ gracefully: false });
} catch (e) {
log.error('Error aborting Actor run', { error: e, runId: actorRun.id });
}
// Reject to stop waiting
resolve(CLIENT_ABORT);
}, { once: true });
});

// Wait for completion or cancellation
const potentialAbortedRun = await Promise.race([
client.run(actorRun.id).waitForFinish(),
...(abortSignal ? [abortPromise()] : []),
]);

if (potentialAbortedRun === CLIENT_ABORT) {
log.info('Actor run aborted by client', { actorName, input });
return null;
}
const completedRun = potentialAbortedRun as ActorRun;

// Process the completed run
const dataset = client.dataset(completedRun.defaultDatasetId);
const [items, defaultBuild] = await Promise.all([
dataset.listItems(),
Expand Down Expand Up @@ -293,7 +320,7 @@ The step parameter enforces this workflow - you cannot call an Actor without fir
inputSchema: zodToJsonSchema(callActorArgs),
ajvValidate: ajv.compile(zodToJsonSchema(callActorArgs)),
call: async (toolArgs) => {
const { args, apifyToken, progressTracker } = toolArgs;
const { args, apifyToken, progressTracker, extra } = toolArgs;
const { actor: actorName, step, input, callOptions } = callActorArgs.parse(args);

try {
Expand Down Expand Up @@ -342,14 +369,23 @@ The step parameter enforces this workflow - you cannot call an Actor without fir
}
}

const { runId, datasetId, items } = await callActorGetDataset(
const result = await callActorGetDataset(
actorName,
input,
apifyToken,
callOptions,
progressTracker,
extra.signal,
);

if (!result) {
// Receivers of cancellation notifications SHOULD NOT send a response for the cancelled request
// https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/cancellation#behavior-requirements
return { };
}

const { runId, datasetId, items } = result;

const content = [
{ type: 'text', text: `Actor finished with runId: ${runId}, datasetId ${datasetId}` },
];
Expand Down
Loading