Skip to content

Commit 25ac917

Browse files
fix(workflow-block): clearing child workflow input format field must lazy cascade parent workflow state deletion (#2038)
1 parent d51a756 commit 25ac917

File tree

2 files changed

+175
-1
lines changed

2 files changed

+175
-1
lines changed

apps/sim/executor/handlers/workflow/workflow-handler.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import type {
1212
} from '@/executor/types'
1313
import { buildAPIUrl, buildAuthHeaders } from '@/executor/utils/http'
1414
import { parseJSON } from '@/executor/utils/json'
15+
import { lazyCleanupInputMapping } from '@/executor/utils/lazy-cleanup'
1516
import { Serializer } from '@/serializer'
1617
import type { SerializedBlock } from '@/serializer/types'
1718
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
@@ -86,7 +87,15 @@ export class WorkflowBlockHandler implements BlockHandler {
8687
const normalized = parseJSON(inputs.inputMapping, inputs.inputMapping)
8788

8889
if (normalized && typeof normalized === 'object' && !Array.isArray(normalized)) {
89-
childWorkflowInput = normalized as Record<string, any>
90+
// Perform lazy cleanup: remove orphaned fields from inputMapping
91+
// that no longer exist in the child workflow's inputFormat
92+
const cleanedMapping = await lazyCleanupInputMapping(
93+
ctx.workflowId || 'unknown',
94+
block.id,
95+
normalized,
96+
childWorkflow.rawBlocks || {}
97+
)
98+
childWorkflowInput = cleanedMapping as Record<string, any>
9099
} else {
91100
childWorkflowInput = {}
92101
}
@@ -209,6 +218,7 @@ export class WorkflowBlockHandler implements BlockHandler {
209218
name: workflowData.name,
210219
serializedState: serializedWorkflow,
211220
variables: workflowVariables,
221+
rawBlocks: workflowState.blocks,
212222
}
213223
}
214224

@@ -281,6 +291,7 @@ export class WorkflowBlockHandler implements BlockHandler {
281291
name: wfData?.name || DEFAULTS.WORKFLOW_NAME,
282292
serializedState: serializedWorkflow,
283293
variables: workflowVariables,
294+
rawBlocks: deployedState.blocks,
284295
}
285296
}
286297

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import { db } from '@sim/db'
2+
import { workflowBlocks } from '@sim/db/schema'
3+
import { and, eq } from 'drizzle-orm'
4+
import { createLogger } from '@/lib/logs/console/logger'
5+
6+
const logger = createLogger('LazyCleanup')
7+
8+
/**
9+
* Extract valid field names from a child workflow's start block inputFormat
10+
*
11+
* @param childWorkflowBlocks - The blocks from the child workflow state
12+
* @returns Set of valid field names defined in the child's inputFormat
13+
*/
14+
function extractValidInputFieldNames(childWorkflowBlocks: Record<string, any>): Set<string> | null {
15+
const validFieldNames = new Set<string>()
16+
17+
const startBlock = Object.values(childWorkflowBlocks).find((block: any) => {
18+
const blockType = block?.type
19+
return blockType === 'start_trigger' || blockType === 'input_trigger' || blockType === 'starter'
20+
})
21+
22+
if (!startBlock) {
23+
logger.debug('No start block found in child workflow')
24+
return null
25+
}
26+
27+
const inputFormat =
28+
(startBlock as any)?.subBlocks?.inputFormat?.value ??
29+
(startBlock as any)?.config?.params?.inputFormat
30+
31+
if (!Array.isArray(inputFormat)) {
32+
logger.debug('No inputFormat array found in child workflow start block')
33+
return null
34+
}
35+
36+
// Extract field names
37+
for (const field of inputFormat) {
38+
if (field?.name && typeof field.name === 'string') {
39+
const fieldName = field.name.trim()
40+
if (fieldName) {
41+
validFieldNames.add(fieldName)
42+
}
43+
}
44+
}
45+
46+
return validFieldNames
47+
}
48+
49+
/**
50+
* Clean up orphaned inputMapping fields that don't exist in child workflow's inputFormat.
51+
* This is a lazy cleanup that only runs at execution time and only persists if changes are needed.
52+
*
53+
* @param parentWorkflowId - The parent workflow ID
54+
* @param parentBlockId - The workflow block ID in the parent
55+
* @param currentInputMapping - The current inputMapping value from the parent block
56+
* @param childWorkflowBlocks - The blocks from the child workflow
57+
* @returns The cleaned inputMapping (only different if cleanup was needed)
58+
*/
59+
export async function lazyCleanupInputMapping(
60+
parentWorkflowId: string,
61+
parentBlockId: string,
62+
currentInputMapping: any,
63+
childWorkflowBlocks: Record<string, any>
64+
): Promise<any> {
65+
try {
66+
if (
67+
!currentInputMapping ||
68+
typeof currentInputMapping !== 'object' ||
69+
Array.isArray(currentInputMapping)
70+
) {
71+
return currentInputMapping
72+
}
73+
74+
const validFieldNames = extractValidInputFieldNames(childWorkflowBlocks)
75+
76+
if (!validFieldNames || validFieldNames.size === 0) {
77+
logger.debug('Child workflow has no inputFormat fields, skipping cleanup')
78+
return currentInputMapping
79+
}
80+
81+
const orphanedFields: string[] = []
82+
for (const fieldName of Object.keys(currentInputMapping)) {
83+
if (!validFieldNames.has(fieldName)) {
84+
orphanedFields.push(fieldName)
85+
}
86+
}
87+
88+
if (orphanedFields.length === 0) {
89+
return currentInputMapping
90+
}
91+
92+
const cleanedMapping: Record<string, any> = {}
93+
for (const [fieldName, fieldValue] of Object.entries(currentInputMapping)) {
94+
if (validFieldNames.has(fieldName)) {
95+
cleanedMapping[fieldName] = fieldValue
96+
}
97+
}
98+
99+
logger.info(
100+
`Lazy cleanup: Removing ${orphanedFields.length} orphaned field(s) from inputMapping in workflow ${parentWorkflowId}, block ${parentBlockId}: ${orphanedFields.join(', ')}`
101+
)
102+
103+
persistCleanedMapping(parentWorkflowId, parentBlockId, cleanedMapping).catch((error) => {
104+
logger.error('Failed to persist cleaned inputMapping:', error)
105+
})
106+
107+
return cleanedMapping
108+
} catch (error) {
109+
logger.error('Error in lazy cleanup:', error)
110+
return currentInputMapping
111+
}
112+
}
113+
114+
/**
115+
* Persist the cleaned inputMapping to the database
116+
*
117+
* @param workflowId - The workflow ID
118+
* @param blockId - The block ID
119+
* @param cleanedMapping - The cleaned inputMapping value
120+
*/
121+
async function persistCleanedMapping(
122+
workflowId: string,
123+
blockId: string,
124+
cleanedMapping: Record<string, any>
125+
): Promise<void> {
126+
try {
127+
await db.transaction(async (tx) => {
128+
const [block] = await tx
129+
.select({ subBlocks: workflowBlocks.subBlocks })
130+
.from(workflowBlocks)
131+
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
132+
.limit(1)
133+
134+
if (!block) {
135+
logger.warn(`Block ${blockId} not found in workflow ${workflowId}, skipping persistence`)
136+
return
137+
}
138+
139+
const subBlocks = (block.subBlocks as Record<string, any>) || {}
140+
141+
if (subBlocks.inputMapping) {
142+
subBlocks.inputMapping = {
143+
...subBlocks.inputMapping,
144+
value: cleanedMapping,
145+
}
146+
147+
// Persist updated subBlocks
148+
await tx
149+
.update(workflowBlocks)
150+
.set({
151+
subBlocks: subBlocks,
152+
updatedAt: new Date(),
153+
})
154+
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
155+
156+
logger.info(`Successfully persisted cleaned inputMapping for block ${blockId}`)
157+
}
158+
})
159+
} catch (error) {
160+
logger.error('Error persisting cleaned mapping:', error)
161+
throw error
162+
}
163+
}

0 commit comments

Comments
 (0)