Skip to content

Commit db4ad80

Browse files
Vikhyath MondretiVikhyath Mondreti
authored andcommitted
fix subflow ops to go through queue
1 parent 0023e8d commit db4ad80

File tree

2 files changed

+138
-74
lines changed

2 files changed

+138
-74
lines changed

apps/sim/hooks/use-collaborative-workflow.ts

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -362,19 +362,7 @@ export function useCollaborativeWorkflow() {
362362
const { operationId, error, retryable } = data
363363
logger.warn('Operation failed', { operationId, error, retryable })
364364

365-
const retryFunction = (operation: any) => {
366-
const { operation: op, target, payload } = operation.operation
367-
368-
if (op === 'subblock-update' && target === 'subblock') {
369-
// Use subblock-update channel for subblock operations
370-
emitSubblockUpdate(payload.blockId, payload.subblockId, payload.value, operation.id)
371-
} else {
372-
// Use workflow-operation channel for block/edge/subflow operations
373-
emitWorkflowOperation(op, target, payload, operation.id)
374-
}
375-
}
376-
377-
failOperation(operationId, retryFunction)
365+
failOperation(operationId)
378366
}
379367

380368
// Register event handlers
@@ -478,14 +466,38 @@ export function useCollaborativeWorkflow() {
478466
autoConnectEdge, // Include edge data for atomic operation
479467
}
480468

469+
// Skip if applying remote changes
470+
if (isApplyingRemoteChange.current) {
471+
workflowStore.addBlock(id, type, name, position, data, parentId, extent)
472+
if (autoConnectEdge) {
473+
workflowStore.addEdge(autoConnectEdge)
474+
}
475+
return
476+
}
477+
478+
// Generate operation ID for queue tracking
479+
const operationId = crypto.randomUUID()
480+
481+
// Add to queue for retry mechanism
482+
addToQueue({
483+
id: operationId,
484+
operation: {
485+
operation: 'add',
486+
target: 'block',
487+
payload: completeBlockData,
488+
},
489+
workflowId: activeWorkflowId || '',
490+
userId: session?.user?.id || 'unknown',
491+
})
492+
493+
// Apply locally first (immediate UI feedback)
481494
workflowStore.addBlock(id, type, name, position, data, parentId, extent)
482495
if (autoConnectEdge) {
483496
workflowStore.addEdge(autoConnectEdge)
484497
}
485498

486-
if (!isApplyingRemoteChange.current) {
487-
emitWorkflowOperation('add', 'block', completeBlockData)
488-
}
499+
// Emit to server with operation ID for tracking
500+
emitWorkflowOperation('add', 'block', completeBlockData, operationId)
489501
return
490502
}
491503

apps/sim/stores/operation-queue/store.ts

Lines changed: 110 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export interface QueuedOperation {
1313
workflowId: string
1414
timestamp: number
1515
retryCount: number
16-
status: 'pending' | 'confirmed' | 'failed'
16+
status: 'pending' | 'processing' | 'confirmed' | 'failed'
1717
userId: string
1818
}
1919

@@ -24,9 +24,11 @@ interface OperationQueueState {
2424

2525
addToQueue: (operation: Omit<QueuedOperation, 'timestamp' | 'retryCount' | 'status'>) => void
2626
confirmOperation: (operationId: string) => void
27-
failOperation: (operationId: string, emitFunction: (operation: QueuedOperation) => void) => void
27+
failOperation: (operationId: string) => void
2828
handleOperationTimeout: (operationId: string) => void
2929
handleSocketReconnection: () => void
30+
processNextOperation: () => void
31+
3032
triggerOfflineMode: () => void
3133
clearError: () => void
3234
}
@@ -60,8 +62,28 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
6062
addToQueue: (operation) => {
6163
const state = get()
6264

65+
// Check for duplicate operation ID
6366
const existingOp = state.operations.find((op) => op.id === operation.id)
6467
if (existingOp) {
68+
logger.debug('Skipping duplicate operation', { operationId: operation.id })
69+
return
70+
}
71+
72+
// Check for duplicate operation content (same operation on same target with same payload)
73+
const duplicateContent = state.operations.find(
74+
(op) =>
75+
op.operation.operation === operation.operation.operation &&
76+
op.operation.target === operation.operation.target &&
77+
JSON.stringify(op.operation.payload) === JSON.stringify(operation.operation.payload) &&
78+
op.workflowId === operation.workflowId
79+
)
80+
if (duplicateContent) {
81+
logger.debug('Skipping duplicate operation content', {
82+
operationId: operation.id,
83+
existingOperationId: duplicateContent.id,
84+
operation: operation.operation.operation,
85+
target: operation.operation.target,
86+
})
6587
return
6688
}
6789

@@ -77,20 +99,12 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
7799
operation: queuedOp.operation,
78100
})
79101

80-
const timeoutId = setTimeout(() => {
81-
logger.warn('Operation timeout - no server response after 5 seconds', {
82-
operationId: queuedOp.id,
83-
})
84-
operationTimeouts.delete(queuedOp.id)
85-
86-
get().handleOperationTimeout(queuedOp.id)
87-
}, 5000)
88-
89-
operationTimeouts.set(queuedOp.id, timeoutId)
90-
91102
set((state) => ({
92103
operations: [...state.operations, queuedOp],
93104
}))
105+
106+
// Start processing if not already processing
107+
get().processNextOperation()
94108
},
95109

96110
confirmOperation: (operationId) => {
@@ -114,10 +128,13 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
114128
remainingOps: newOperations.length,
115129
})
116130

117-
set({ operations: newOperations })
131+
set({ operations: newOperations, isProcessing: false })
132+
133+
// Process next operation in queue
134+
get().processNextOperation()
118135
},
119136

120-
failOperation: (operationId: string, emitFunction: (operation: QueuedOperation) => void) => {
137+
failOperation: (operationId: string) => {
121138
const state = get()
122139
const operation = state.operations.find((op) => op.id === operationId)
123140
if (!operation) {
@@ -140,42 +157,23 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
140157
retryCount: newRetryCount,
141158
})
142159

160+
// Update retry count and mark as pending for retry
161+
set((state) => ({
162+
operations: state.operations.map((op) =>
163+
op.id === operationId
164+
? { ...op, retryCount: newRetryCount, status: 'pending' as const }
165+
: op
166+
),
167+
isProcessing: false, // Allow processing to continue
168+
}))
169+
170+
// Schedule retry
143171
const timeout = setTimeout(() => {
144-
if (operation.workflowId !== currentWorkflowId) {
145-
logger.warn('Cancelling retry - workflow changed', {
146-
operationId,
147-
operationWorkflow: operation.workflowId,
148-
currentWorkflow: currentWorkflowId,
149-
})
150-
retryTimeouts.delete(operationId)
151-
set((state) => ({
152-
operations: state.operations.filter((op) => op.id !== operationId),
153-
}))
154-
return
155-
}
156-
157-
emitFunction(operation)
158172
retryTimeouts.delete(operationId)
159-
160-
// Create new operation timeout for this retry attempt
161-
const newTimeoutId = setTimeout(() => {
162-
logger.warn('Retry operation timeout - no server response after 5 seconds', {
163-
operationId,
164-
})
165-
operationTimeouts.delete(operationId)
166-
get().handleOperationTimeout(operationId)
167-
}, 5000)
168-
169-
operationTimeouts.set(operationId, newTimeoutId)
173+
get().processNextOperation()
170174
}, delay)
171175

172176
retryTimeouts.set(operationId, timeout)
173-
174-
set((state) => ({
175-
operations: state.operations.map((op) =>
176-
op.id === operationId ? { ...op, retryCount: newRetryCount } : op
177-
),
178-
}))
179177
} else {
180178
logger.error('Operation failed after max retries, triggering offline mode', { operationId })
181179
get().triggerOfflineMode()
@@ -194,21 +192,74 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
194192
operationId,
195193
})
196194

197-
const retryFunction = (operation: any) => {
198-
const { operation: op, target, payload } = operation.operation
199-
200-
if (op === 'subblock-update' && target === 'subblock') {
201-
if (emitSubblockUpdate) {
202-
emitSubblockUpdate(payload.blockId, payload.subblockId, payload.value, operation.id)
203-
}
204-
} else {
205-
if (emitWorkflowOperation) {
206-
emitWorkflowOperation(op, target, payload, operation.id)
207-
}
195+
get().failOperation(operationId)
196+
},
197+
198+
processNextOperation: () => {
199+
const state = get()
200+
201+
// Don't process if already processing
202+
if (state.isProcessing) {
203+
return
204+
}
205+
206+
// Find the first pending operation (FIFO - first in, first out)
207+
const nextOperation = state.operations.find((op) => op.status === 'pending')
208+
if (!nextOperation) {
209+
return // No pending operations
210+
}
211+
212+
// Check workflow context
213+
if (nextOperation.workflowId !== currentWorkflowId) {
214+
logger.warn('Cancelling operation - workflow changed', {
215+
operationId: nextOperation.id,
216+
operationWorkflow: nextOperation.workflowId,
217+
currentWorkflow: currentWorkflowId,
218+
})
219+
set((state) => ({
220+
operations: state.operations.filter((op) => op.id !== nextOperation.id),
221+
}))
222+
// Try next operation
223+
get().processNextOperation()
224+
return
225+
}
226+
227+
// Mark as processing
228+
set((state) => ({
229+
operations: state.operations.map((op) =>
230+
op.id === nextOperation.id ? { ...op, status: 'processing' as const } : op
231+
),
232+
isProcessing: true,
233+
}))
234+
235+
logger.debug('Processing operation sequentially', {
236+
operationId: nextOperation.id,
237+
operation: nextOperation.operation,
238+
retryCount: nextOperation.retryCount,
239+
})
240+
241+
// Emit the operation
242+
const { operation: op, target, payload } = nextOperation.operation
243+
if (op === 'subblock-update' && target === 'subblock') {
244+
if (emitSubblockUpdate) {
245+
emitSubblockUpdate(payload.blockId, payload.subblockId, payload.value, nextOperation.id)
246+
}
247+
} else {
248+
if (emitWorkflowOperation) {
249+
emitWorkflowOperation(op, target, payload, nextOperation.id)
208250
}
209251
}
210252

211-
get().failOperation(operationId, retryFunction)
253+
// Create operation timeout
254+
const timeoutId = setTimeout(() => {
255+
logger.warn('Operation timeout - no server response after 5 seconds', {
256+
operationId: nextOperation.id,
257+
})
258+
operationTimeouts.delete(nextOperation.id)
259+
get().handleOperationTimeout(nextOperation.id)
260+
}, 5000)
261+
262+
operationTimeouts.set(nextOperation.id, timeoutId)
212263
},
213264

214265
handleSocketReconnection: () => {
@@ -275,6 +326,7 @@ export function useOperationQueue() {
275326
confirmOperation: store.confirmOperation,
276327
failOperation: store.failOperation,
277328
handleSocketReconnection: store.handleSocketReconnection,
329+
processNextOperation: store.processNextOperation,
278330
triggerOfflineMode: store.triggerOfflineMode,
279331
clearError: store.clearError,
280332
}

0 commit comments

Comments
 (0)