Skip to content

Commit 47da5eb

Browse files
fix(rehydration): consolidate store rehydration code (#1249)
* fix(rehydration): consolidate store rehydration code * fix stale closure
1 parent 37dcde2 commit 47da5eb

File tree

2 files changed

+95
-147
lines changed

2 files changed

+95
-147
lines changed

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/hooks/use-sub-block-value.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ export function useSubBlockValue<T = any>(
125125
return
126126
}
127127

128+
const currentActiveWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
129+
if (!currentActiveWorkflowId) {
130+
logger.warn('No active workflow ID when setting value', { blockId, subBlockId })
131+
return
132+
}
133+
128134
// Use deep comparison to avoid unnecessary updates for complex objects
129135
if (!isEqual(valueRef.current, newValue)) {
130136
valueRef.current = newValue
@@ -147,10 +153,10 @@ export function useSubBlockValue<T = any>(
147153
useSubBlockStore.setState((state) => ({
148154
workflowValues: {
149155
...state.workflowValues,
150-
[activeWorkflowId || '']: {
151-
...state.workflowValues[activeWorkflowId || ''],
156+
[currentActiveWorkflowId]: {
157+
...state.workflowValues[currentActiveWorkflowId],
152158
[blockId]: {
153-
...state.workflowValues[activeWorkflowId || '']?.[blockId],
159+
...state.workflowValues[currentActiveWorkflowId]?.[blockId],
154160
[subBlockId]: newValue,
155161
},
156162
},
@@ -194,7 +200,6 @@ export function useSubBlockValue<T = any>(
194200
isStreaming,
195201
emitValue,
196202
isShowingDiff,
197-
activeWorkflowId,
198203
]
199204
)
200205

apps/sim/contexts/socket-context.tsx

Lines changed: 86 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -327,91 +327,97 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
327327
}
328328
})
329329

330+
// Shared function to rehydrate workflow stores
331+
const rehydrateWorkflowStores = async (
332+
workflowId: string,
333+
workflowState: any,
334+
source: 'copilot' | 'workflow-state'
335+
) => {
336+
// Import stores dynamically
337+
const [
338+
{ useOperationQueueStore },
339+
{ useWorkflowRegistry },
340+
{ useWorkflowStore },
341+
{ useSubBlockStore },
342+
] = await Promise.all([
343+
import('@/stores/operation-queue/store'),
344+
import('@/stores/workflows/registry/store'),
345+
import('@/stores/workflows/workflow/store'),
346+
import('@/stores/workflows/subblock/store'),
347+
])
348+
349+
// Only proceed if this is the active workflow
350+
const { activeWorkflowId } = useWorkflowRegistry.getState()
351+
if (activeWorkflowId !== workflowId) {
352+
logger.info(`Skipping rehydration - workflow ${workflowId} is not active`)
353+
return false
354+
}
355+
356+
// Check for pending operations
357+
const hasPending = useOperationQueueStore
358+
.getState()
359+
.operations.some((op: any) => op.workflowId === workflowId && op.status !== 'confirmed')
360+
if (hasPending) {
361+
logger.info(`Skipping ${source} rehydration due to pending operations in queue`)
362+
return false
363+
}
364+
365+
// Extract subblock values from blocks
366+
const subblockValues: Record<string, Record<string, any>> = {}
367+
Object.entries(workflowState.blocks || {}).forEach(([blockId, block]) => {
368+
const blockState = block as any
369+
subblockValues[blockId] = {}
370+
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
371+
subblockValues[blockId][subblockId] = (subblock as any).value
372+
})
373+
})
374+
375+
// Replace local workflow store with authoritative server state
376+
useWorkflowStore.setState({
377+
blocks: workflowState.blocks || {},
378+
edges: workflowState.edges || [],
379+
loops: workflowState.loops || {},
380+
parallels: workflowState.parallels || {},
381+
lastSaved: workflowState.lastSaved || Date.now(),
382+
isDeployed: workflowState.isDeployed ?? false,
383+
deployedAt: workflowState.deployedAt,
384+
deploymentStatuses: workflowState.deploymentStatuses || {},
385+
hasActiveWebhook: workflowState.hasActiveWebhook ?? false,
386+
})
387+
388+
// Replace subblock store values for this workflow
389+
useSubBlockStore.setState((state: any) => ({
390+
workflowValues: {
391+
...state.workflowValues,
392+
[workflowId]: subblockValues,
393+
},
394+
}))
395+
396+
logger.info(`Successfully rehydrated stores from ${source}`)
397+
return true
398+
}
399+
330400
// Copilot workflow edit events (database has been updated, rehydrate stores)
331401
socketInstance.on('copilot-workflow-edit', async (data) => {
332402
logger.info(
333403
`Copilot edited workflow ${data.workflowId} - rehydrating stores from database`
334404
)
335405

336-
if (data.workflowId === urlWorkflowId) {
337-
try {
338-
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
339-
const hasPending = useOperationQueueStore
340-
.getState()
341-
.operations.some(
342-
(op: any) => op.workflowId === data.workflowId && op.status !== 'confirmed'
343-
)
344-
if (hasPending) {
345-
logger.info('Skipping copilot rehydration due to pending operations in queue')
346-
return
347-
}
348-
} catch {}
349-
try {
350-
// Fetch fresh workflow state directly from API
351-
const response = await fetch(`/api/workflows/${data.workflowId}`)
352-
if (response.ok) {
353-
const responseData = await response.json()
354-
const workflowData = responseData.data
355-
356-
if (workflowData?.state) {
357-
logger.info('Rehydrating stores with fresh workflow state from database')
358-
359-
// Import stores dynamically to avoid import issues
360-
Promise.all([
361-
import('@/stores/workflows/workflow/store'),
362-
import('@/stores/workflows/subblock/store'),
363-
])
364-
.then(([{ useWorkflowStore }, { useSubBlockStore }]) => {
365-
const workflowState = workflowData.state
366-
367-
// Extract subblock values from blocks
368-
const subblockValues: Record<string, Record<string, any>> = {}
369-
Object.entries(workflowState.blocks || {}).forEach(([blockId, block]) => {
370-
const blockState = block as any
371-
subblockValues[blockId] = {}
372-
Object.entries(blockState.subBlocks || {}).forEach(
373-
([subblockId, subblock]) => {
374-
subblockValues[blockId][subblockId] = (subblock as any).value
375-
}
376-
)
377-
})
378-
379-
// Replace local workflow store with authoritative server state
380-
useWorkflowStore.setState({
381-
blocks: workflowState.blocks || {},
382-
edges: workflowState.edges || [],
383-
loops: workflowState.loops || {},
384-
parallels: workflowState.parallels || {},
385-
lastSaved: workflowState.lastSaved || Date.now(),
386-
isDeployed: workflowState.isDeployed ?? false,
387-
deployedAt: workflowState.deployedAt,
388-
deploymentStatuses: workflowState.deploymentStatuses || {},
389-
hasActiveWebhook: workflowState.hasActiveWebhook ?? false,
390-
})
391-
392-
// Replace subblock store values for this workflow
393-
useSubBlockStore.setState((state: any) => ({
394-
workflowValues: {
395-
...state.workflowValues,
396-
[data.workflowId]: subblockValues,
397-
},
398-
}))
399-
400-
// Note: Auto-layout is already handled by the copilot backend before saving
401-
// No need to trigger additional auto-layout here to avoid ID conflicts
402-
403-
logger.info('Successfully rehydrated stores from database after copilot edit')
404-
})
405-
.catch((error) => {
406-
logger.error('Failed to import stores for copilot rehydration:', error)
407-
})
408-
}
409-
} else {
410-
logger.error('Failed to fetch fresh workflow state:', response.statusText)
406+
try {
407+
// Fetch fresh workflow state directly from API
408+
const response = await fetch(`/api/workflows/${data.workflowId}`)
409+
if (response.ok) {
410+
const responseData = await response.json()
411+
const workflowData = responseData.data
412+
413+
if (workflowData?.state) {
414+
await rehydrateWorkflowStores(data.workflowId, workflowData.state, 'copilot')
411415
}
412-
} catch (error) {
413-
logger.error('Failed to rehydrate stores after copilot edit:', error)
416+
} else {
417+
logger.error('Failed to fetch fresh workflow state:', response.statusText)
414418
}
419+
} catch (error) {
420+
logger.error('Failed to rehydrate stores after copilot edit:', error)
415421
}
416422
})
417423

@@ -465,74 +471,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
465471
logger.debug('Operation confirmed:', data)
466472
})
467473

468-
socketInstance.on('workflow-state', (workflowData) => {
474+
socketInstance.on('workflow-state', async (workflowData) => {
469475
logger.info('Received workflow state from server')
470476

471-
// Update local stores with the fresh workflow state (same logic as YAML editor)
472-
if (workflowData?.state && workflowData.id === urlWorkflowId) {
473-
try {
474-
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
475-
const hasPending = useOperationQueueStore
476-
.getState()
477-
.operations.some(
478-
(op: any) => op.workflowId === workflowData.id && op.status !== 'confirmed'
479-
)
480-
if (hasPending) {
481-
logger.info(
482-
'Skipping workflow-state rehydration due to pending operations in queue'
483-
)
484-
return
485-
}
486-
} catch {}
487-
logger.info('Updating local stores with fresh workflow state from server')
488-
489-
try {
490-
Promise.all([
491-
import('@/stores/workflows/workflow/store'),
492-
import('@/stores/workflows/subblock/store'),
493-
import('@/stores/workflows/registry/store'),
494-
])
495-
.then(([{ useWorkflowStore }, { useSubBlockStore }]) => {
496-
const workflowState = workflowData.state
497-
498-
const subblockValues: Record<string, Record<string, any>> = {}
499-
Object.entries(workflowState.blocks || {}).forEach(([blockId, block]) => {
500-
const blockState = block as any
501-
subblockValues[blockId] = {}
502-
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
503-
subblockValues[blockId][subblockId] = (subblock as any).value
504-
})
505-
})
506-
507-
// Replace local workflow store with authoritative server state
508-
useWorkflowStore.setState({
509-
blocks: workflowState.blocks || {},
510-
edges: workflowState.edges || [],
511-
loops: workflowState.loops || {},
512-
parallels: workflowState.parallels || {},
513-
lastSaved: workflowState.lastSaved || Date.now(),
514-
isDeployed: workflowState.isDeployed ?? false,
515-
deployedAt: workflowState.deployedAt,
516-
deploymentStatuses: workflowState.deploymentStatuses || {},
517-
hasActiveWebhook: workflowState.hasActiveWebhook ?? false,
518-
})
519-
520-
// Replace subblock store values for this workflow
521-
useSubBlockStore.setState((state: any) => ({
522-
workflowValues: {
523-
...state.workflowValues,
524-
[workflowData.id]: subblockValues,
525-
},
526-
}))
527-
528-
logger.info('Merged fresh workflow state with local state')
529-
})
530-
.catch((error) => {
531-
logger.error('Failed to import stores for workflow state update:', error)
532-
})
533-
} catch (error) {
534-
logger.error('Failed to update local stores with workflow state:', error)
535-
}
477+
if (workflowData?.state) {
478+
await rehydrateWorkflowStores(workflowData.id, workflowData.state, 'workflow-state')
536479
}
537480
})
538481

0 commit comments

Comments
 (0)