Skip to content

Commit c86f2a0

Browse files
authored
fix(copilot): fix execute workflow from diff store (#1894)
* Fix run from diff store * Fix copilot run workflow
1 parent 41f3d50 commit c86f2a0

File tree

6 files changed

+222
-14
lines changed

6 files changed

+222
-14
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ const ExecuteWorkflowSchema = z.object({
3030
useDraftState: z.boolean().optional(),
3131
input: z.any().optional(),
3232
startBlockId: z.string().optional(),
33+
// Optional workflow state override (for executing diff workflows)
34+
workflowStateOverride: z
35+
.object({
36+
blocks: z.record(z.any()),
37+
edges: z.array(z.any()),
38+
loops: z.record(z.any()).optional(),
39+
parallels: z.record(z.any()).optional(),
40+
})
41+
.optional(),
3342
})
3443

3544
export const runtime = 'nodejs'
@@ -310,14 +319,22 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
310319
stream: streamParam,
311320
useDraftState,
312321
input: validatedInput,
322+
workflowStateOverride,
313323
} = validation.data
314324

315325
// For API key auth, the entire body is the input (except for our control fields)
316326
// For session auth, the input is explicitly provided in the input field
317327
const input =
318328
auth.authType === 'api_key'
319329
? (() => {
320-
const { selectedOutputs, triggerType, stream, useDraftState, ...rest } = body
330+
const {
331+
selectedOutputs,
332+
triggerType,
333+
stream,
334+
useDraftState,
335+
workflowStateOverride,
336+
...rest
337+
} = body
321338
return Object.keys(rest).length > 0 ? rest : validatedInput
322339
})()
323340
: validatedInput
@@ -460,6 +477,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
460477
triggerType,
461478
useDraftState: shouldUseDraftState,
462479
startTime: new Date().toISOString(),
480+
workflowStateOverride,
463481
}
464482

465483
const snapshot = new ExecutionSnapshot(
@@ -714,6 +732,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
714732
triggerType,
715733
useDraftState: shouldUseDraftState,
716734
startTime: new Date().toISOString(),
735+
workflowStateOverride,
717736
}
718737

719738
const snapshot = new ExecutionSnapshot(

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { useCallback, useState } from 'react'
22
import { v4 as uuidv4 } from 'uuid'
3+
import { shallow } from 'zustand/shallow'
34
import { createLogger } from '@/lib/logs/console/logger'
45
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
56
import { processStreamingBlockLogs } from '@/lib/tokenization'
@@ -16,6 +17,7 @@ import { useExecutionStore } from '@/stores/execution/store'
1617
import { useVariablesStore } from '@/stores/panel/variables/store'
1718
import { useEnvironmentStore } from '@/stores/settings/environment/store'
1819
import { useTerminalConsoleStore } from '@/stores/terminal'
20+
import { useWorkflowDiffStore } from '@/stores/workflow-diff'
1921
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
2022
import { mergeSubblockState } from '@/stores/workflows/utils'
2123
import { useCurrentWorkflow } from './use-current-workflow'
@@ -99,6 +101,26 @@ export function useWorkflowExecution() {
99101
} = useExecutionStore()
100102
const [executionResult, setExecutionResult] = useState<ExecutionResult | null>(null)
101103
const executionStream = useExecutionStream()
104+
const {
105+
diffWorkflow: executionDiffWorkflow,
106+
isDiffReady: isDiffWorkflowReady,
107+
isShowingDiff: isViewingDiff,
108+
} = useWorkflowDiffStore(
109+
useCallback(
110+
(state) => ({
111+
diffWorkflow: state.diffWorkflow,
112+
isDiffReady: state.isDiffReady,
113+
isShowingDiff: state.isShowingDiff,
114+
}),
115+
[]
116+
),
117+
shallow
118+
)
119+
const hasActiveDiffWorkflow =
120+
isDiffWorkflowReady &&
121+
isViewingDiff &&
122+
!!executionDiffWorkflow &&
123+
Object.keys(executionDiffWorkflow.blocks || {}).length > 0
102124

103125
/**
104126
* Validates debug state before performing debug operations
@@ -645,8 +667,14 @@ export function useWorkflowExecution() {
645667
onBlockComplete?: (blockId: string, output: any) => Promise<void>,
646668
overrideTriggerType?: 'chat' | 'manual' | 'api'
647669
): Promise<ExecutionResult | StreamingExecution> => {
648-
// Use currentWorkflow but check if we're in diff mode
649-
const { blocks: workflowBlocks, edges: workflowEdges } = currentWorkflow
670+
// Use diff workflow for execution when available, regardless of canvas view state
671+
const executionWorkflowState =
672+
hasActiveDiffWorkflow && executionDiffWorkflow ? executionDiffWorkflow : null
673+
const usingDiffForExecution = executionWorkflowState !== null
674+
const workflowBlocks = (executionWorkflowState?.blocks ??
675+
currentWorkflow.blocks) as typeof currentWorkflow.blocks
676+
const workflowEdges = (executionWorkflowState?.edges ??
677+
currentWorkflow.edges) as typeof currentWorkflow.edges
650678

651679
// Filter out blocks without type (these are layout-only blocks)
652680
const validBlocks = Object.entries(workflowBlocks).reduce(
@@ -665,6 +693,9 @@ export function useWorkflowExecution() {
665693

666694
logger.info('Executing workflow', {
667695
isDiffMode: currentWorkflow.isDiffMode,
696+
usingDiffForExecution,
697+
isViewingDiff,
698+
executingDiffWorkflow: usingDiffForExecution && isViewingDiff,
668699
isExecutingFromChat,
669700
totalBlocksCount: Object.keys(workflowBlocks).length,
670701
validBlocksCount: Object.keys(validBlocks).length,
@@ -838,6 +869,15 @@ export function useWorkflowExecution() {
838869
selectedOutputs,
839870
triggerType: overrideTriggerType || 'manual',
840871
useDraftState: true,
872+
// Pass diff workflow state if available for execution
873+
workflowStateOverride: executionWorkflowState
874+
? {
875+
blocks: executionWorkflowState.blocks,
876+
edges: executionWorkflowState.edges,
877+
loops: executionWorkflowState.loops,
878+
parallels: executionWorkflowState.parallels,
879+
}
880+
: undefined,
841881
callbacks: {
842882
onExecutionStarted: (data) => {
843883
logger.info('Server execution started:', data)
Lines changed: 136 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
import { v4 as uuidv4 } from 'uuid'
12
import type { ExecutionResult, StreamingExecution } from '@/executor/types'
3+
import { useTerminalConsoleStore } from '@/stores/terminal'
4+
import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
25
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
36

47
export interface WorkflowExecutionOptions {
@@ -11,7 +14,7 @@ export interface WorkflowExecutionOptions {
1114

1215
/**
1316
* Execute workflow with full logging (used by copilot tools)
14-
* This now delegates to the server-side executor via API
17+
* Handles SSE streaming and populates console logs in real-time
1518
*/
1619
export async function executeWorkflowWithFullLogging(
1720
options: WorkflowExecutionOptions = {}
@@ -22,25 +25,148 @@ export async function executeWorkflowWithFullLogging(
2225
throw new Error('No active workflow')
2326
}
2427

25-
// For copilot tool calls, we use non-SSE execution to get a simple result
28+
// Check if there's an active diff workflow to execute
29+
const { diffWorkflow, isDiffReady, isShowingDiff } = useWorkflowDiffStore.getState()
30+
const hasActiveDiffWorkflow =
31+
isDiffReady &&
32+
isShowingDiff &&
33+
!!diffWorkflow &&
34+
Object.keys(diffWorkflow.blocks || {}).length > 0
35+
36+
const executionId = options.executionId || uuidv4()
37+
const { addConsole } = useTerminalConsoleStore.getState()
38+
39+
// Build request payload
40+
const payload: any = {
41+
input: options.workflowInput,
42+
stream: true,
43+
triggerType: options.overrideTriggerType || 'manual',
44+
useDraftState: true,
45+
}
46+
47+
// Add diff workflow override if active
48+
if (hasActiveDiffWorkflow) {
49+
payload.workflowStateOverride = {
50+
blocks: diffWorkflow.blocks,
51+
edges: diffWorkflow.edges,
52+
loops: diffWorkflow.loops,
53+
parallels: diffWorkflow.parallels,
54+
}
55+
}
56+
2657
const response = await fetch(`/api/workflows/${activeWorkflowId}/execute`, {
2758
method: 'POST',
2859
headers: {
2960
'Content-Type': 'application/json',
3061
},
31-
body: JSON.stringify({
32-
input: options.workflowInput,
33-
stream: false, // Copilot doesn't need SSE streaming
34-
triggerType: options.overrideTriggerType || 'manual',
35-
useDraftState: true,
36-
}),
62+
body: JSON.stringify(payload),
3763
})
3864

3965
if (!response.ok) {
4066
const error = await response.json()
4167
throw new Error(error.error || 'Workflow execution failed')
4268
}
4369

44-
const result = await response.json()
45-
return result as ExecutionResult
70+
if (!response.body) {
71+
throw new Error('No response body')
72+
}
73+
74+
// Parse SSE stream
75+
const reader = response.body.getReader()
76+
const decoder = new TextDecoder()
77+
let buffer = ''
78+
let executionResult: ExecutionResult = {
79+
success: false,
80+
output: {},
81+
logs: [],
82+
}
83+
84+
try {
85+
while (true) {
86+
const { done, value } = await reader.read()
87+
if (done) break
88+
89+
buffer += decoder.decode(value, { stream: true })
90+
const lines = buffer.split('\n\n')
91+
buffer = lines.pop() || ''
92+
93+
for (const line of lines) {
94+
if (!line.trim() || !line.startsWith('data: ')) continue
95+
96+
const data = line.substring(6).trim()
97+
if (data === '[DONE]') continue
98+
99+
try {
100+
const event = JSON.parse(data)
101+
102+
switch (event.type) {
103+
case 'block:completed':
104+
addConsole({
105+
input: event.data.input || {},
106+
output: event.data.output,
107+
success: true,
108+
durationMs: event.data.durationMs,
109+
startedAt: new Date(Date.now() - event.data.durationMs).toISOString(),
110+
endedAt: new Date().toISOString(),
111+
workflowId: activeWorkflowId,
112+
blockId: event.data.blockId,
113+
executionId,
114+
blockName: event.data.blockName,
115+
blockType: event.data.blockType,
116+
iterationCurrent: event.data.iterationCurrent,
117+
iterationTotal: event.data.iterationTotal,
118+
iterationType: event.data.iterationType,
119+
})
120+
121+
if (options.onBlockComplete) {
122+
options.onBlockComplete(event.data.blockId, event.data.output).catch(() => {})
123+
}
124+
break
125+
126+
case 'block:error':
127+
addConsole({
128+
input: event.data.input || {},
129+
output: {},
130+
success: false,
131+
error: event.data.error,
132+
durationMs: event.data.durationMs,
133+
startedAt: new Date(Date.now() - event.data.durationMs).toISOString(),
134+
endedAt: new Date().toISOString(),
135+
workflowId: activeWorkflowId,
136+
blockId: event.data.blockId,
137+
executionId,
138+
blockName: event.data.blockName,
139+
blockType: event.data.blockType,
140+
iterationCurrent: event.data.iterationCurrent,
141+
iterationTotal: event.data.iterationTotal,
142+
iterationType: event.data.iterationType,
143+
})
144+
break
145+
146+
case 'execution:completed':
147+
executionResult = {
148+
success: event.data.success,
149+
output: event.data.output,
150+
logs: [],
151+
metadata: {
152+
duration: event.data.duration,
153+
startTime: event.data.startTime,
154+
endTime: event.data.endTime,
155+
},
156+
}
157+
break
158+
159+
case 'execution:error':
160+
throw new Error(event.data.error || 'Execution failed')
161+
}
162+
} catch (parseError) {
163+
// Skip malformed SSE events
164+
}
165+
}
166+
}
167+
} finally {
168+
reader.releaseLock()
169+
}
170+
171+
return executionResult
46172
}

apps/sim/executor/execution/snapshot.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ export interface ExecutionMetadata {
1313
startTime: string
1414
pendingBlocks?: string[]
1515
resumeFromSnapshot?: boolean
16+
workflowStateOverride?: {
17+
blocks: Record<string, any>
18+
edges: Edge[]
19+
loops?: Record<string, any>
20+
parallels?: Record<string, any>
21+
}
1622
}
1723

1824
export interface ExecutionCallbacks {

apps/sim/hooks/use-execution-stream.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ export interface ExecuteStreamOptions {
6161
startBlockId?: string
6262
triggerType?: string
6363
useDraftState?: boolean
64+
workflowStateOverride?: {
65+
blocks: Record<string, any>
66+
edges: any[]
67+
loops?: Record<string, any>
68+
parallels?: Record<string, any>
69+
}
6470
callbacks?: ExecutionStreamCallbacks
6571
}
6672

apps/sim/lib/workflows/executor/execution-core.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,18 @@ export async function executeWorkflowCore(
114114
let loops
115115
let parallels
116116

117-
if (useDraftState) {
117+
// Use workflowStateOverride if provided (for diff workflows)
118+
if (metadata.workflowStateOverride) {
119+
blocks = metadata.workflowStateOverride.blocks
120+
edges = metadata.workflowStateOverride.edges
121+
loops = metadata.workflowStateOverride.loops || {}
122+
parallels = metadata.workflowStateOverride.parallels || {}
123+
124+
logger.info(`[${requestId}] Using workflow state override (diff workflow execution)`, {
125+
blocksCount: Object.keys(blocks).length,
126+
edgesCount: edges.length,
127+
})
128+
} else if (useDraftState) {
118129
const draftData = await loadWorkflowFromNormalizedTables(workflowId)
119130

120131
if (!draftData) {

0 commit comments

Comments
 (0)