Skip to content

Commit f16d759

Browse files
Vikhyath MondretiVikhyath Mondreti
authored andcommitted
simplify
1 parent db4ad80 commit f16d759

File tree

2 files changed

+49
-85
lines changed

2 files changed

+49
-85
lines changed

apps/sim/hooks/use-collaborative-workflow.ts

Lines changed: 26 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,8 @@ export function useCollaborativeWorkflow() {
4444
const lastPositionTimestamps = useRef<Map<string, number>>(new Map())
4545

4646
// Operation queue
47-
const {
48-
queue,
49-
hasOperationError,
50-
addToQueue,
51-
confirmOperation,
52-
failOperation,
53-
handleSocketReconnection,
54-
} = useOperationQueue()
47+
const { queue, hasOperationError, addToQueue, confirmOperation, failOperation } =
48+
useOperationQueue()
5549

5650
// Clear position timestamps when switching workflows
5751
// Note: Workflow joining is now handled automatically by socket connect event based on URL
@@ -74,30 +68,6 @@ export function useCollaborativeWorkflow() {
7468
registerEmitFunctions(emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId)
7569
}, [emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId])
7670

77-
// Log connection status changes and handle reconnection
78-
useEffect(() => {
79-
logger.info('Collaborative workflow connection status changed', {
80-
isConnected,
81-
currentWorkflowId,
82-
activeWorkflowId,
83-
presenceUsers: presenceUsers.length,
84-
})
85-
86-
// Clear operation queue when socket reconnects AND has joined workflow
87-
// We need both isConnected=true AND currentWorkflowId to match activeWorkflowId
88-
// This ensures the socket has actually joined the workflow room before we allow retries
89-
if (isConnected && currentWorkflowId && currentWorkflowId === activeWorkflowId) {
90-
logger.info('Socket reconnected and joined workflow - clearing operation queue')
91-
handleSocketReconnection()
92-
}
93-
}, [
94-
isConnected,
95-
currentWorkflowId,
96-
activeWorkflowId,
97-
presenceUsers.length,
98-
handleSocketReconnection,
99-
])
100-
10171
useEffect(() => {
10272
const handleWorkflowOperation = (data: any) => {
10373
const { operation, target, payload, userId } = data
@@ -416,10 +386,8 @@ export function useCollaborativeWorkflow() {
416386
})
417387

418388
localAction()
419-
420-
emitWorkflowOperation(operation, target, payload, operationId)
421389
},
422-
[addToQueue, emitWorkflowOperation, session?.user?.id]
390+
[addToQueue, session?.user?.id]
423391
)
424392

425393
const executeQueuedDebouncedOperation = useCallback(
@@ -496,8 +464,6 @@ export function useCollaborativeWorkflow() {
496464
workflowStore.addEdge(autoConnectEdge)
497465
}
498466

499-
// Emit to server with operation ID for tracking
500-
emitWorkflowOperation('add', 'block', completeBlockData, operationId)
501467
return
502468
}
503469

@@ -562,9 +528,6 @@ export function useCollaborativeWorkflow() {
562528
if (autoConnectEdge) {
563529
workflowStore.addEdge(autoConnectEdge)
564530
}
565-
566-
// Emit to server with operation ID
567-
emitWorkflowOperation('add', 'block', completeBlockData, operationId)
568531
},
569532
[workflowStore, emitWorkflowOperation, addToQueue, session?.user?.id]
570533
)
@@ -594,17 +557,37 @@ export function useCollaborativeWorkflow() {
594557
const globalWindow = window as any
595558
const pendingUpdates = globalWindow.__pendingSubblockUpdates
596559
if (pendingUpdates && Array.isArray(pendingUpdates)) {
597-
// Emit collaborative subblock updates for each changed subblock
560+
// Queue each subblock update individually
598561
for (const update of pendingUpdates) {
599562
const { blockId, subBlockId, newValue } = update
600-
emitSubblockUpdate(blockId, subBlockId, newValue)
563+
const operationId = crypto.randomUUID()
564+
565+
addToQueue({
566+
id: operationId,
567+
operation: {
568+
operation: 'subblock-update',
569+
target: 'subblock',
570+
payload: { blockId, subblockId: subBlockId, value: newValue },
571+
},
572+
workflowId: activeWorkflowId || '',
573+
userId: session?.user?.id || 'unknown',
574+
})
575+
576+
subBlockStore.setValue(blockId, subBlockId, newValue)
601577
}
602578
// Clear the pending updates
603579
globalWindow.__pendingSubblockUpdates = undefined
604580
}
605581
})
606582
},
607-
[executeQueuedOperation, workflowStore, emitSubblockUpdate]
583+
[
584+
executeQueuedOperation,
585+
workflowStore,
586+
addToQueue,
587+
subBlockStore,
588+
activeWorkflowId,
589+
session?.user?.id,
590+
]
608591
)
609592

610593
const collaborativeToggleBlockEnabled = useCallback(
@@ -802,9 +785,6 @@ export function useCollaborativeWorkflow() {
802785

803786
// Apply locally first (immediate UI feedback)
804787
subBlockStore.setValue(blockId, subblockId, value)
805-
806-
// Emit to server with operation ID for tracking
807-
emitSubblockUpdate(blockId, subblockId, value, operationId)
808788
},
809789
[
810790
subBlockStore,

apps/sim/stores/operation-queue/store.ts

Lines changed: 23 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ interface OperationQueueState {
2626
confirmOperation: (operationId: string) => void
2727
failOperation: (operationId: string) => void
2828
handleOperationTimeout: (operationId: string) => void
29-
handleSocketReconnection: () => void
3029
processNextOperation: () => void
3130

3231
triggerOfflineMode: () => void
@@ -65,24 +64,43 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
6564
// Check for duplicate operation ID
6665
const existingOp = state.operations.find((op) => op.id === operation.id)
6766
if (existingOp) {
68-
logger.debug('Skipping duplicate operation', { operationId: operation.id })
67+
logger.debug('Skipping duplicate operation ID', {
68+
operationId: operation.id,
69+
existingStatus: existingOp.status,
70+
})
6971
return
7072
}
7173

72-
// Check for duplicate operation content (same operation on same target with same payload)
74+
// Enhanced duplicate content check - especially important for block operations
7375
const duplicateContent = state.operations.find(
7476
(op) =>
7577
op.operation.operation === operation.operation.operation &&
7678
op.operation.target === operation.operation.target &&
77-
JSON.stringify(op.operation.payload) === JSON.stringify(operation.operation.payload) &&
78-
op.workflowId === operation.workflowId
79+
op.workflowId === operation.workflowId &&
80+
// For block operations, check the block ID specifically
81+
((operation.operation.target === 'block' &&
82+
op.operation.payload?.id === operation.operation.payload?.id) ||
83+
// For subblock operations, check blockId and subblockId
84+
(operation.operation.target === 'subblock' &&
85+
op.operation.payload?.blockId === operation.operation.payload?.blockId &&
86+
op.operation.payload?.subblockId === operation.operation.payload?.subblockId) ||
87+
// For other operations, fall back to full payload comparison
88+
(operation.operation.target !== 'block' &&
89+
operation.operation.target !== 'subblock' &&
90+
JSON.stringify(op.operation.payload) === JSON.stringify(operation.operation.payload)))
7991
)
92+
8093
if (duplicateContent) {
8194
logger.debug('Skipping duplicate operation content', {
8295
operationId: operation.id,
8396
existingOperationId: duplicateContent.id,
8497
operation: operation.operation.operation,
8598
target: operation.operation.target,
99+
existingStatus: duplicateContent.status,
100+
payload:
101+
operation.operation.target === 'block'
102+
? { id: operation.operation.payload?.id }
103+
: operation.operation.payload,
86104
})
87105
return
88106
}
@@ -262,39 +280,6 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
262280
operationTimeouts.set(nextOperation.id, timeoutId)
263281
},
264282

265-
handleSocketReconnection: () => {
266-
// Clear all timeouts since they're for the old socket
267-
retryTimeouts.forEach((timeout) => clearTimeout(timeout))
268-
retryTimeouts.clear()
269-
operationTimeouts.forEach((timeout) => clearTimeout(timeout))
270-
operationTimeouts.clear()
271-
272-
const state = get()
273-
const resetOperations = state.operations.map((op) => ({
274-
...op,
275-
retryCount: 0, // Reset retry count for fresh attempts
276-
status: 'pending' as const,
277-
}))
278-
279-
set({
280-
operations: resetOperations,
281-
isProcessing: false,
282-
hasOperationError: false,
283-
})
284-
285-
resetOperations.forEach((operation) => {
286-
const timeoutId = setTimeout(() => {
287-
logger.warn('Operation timeout after reconnection - no server response after 5 seconds', {
288-
operationId: operation.id,
289-
})
290-
operationTimeouts.delete(operation.id)
291-
get().handleOperationTimeout(operation.id)
292-
}, 5000)
293-
294-
operationTimeouts.set(operation.id, timeoutId)
295-
})
296-
},
297-
298283
triggerOfflineMode: () => {
299284
logger.error('Operation failed after retries - triggering offline mode')
300285

@@ -325,7 +310,6 @@ export function useOperationQueue() {
325310
addToQueue: store.addToQueue,
326311
confirmOperation: store.confirmOperation,
327312
failOperation: store.failOperation,
328-
handleSocketReconnection: store.handleSocketReconnection,
329313
processNextOperation: store.processNextOperation,
330314
triggerOfflineMode: store.triggerOfflineMode,
331315
clearError: store.clearError,

0 commit comments

Comments
 (0)