Skip to content

Commit 7de721e

Browse files
fix(logs): logging with error issues for model costs (#2169)
* fix(async-execution): restore async executions * fix schedules trace span collection' * fix execution trace spans for schedules + cost tracking when workflow errors
1 parent 3e83fb3 commit 7de721e

File tree

4 files changed

+100
-20
lines changed

4 files changed

+100
-20
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
import { tasks } from '@trigger.dev/sdk'
12
import { type NextRequest, NextResponse } from 'next/server'
23
import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
34
import { z } from 'zod'
45
import { checkHybridAuth } from '@/lib/auth/hybrid'
6+
import { env, isTruthy } from '@/lib/core/config/env'
57
import { generateRequestId } from '@/lib/core/utils/request'
68
import { SSE_HEADERS } from '@/lib/core/utils/sse'
9+
import { getBaseUrl } from '@/lib/core/utils/urls'
710
import { processInputFileFields } from '@/lib/execution/files'
811
import { preprocessExecution } from '@/lib/execution/preprocessing'
912
import { createLogger } from '@/lib/logs/console/logger'
@@ -17,6 +20,7 @@ import {
1720
} from '@/lib/workflows/persistence/utils'
1821
import { createStreamingResponse } from '@/lib/workflows/streaming/streaming'
1922
import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils'
23+
import type { WorkflowExecutionPayload } from '@/background/workflow-execution'
2024
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
2125
import type { StreamingExecution } from '@/executor/types'
2226
import { Serializer } from '@/serializer'
@@ -217,6 +221,64 @@ function resolveOutputIds(
217221
})
218222
}
219223

224+
type AsyncExecutionParams = {
225+
requestId: string
226+
workflowId: string
227+
userId: string
228+
input: any
229+
triggerType: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
230+
}
231+
232+
/**
233+
* Handles async workflow execution by queueing a background job.
234+
* Returns immediately with a 202 Accepted response containing the job ID.
235+
*/
236+
async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> {
237+
const { requestId, workflowId, userId, input, triggerType } = params
238+
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
239+
240+
if (!useTrigger) {
241+
logger.warn(`[${requestId}] Async mode requested but TRIGGER_DEV_ENABLED is false`)
242+
return NextResponse.json(
243+
{ error: 'Async execution is not enabled. Set TRIGGER_DEV_ENABLED=true to use async mode.' },
244+
{ status: 400 }
245+
)
246+
}
247+
248+
const payload: WorkflowExecutionPayload = {
249+
workflowId,
250+
userId,
251+
input,
252+
triggerType,
253+
}
254+
255+
try {
256+
const handle = await tasks.trigger('workflow-execution', payload)
257+
258+
logger.info(`[${requestId}] Queued async workflow execution`, {
259+
workflowId,
260+
jobId: handle.id,
261+
})
262+
263+
return NextResponse.json(
264+
{
265+
success: true,
266+
async: true,
267+
jobId: handle.id,
268+
message: 'Workflow execution queued',
269+
statusUrl: `${getBaseUrl()}/api/jobs/${handle.id}`,
270+
},
271+
{ status: 202 }
272+
)
273+
} catch (error: any) {
274+
logger.error(`[${requestId}] Failed to queue async execution`, error)
275+
return NextResponse.json(
276+
{ error: `Failed to queue async execution: ${error.message}` },
277+
{ status: 500 }
278+
)
279+
}
280+
}
281+
220282
/**
221283
* POST /api/workflows/[id]/execute
222284
*
@@ -291,6 +353,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
291353

292354
const streamHeader = req.headers.get('X-Stream-Response') === 'true'
293355
const enableSSE = streamHeader || streamParam === true
356+
const executionModeHeader = req.headers.get('X-Execution-Mode')
357+
const isAsyncMode = executionModeHeader === 'async'
294358

295359
logger.info(`[${requestId}] Starting server-side execution`, {
296360
workflowId,
@@ -301,6 +365,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
301365
streamParam,
302366
streamHeader,
303367
enableSSE,
368+
isAsyncMode,
304369
})
305370

306371
const executionId = uuidv4()
@@ -349,6 +414,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
349414
workspaceId: workflow.workspaceId,
350415
})
351416

417+
if (isAsyncMode) {
418+
return handleAsyncExecution({
419+
requestId,
420+
workflowId,
421+
userId: actorUserId,
422+
input,
423+
triggerType: loggingTriggerType,
424+
})
425+
}
426+
352427
let cachedWorkflowData: {
353428
blocks: Record<string, any>
354429
edges: any[]

apps/sim/background/schedule-execution.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
99
import { preprocessExecution } from '@/lib/execution/preprocessing'
1010
import { createLogger } from '@/lib/logs/console/logger'
1111
import { LoggingSession } from '@/lib/logs/execution/logging-session'
12+
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
1213
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
1314
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
1415
import {
@@ -22,6 +23,7 @@ import {
2223
getSubBlockValue,
2324
} from '@/lib/workflows/schedules/utils'
2425
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
26+
import type { ExecutionResult } from '@/executor/types'
2527
import { mergeSubblockState } from '@/stores/workflows/server-utils'
2628

2729
const logger = createLogger('TriggerScheduleExecution')
@@ -293,14 +295,17 @@ async function runWorkflowExecution({
293295
)
294296

295297
try {
298+
const executionResult = (earlyError as any)?.executionResult as ExecutionResult | undefined
299+
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
300+
296301
await loggingSession.safeCompleteWithError({
297302
error: {
298303
message: `Schedule execution failed: ${
299304
earlyError instanceof Error ? earlyError.message : String(earlyError)
300305
}`,
301306
stackTrace: earlyError instanceof Error ? earlyError.stack : undefined,
302307
},
303-
traceSpans: [],
308+
traceSpans,
304309
})
305310
} catch (loggingError) {
306311
logger.error(`[${requestId}] Failed to complete log entry for schedule failure`, loggingError)

apps/sim/lib/logs/execution/logging-factory.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,9 @@ export function calculateCostSummary(traceSpans: any[]): {
118118
totalCost += span.cost.total || 0
119119
totalInputCost += span.cost.input || 0
120120
totalOutputCost += span.cost.output || 0
121-
const promptTokens = span.tokens?.prompt ?? span.tokens?.input ?? 0
122-
const completionTokens = span.tokens?.completion ?? span.tokens?.output ?? 0
123121
totalTokens += span.tokens?.total || 0
124-
totalPromptTokens += promptTokens
125-
totalCompletionTokens += completionTokens
122+
totalPromptTokens += span.tokens?.input ?? span.tokens?.prompt ?? 0
123+
totalCompletionTokens += span.tokens?.output ?? span.tokens?.completion ?? 0
126124

127125
if (span.model) {
128126
const model = span.model
@@ -137,8 +135,8 @@ export function calculateCostSummary(traceSpans: any[]): {
137135
models[model].input += span.cost.input || 0
138136
models[model].output += span.cost.output || 0
139137
models[model].total += span.cost.total || 0
140-
models[model].tokens.prompt += promptTokens
141-
models[model].tokens.completion += completionTokens
138+
models[model].tokens.prompt += span.tokens?.input ?? span.tokens?.prompt ?? 0
139+
models[model].tokens.completion += span.tokens?.output ?? span.tokens?.completion ?? 0
142140
models[model].tokens.total += span.tokens?.total || 0
143141
}
144142
}

apps/sim/lib/logs/execution/logging-session.ts

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -186,21 +186,23 @@ export class LoggingSession {
186186
const durationMs = typeof totalDurationMs === 'number' ? totalDurationMs : 0
187187
const startTime = new Date(endTime.getTime() - Math.max(1, durationMs))
188188

189-
const costSummary = {
190-
totalCost: BASE_EXECUTION_CHARGE,
191-
totalInputCost: 0,
192-
totalOutputCost: 0,
193-
totalTokens: 0,
194-
totalPromptTokens: 0,
195-
totalCompletionTokens: 0,
196-
baseExecutionCharge: BASE_EXECUTION_CHARGE,
197-
modelCost: 0,
198-
models: {},
199-
}
189+
const hasProvidedSpans = Array.isArray(traceSpans) && traceSpans.length > 0
200190

201-
const message = error?.message || 'Execution failed before starting blocks'
191+
const costSummary = hasProvidedSpans
192+
? calculateCostSummary(traceSpans)
193+
: {
194+
totalCost: BASE_EXECUTION_CHARGE,
195+
totalInputCost: 0,
196+
totalOutputCost: 0,
197+
totalTokens: 0,
198+
totalPromptTokens: 0,
199+
totalCompletionTokens: 0,
200+
baseExecutionCharge: BASE_EXECUTION_CHARGE,
201+
modelCost: 0,
202+
models: {},
203+
}
202204

203-
const hasProvidedSpans = Array.isArray(traceSpans) && traceSpans.length > 0
205+
const message = error?.message || 'Execution failed before starting blocks'
204206

205207
const errorSpan: TraceSpan = {
206208
id: 'workflow-error-root',

0 commit comments

Comments
 (0)