Skip to content

Commit 544780c

Browse files
icecrasher321waleedlatif1
authored andcommitted
cleanup streaming server
1 parent 20e2246 commit 544780c

File tree

5 files changed

+325
-312
lines changed

5 files changed

+325
-312
lines changed

apps/sim/app/api/chat/[identifier]/route.test.ts

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -70,19 +70,6 @@ vi.mock('@/lib/core/utils/request', () => ({
7070
generateRequestId: vi.fn().mockReturnValue('test-request-id'),
7171
}))
7272

73-
vi.mock('@/app/api/workflows/[id]/execute/route', () => ({
74-
createFilteredResult: vi.fn().mockImplementation((result: any) => ({
75-
...result,
76-
logs: undefined,
77-
metadata: result.metadata
78-
? {
79-
...result.metadata,
80-
workflowConnections: undefined,
81-
}
82-
: undefined,
83-
})),
84-
}))
85-
8673
describe('Chat Identifier API Route', () => {
8774
const mockAddCorsHeaders = vi.fn().mockImplementation((response) => response)
8875
const mockValidateChatAuth = vi.fn().mockResolvedValue({ authorized: true })

apps/sim/app/api/chat/[identifier]/route.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,6 @@ export async function POST(
206206

207207
const { createStreamingResponse } = await import('@/lib/workflows/streaming/streaming')
208208
const { SSE_HEADERS } = await import('@/lib/core/utils/sse')
209-
const { createFilteredResult } = await import('@/app/api/workflows/[id]/execute/route')
210209

211210
const workflowInput: any = { input, conversationId }
212211
if (files && Array.isArray(files) && files.length > 0) {
@@ -267,7 +266,6 @@ export async function POST(
267266
isSecureMode: true,
268267
workflowTriggerType: 'chat',
269268
},
270-
createFilteredResult,
271269
executionId,
272270
})
273271

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 0 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -49,126 +49,6 @@ const ExecuteWorkflowSchema = z.object({
4949
export const runtime = 'nodejs'
5050
export const dynamic = 'force-dynamic'
5151

52-
/**
53-
* Execute workflow with streaming support - used by chat and other streaming endpoints
54-
*
55-
* This function assumes preprocessing has already been completed.
56-
* Callers must run preprocessExecution() first to validate workflow, check usage limits,
57-
* and resolve actor before calling this function.
58-
*
59-
* This is a wrapper function that:
60-
* - Supports streaming callbacks (onStream, onBlockComplete)
61-
* - Returns ExecutionResult instead of NextResponse
62-
* - Handles pause/resume logic
63-
*
64-
* Used by:
65-
* - Chat execution (/api/chat/[identifier]/route.ts)
66-
* - Streaming responses (lib/workflows/streaming.ts)
67-
*/
68-
export async function executeWorkflow(
69-
workflow: any,
70-
requestId: string,
71-
input: any | undefined,
72-
actorUserId: string,
73-
streamConfig?: {
74-
enabled: boolean
75-
selectedOutputs?: string[]
76-
isSecureMode?: boolean
77-
workflowTriggerType?: 'api' | 'chat'
78-
onStream?: (streamingExec: any) => Promise<void>
79-
onBlockComplete?: (blockId: string, output: any) => Promise<void>
80-
skipLoggingComplete?: boolean
81-
},
82-
providedExecutionId?: string
83-
): Promise<any> {
84-
const workflowId = workflow.id
85-
const executionId = providedExecutionId || uuidv4()
86-
const triggerType = streamConfig?.workflowTriggerType || 'api'
87-
const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId)
88-
89-
try {
90-
const metadata: ExecutionMetadata = {
91-
requestId,
92-
executionId,
93-
workflowId,
94-
workspaceId: workflow.workspaceId,
95-
userId: actorUserId,
96-
workflowUserId: workflow.userId,
97-
triggerType,
98-
useDraftState: false,
99-
startTime: new Date().toISOString(),
100-
isClientSession: false,
101-
}
102-
103-
const snapshot = new ExecutionSnapshot(
104-
metadata,
105-
workflow,
106-
input,
107-
workflow.variables || {},
108-
streamConfig?.selectedOutputs || []
109-
)
110-
111-
const result = await executeWorkflowCore({
112-
snapshot,
113-
callbacks: {
114-
onStream: streamConfig?.onStream,
115-
onBlockComplete: streamConfig?.onBlockComplete
116-
? async (blockId: string, _blockName: string, _blockType: string, output: any) => {
117-
await streamConfig.onBlockComplete!(blockId, output)
118-
}
119-
: undefined,
120-
},
121-
loggingSession,
122-
})
123-
124-
if (result.status === 'paused') {
125-
if (!result.snapshotSeed) {
126-
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
127-
executionId,
128-
})
129-
} else {
130-
await PauseResumeManager.persistPauseResult({
131-
workflowId,
132-
executionId,
133-
pausePoints: result.pausePoints || [],
134-
snapshotSeed: result.snapshotSeed,
135-
executorUserId: result.metadata?.userId,
136-
})
137-
}
138-
} else {
139-
await PauseResumeManager.processQueuedResumes(executionId)
140-
}
141-
142-
if (streamConfig?.skipLoggingComplete) {
143-
return {
144-
...result,
145-
_streamingMetadata: {
146-
loggingSession,
147-
processedInput: input,
148-
},
149-
}
150-
}
151-
152-
return result
153-
} catch (error: any) {
154-
logger.error(`[${requestId}] Workflow execution failed:`, error)
155-
throw error
156-
}
157-
}
158-
159-
export function createFilteredResult(result: any) {
160-
return {
161-
...result,
162-
logs: undefined,
163-
metadata: result.metadata
164-
? {
165-
...result.metadata,
166-
workflowConnections: undefined,
167-
}
168-
: undefined,
169-
}
170-
}
171-
17252
function resolveOutputIds(
17353
selectedOutputs: string[] | undefined,
17454
blocks: Record<string, any>
@@ -606,7 +486,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
606486
isSecureMode: false,
607487
workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
608488
},
609-
createFilteredResult,
610489
executionId,
611490
})
612491

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
import { v4 as uuidv4 } from 'uuid'
2+
import { createLogger } from '@/lib/logs/console/logger'
3+
import { LoggingSession } from '@/lib/logs/execution/logging-session'
4+
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
5+
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
6+
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
7+
8+
const logger = createLogger('WorkflowExecution')
9+
10+
export interface ExecuteWorkflowOptions {
11+
enabled: boolean
12+
selectedOutputs?: string[]
13+
isSecureMode?: boolean
14+
workflowTriggerType?: 'api' | 'chat'
15+
onStream?: (streamingExec: any) => Promise<void>
16+
onBlockComplete?: (blockId: string, output: any) => Promise<void>
17+
skipLoggingComplete?: boolean
18+
}
19+
20+
export interface WorkflowInfo {
21+
id: string
22+
userId: string
23+
workspaceId?: string | null
24+
isDeployed?: boolean
25+
variables?: Record<string, any>
26+
}
27+
28+
export async function executeWorkflow(
29+
workflow: WorkflowInfo,
30+
requestId: string,
31+
input: any | undefined,
32+
actorUserId: string,
33+
streamConfig?: ExecuteWorkflowOptions,
34+
providedExecutionId?: string
35+
): Promise<any> {
36+
if (!workflow.workspaceId) {
37+
throw new Error(`Workflow ${workflow.id} has no workspaceId`)
38+
}
39+
40+
const workflowId = workflow.id
41+
const workspaceId = workflow.workspaceId
42+
const executionId = providedExecutionId || uuidv4()
43+
const triggerType = streamConfig?.workflowTriggerType || 'api'
44+
const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId)
45+
46+
try {
47+
const metadata: ExecutionMetadata = {
48+
requestId,
49+
executionId,
50+
workflowId,
51+
workspaceId,
52+
userId: actorUserId,
53+
workflowUserId: workflow.userId,
54+
triggerType,
55+
useDraftState: false,
56+
startTime: new Date().toISOString(),
57+
isClientSession: false,
58+
}
59+
60+
const snapshot = new ExecutionSnapshot(
61+
metadata,
62+
workflow,
63+
input,
64+
workflow.variables || {},
65+
streamConfig?.selectedOutputs || []
66+
)
67+
68+
const result = await executeWorkflowCore({
69+
snapshot,
70+
callbacks: {
71+
onStream: streamConfig?.onStream,
72+
onBlockComplete: streamConfig?.onBlockComplete
73+
? async (blockId: string, _blockName: string, _blockType: string, output: any) => {
74+
await streamConfig.onBlockComplete!(blockId, output)
75+
}
76+
: undefined,
77+
},
78+
loggingSession,
79+
})
80+
81+
if (result.status === 'paused') {
82+
if (!result.snapshotSeed) {
83+
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
84+
executionId,
85+
})
86+
} else {
87+
await PauseResumeManager.persistPauseResult({
88+
workflowId,
89+
executionId,
90+
pausePoints: result.pausePoints || [],
91+
snapshotSeed: result.snapshotSeed,
92+
executorUserId: result.metadata?.userId,
93+
})
94+
}
95+
} else {
96+
await PauseResumeManager.processQueuedResumes(executionId)
97+
}
98+
99+
if (streamConfig?.skipLoggingComplete) {
100+
return {
101+
...result,
102+
_streamingMetadata: {
103+
loggingSession,
104+
processedInput: input,
105+
},
106+
}
107+
}
108+
109+
return result
110+
} catch (error: any) {
111+
logger.error(`[${requestId}] Workflow execution failed:`, error)
112+
throw error
113+
}
114+
}

0 commit comments

Comments
 (0)