Skip to content

Commit 0830490

Browse files
improvement(workflow-execution): perf improvements to passing workflow state + decrypted env vars (#2119)
* improvement(execution): load workflow state once instead of 2-3 times * decrypt only in get helper * remove comments * remove comments
1 parent 93e1c51 commit 0830490

File tree

3 files changed

+52
-40
lines changed

3 files changed

+52
-40
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -348,13 +348,27 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
348348
workspaceId: workflow.workspaceId,
349349
})
350350

351+
let cachedWorkflowData: {
352+
blocks: Record<string, any>
353+
edges: any[]
354+
loops: Record<string, any>
355+
parallels: Record<string, any>
356+
} | null = null
357+
351358
let processedInput = input
352359
try {
353360
const workflowData = shouldUseDraftState
354361
? await loadWorkflowFromNormalizedTables(workflowId)
355362
: await loadDeployedWorkflowState(workflowId)
356363

357364
if (workflowData) {
365+
cachedWorkflowData = {
366+
blocks: workflowData.blocks,
367+
edges: workflowData.edges,
368+
loops: workflowData.loops || {},
369+
parallels: workflowData.parallels || {},
370+
}
371+
358372
const serializedWorkflow = new Serializer().serializeWorkflow(
359373
workflowData.blocks,
360374
workflowData.edges,
@@ -402,6 +416,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
402416
)
403417
}
404418

419+
const effectiveWorkflowStateOverride = workflowStateOverride || cachedWorkflowData || undefined
420+
405421
if (!enableSSE) {
406422
logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`)
407423
try {
@@ -414,7 +430,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
414430
triggerType,
415431
useDraftState: shouldUseDraftState,
416432
startTime: new Date().toISOString(),
417-
workflowStateOverride,
433+
workflowStateOverride: effectiveWorkflowStateOverride,
418434
}
419435

420436
const snapshot = new ExecutionSnapshot(
@@ -479,8 +495,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
479495
logger.info(`[${requestId}] Using SSE console log streaming (manual execution)`)
480496
} else {
481497
logger.info(`[${requestId}] Using streaming API response`)
482-
const deployedData = await loadDeployedWorkflowState(workflowId)
483-
const resolvedSelectedOutputs = resolveOutputIds(selectedOutputs, deployedData?.blocks || {})
498+
499+
const resolvedSelectedOutputs = resolveOutputIds(
500+
selectedOutputs,
501+
cachedWorkflowData?.blocks || {}
502+
)
484503
const stream = await createStreamingResponse({
485504
requestId,
486505
workflow: {
@@ -677,7 +696,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
677696
triggerType,
678697
useDraftState: shouldUseDraftState,
679698
startTime: new Date().toISOString(),
680-
workflowStateOverride,
699+
workflowStateOverride: effectiveWorkflowStateOverride,
681700
}
682701

683702
const snapshot = new ExecutionSnapshot(

apps/sim/lib/environment/utils.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,18 @@ export async function getPersonalAndWorkspaceEnv(
6767
const workspaceEncrypted: Record<string, string> = (workspaceRows[0]?.variables as any) || {}
6868

6969
const decryptAll = async (src: Record<string, string>) => {
70-
const out: Record<string, string> = {}
71-
for (const [k, v] of Object.entries(src)) {
72-
try {
73-
const { decrypted } = await decryptSecret(v)
74-
out[k] = decrypted
75-
} catch {
76-
out[k] = ''
77-
}
78-
}
79-
return out
70+
const entries = Object.entries(src)
71+
const results = await Promise.all(
72+
entries.map(async ([k, v]) => {
73+
try {
74+
const { decrypted } = await decryptSecret(v)
75+
return [k, decrypted] as const
76+
} catch {
77+
return [k, ''] as const
78+
}
79+
})
80+
)
81+
return Object.fromEntries(results)
8082
}
8183

8284
const [personalDecrypted, workspaceDecrypted] = await Promise.all([

apps/sim/lib/workflows/executor/execution-core.ts

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
99
import { createLogger } from '@/lib/logs/console/logger'
1010
import type { LoggingSession } from '@/lib/logs/execution/logging-session'
1111
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
12-
import { decryptSecret } from '@/lib/utils'
1312
import {
1413
loadDeployedWorkflowState,
1514
loadWorkflowFromNormalizedTables,
@@ -153,38 +152,37 @@ export async function executeWorkflowCore(
153152
// Merge block states
154153
const mergedStates = mergeSubblockState(blocks)
155154

156-
// Get and decrypt environment variables
157-
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
158-
userId,
159-
providedWorkspaceId
160-
)
155+
const { personalEncrypted, workspaceEncrypted, personalDecrypted, workspaceDecrypted } =
156+
await getPersonalAndWorkspaceEnv(userId, providedWorkspaceId)
157+
158+
// Use encrypted values for logging (don't log decrypted secrets)
161159
const variables = EnvVarsSchema.parse({ ...personalEncrypted, ...workspaceEncrypted })
162160

161+
// Use already-decrypted values for execution (no redundant decryption)
162+
const decryptedEnvVars: Record<string, string> = { ...personalDecrypted, ...workspaceDecrypted }
163+
163164
await loggingSession.safeStart({
164165
userId,
165166
workspaceId: providedWorkspaceId,
166167
variables,
167168
skipLogCreation, // Skip if resuming an existing execution
168169
})
169170

170-
// Process block states with env var substitution
171-
const currentBlockStates = await Object.entries(mergedStates).reduce(
172-
async (accPromise, [id, block]) => {
173-
const acc = await accPromise
174-
acc[id] = await Object.entries(block.subBlocks).reduce(
175-
async (subAccPromise, [key, subBlock]) => {
176-
const subAcc = await subAccPromise
171+
// Process block states with env var substitution using pre-decrypted values
172+
const currentBlockStates = Object.entries(mergedStates).reduce(
173+
(acc, [id, block]) => {
174+
acc[id] = Object.entries(block.subBlocks).reduce(
175+
(subAcc, [key, subBlock]) => {
177176
let value = subBlock.value
178177

179178
if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) {
180179
const matches = value.match(/{{([^}]+)}}/g)
181180
if (matches) {
182181
for (const match of matches) {
183182
const varName = match.slice(2, -2)
184-
const encryptedValue = variables[varName]
185-
if (encryptedValue) {
186-
const { decrypted } = await decryptSecret(encryptedValue)
187-
value = (value as string).replace(match, decrypted)
183+
const decryptedValue = decryptedEnvVars[varName]
184+
if (decryptedValue !== undefined) {
185+
value = (value as string).replace(match, decryptedValue)
188186
}
189187
}
190188
}
@@ -193,20 +191,13 @@ export async function executeWorkflowCore(
193191
subAcc[key] = value
194192
return subAcc
195193
},
196-
Promise.resolve({} as Record<string, any>)
194+
{} as Record<string, any>
197195
)
198196
return acc
199197
},
200-
Promise.resolve({} as Record<string, Record<string, any>>)
198+
{} as Record<string, Record<string, any>>
201199
)
202200

203-
// Decrypt all env vars
204-
const decryptedEnvVars: Record<string, string> = {}
205-
for (const [key, encryptedValue] of Object.entries(variables)) {
206-
const { decrypted } = await decryptSecret(encryptedValue)
207-
decryptedEnvVars[key] = decrypted
208-
}
209-
210201
// Process response format
211202
const processedBlockStates = Object.entries(currentBlockStates).reduce(
212203
(acc, [blockId, blockState]) => {

0 commit comments

Comments
 (0)