Skip to content

Commit 7e81757

Browse files
shagun-singh-inkeepdimaMachina
authored andcommitted
Fix scheduled trigger invocations being skipped (#2777)
* Fix scheduled trigger invocations being skipped when trigger is edited without changing the next execution time * claude comments
1 parent b653e82 commit 7e81757

File tree

4 files changed

+151
-1
lines changed

4 files changed

+151
-1
lines changed

.changeset/full-shirts-shop.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@inkeep/agents-api": patch
3+
---
4+
5+
Fix scheduled trigger invocations being skipped when trigger is edited without changing the next execution time

agents-api/src/domains/run/workflow/functions/scheduledTriggerRunner.ts

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import {
3636
markCompletedStep,
3737
markFailedStep,
3838
markRunningStep,
39+
resetInvocationToPendingStep,
3940
} from '../steps/scheduledTriggerSteps';
4041

4142
const logger = getLogger('workflow-scheduled-trigger-runner');
@@ -215,14 +216,86 @@ async function _scheduledTriggerRunnerWorkflow(payload: ScheduledTriggerRunnerPa
215216
});
216217
invocation = result.invocation;
217218

218-
if (isOneTime && result.alreadyExists && invocation.status !== 'pending') {
219+
if (
220+
isOneTime &&
221+
result.alreadyExists &&
222+
invocation.status !== 'pending' &&
223+
invocation.status !== 'cancelled'
224+
) {
219225
await logStep('One-time trigger already executed', {
220226
scheduledTriggerId,
221227
invocationId: invocation.id,
222228
status: invocation.status,
223229
});
224230
return { status: 'already_executed', invocationId: invocation.id };
225231
}
232+
233+
if (result.alreadyExists && invocation.status === 'cancelled') {
234+
const scheduledTime = new Date(scheduledFor).getTime();
235+
const now = Date.now();
236+
237+
if (scheduledTime > now) {
238+
await logStep('Cancelled idempotent invocation still in future, resetting to pending', {
239+
scheduledTriggerId,
240+
invocationId: invocation.id,
241+
scheduledFor,
242+
});
243+
let resetResult: Awaited<ReturnType<typeof resetInvocationToPendingStep>> | undefined;
244+
try {
245+
resetResult = await resetInvocationToPendingStep({
246+
tenantId,
247+
projectId,
248+
agentId,
249+
scheduledTriggerId,
250+
invocationId: invocation.id,
251+
});
252+
} catch (err) {
253+
await logStep('Failed to reset cancelled invocation to pending', {
254+
scheduledTriggerId,
255+
invocationId: invocation.id,
256+
error: err instanceof Error ? err.message : String(err),
257+
});
258+
throw err;
259+
}
260+
261+
if (!resetResult) {
262+
await logStep('Reset skipped — invocation status changed concurrently, exiting', {
263+
scheduledTriggerId,
264+
invocationId: invocation.id,
265+
});
266+
return { status: 'skipped' as const, reason: 'concurrent_status_change' };
267+
}
268+
invocation = { ...invocation, status: 'pending' };
269+
} else {
270+
await logStep('Cancelled idempotent invocation in the past, skipping to next iteration', {
271+
scheduledTriggerId,
272+
invocationId: invocation.id,
273+
scheduledFor,
274+
});
275+
276+
if (!isOneTime) {
277+
try {
278+
await startNextIterationStep({
279+
tenantId,
280+
projectId,
281+
agentId,
282+
scheduledTriggerId,
283+
lastScheduledFor: scheduledFor,
284+
currentRunId: runnerId,
285+
});
286+
} catch (err) {
287+
await logStep('Failed to chain after cancelled idempotent invocation', {
288+
scheduledTriggerId,
289+
invocationId: invocation.id,
290+
error: err instanceof Error ? err.message : String(err),
291+
});
292+
throw err;
293+
}
294+
}
295+
296+
return { status: 'skipped_cancelled', invocationId: invocation.id };
297+
}
298+
}
226299
}
227300

228301
await logStep('Got next pending invocation', {

agents-api/src/domains/run/workflow/steps/scheduledTriggerSteps.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
markScheduledTriggerInvocationFailed,
2222
markScheduledTriggerInvocationRunning,
2323
type Part,
24+
resetCancelledInvocationToPending,
2425
resolveRef,
2526
type ScheduledTriggerInvocation,
2627
updateScheduledTriggerInvocationStatus,
@@ -489,6 +490,45 @@ export async function incrementAttemptStep(params: {
489490
});
490491
}
491492

493+
/**
494+
* Step: Reset a cancelled invocation back to pending.
495+
* Used when a restarted workflow finds a cancelled invocation via idempotency
496+
* that is still scheduled for a future time.
497+
*/
498+
export async function resetInvocationToPendingStep(params: {
499+
tenantId: string;
500+
projectId: string;
501+
agentId: string;
502+
scheduledTriggerId: string;
503+
invocationId: string;
504+
}) {
505+
'use step';
506+
507+
const updated = await resetCancelledInvocationToPending(runDbClient)({
508+
scopes: {
509+
tenantId: params.tenantId,
510+
projectId: params.projectId,
511+
agentId: params.agentId,
512+
},
513+
scheduledTriggerId: params.scheduledTriggerId,
514+
invocationId: params.invocationId,
515+
});
516+
517+
if (updated) {
518+
logger.info(
519+
{ scheduledTriggerId: params.scheduledTriggerId, invocationId: params.invocationId },
520+
'Reset cancelled invocation to pending'
521+
);
522+
} else {
523+
logger.warn(
524+
{ scheduledTriggerId: params.scheduledTriggerId, invocationId: params.invocationId },
525+
'Skipped reset — invocation status changed concurrently (no longer cancelled)'
526+
);
527+
}
528+
529+
return updated;
530+
}
531+
492532
/**
493533
* Step: Execute the scheduled trigger using executeAgentAsync.
494534
*

packages/agents-core/src/data-access/runtime/scheduledTriggerInvocations.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,38 @@ export const markScheduledTriggerInvocationFailed =
278278
return result[0] as ScheduledTriggerInvocation | undefined;
279279
};
280280

281+
/**
282+
* Reset a cancelled invocation back to pending.
283+
* Only updates if the current status is 'cancelled' to prevent race conditions.
284+
*/
285+
export const resetCancelledInvocationToPending =
286+
(db: AgentsRunDatabaseClient) =>
287+
async (params: {
288+
scopes: AgentScopeConfig;
289+
scheduledTriggerId: string;
290+
invocationId: string;
291+
}): Promise<ScheduledTriggerInvocation | undefined> => {
292+
const result = await db
293+
.update(scheduledTriggerInvocations)
294+
.set({
295+
status: 'pending',
296+
attemptNumber: 1,
297+
startedAt: null,
298+
completedAt: null,
299+
})
300+
.where(
301+
and(
302+
agentScopedWhere(scheduledTriggerInvocations, params.scopes),
303+
eq(scheduledTriggerInvocations.scheduledTriggerId, params.scheduledTriggerId),
304+
eq(scheduledTriggerInvocations.id, params.invocationId),
305+
eq(scheduledTriggerInvocations.status, 'cancelled')
306+
)
307+
)
308+
.returning();
309+
310+
return result[0] as ScheduledTriggerInvocation | undefined;
311+
};
312+
281313
/**
282314
* Add a conversation ID to the invocation's conversationIds array
283315
* Used to track all conversations created during retries

0 commit comments

Comments
 (0)