Skip to content

Commit db22e26

Browse files
Merge pull request #671 from simstudioai/fix/queuing
improvement(queuing): queuing with retries for sockets ops
2 parents 36eb04d + f8000a7 commit db22e26

File tree

12 files changed

+758
-344
lines changed

12 files changed

+758
-344
lines changed

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/control-bar/components/user-avatar-stack/components/connection-status/connection-status.tsx

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,21 @@ import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/w/compo
77

88
interface ConnectionStatusProps {
99
isConnected: boolean
10+
hasOperationError?: boolean
1011
}
1112

12-
export function ConnectionStatus({ isConnected }: ConnectionStatusProps) {
13+
export function ConnectionStatus({ isConnected, hasOperationError }: ConnectionStatusProps) {
1314
const userPermissions = useUserPermissionsContext()
1415

1516
const handleRefresh = () => {
1617
window.location.reload()
1718
}
1819

19-
// Don't render anything if not in offline mode
20-
if (!userPermissions.isOfflineMode) {
20+
// Show error if either offline mode OR operation error
21+
const shouldShowError = userPermissions.isOfflineMode || hasOperationError
22+
23+
// Don't render anything if no errors
24+
if (!shouldShowError) {
2125
return null
2226
}
2327

@@ -32,10 +36,14 @@ export function ConnectionStatus({ isConnected }: ConnectionStatusProps) {
3236
</div>
3337
<div className='flex flex-col'>
3438
<span className='font-medium text-xs leading-tight'>
35-
{isConnected ? 'Reconnected' : 'Connection lost - please refresh'}
39+
{hasOperationError
40+
? 'Workflow Edit Failed'
41+
: isConnected
42+
? 'Reconnected'
43+
: 'Connection lost - please refresh'}
3644
</span>
3745
<span className='text-red-600 text-xs leading-tight'>
38-
{isConnected ? 'Refresh to continue editing' : 'Read-only mode active'}
46+
Please refresh to continue editing
3947
</span>
4048
</div>
4149
</div>

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/control-bar/components/user-avatar-stack/user-avatar-stack.tsx

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use client'
22

33
import { useMemo } from 'react'
4+
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
45
import { usePresence } from '../../../../hooks/use-presence'
56
import { ConnectionStatus } from './components/connection-status/connection-status'
67
import { UserAvatar } from './components/user-avatar/user-avatar'
@@ -29,6 +30,9 @@ export function UserAvatarStack({
2930
const { users: presenceUsers, isConnected } = usePresence()
3031
const users = propUsers || presenceUsers
3132

33+
// Get operation error state from collaborative workflow
34+
const { hasOperationError } = useCollaborativeWorkflow()
35+
3236
// Memoize the processed users to avoid unnecessary re-renders
3337
const { visibleUsers, overflowCount } = useMemo(() => {
3438
if (users.length === 0) {
@@ -53,8 +57,8 @@ export function UserAvatarStack({
5357

5458
return (
5559
<div className={`flex items-center gap-3 ${className}`}>
56-
{/* Connection status - always check, shows when offline */}
57-
<ConnectionStatus isConnected={isConnected} />
60+
{/* Connection status - always check, shows when offline or operation errors */}
61+
<ConnectionStatus isConnected={isConnected} hasOperationError={hasOperationError} />
5862

5963
{/* Only show avatar stack when there are multiple users (>1) */}
6064
{users.length > 1 && (

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/loop-node/components/loop-badges.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ export function LoopBadges({ nodeId, data }: LoopBadgesProps) {
260260
<div className='relative min-h-[80px] rounded-md border border-input bg-background px-3 pt-2 pb-3 font-mono text-sm'>
261261
{editorValue === '' && (
262262
<div className='pointer-events-none absolute top-[8.5px] left-3 select-none text-muted-foreground/50'>
263-
["item1", "item2", "item3"]
263+
['item1', 'item2', 'item3']
264264
</div>
265265
)}
266266
<Editor

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/hooks/use-sub-block-value.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,8 @@ export function useSubBlockValue<T = any>(
269269
if (!isEqual(valueRef.current, newValue)) {
270270
valueRef.current = newValue
271271

272-
// Always update local store immediately for UI responsiveness
272+
// Update local store immediately for UI responsiveness
273+
// The collaborative function will also update it, but that's okay for idempotency
273274
useSubBlockStore.setState((state) => ({
274275
workflowValues: {
275276
...state.workflowValues,

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import { SkeletonLoading } from '@/app/workspace/[workspaceId]/w/[workflowId]/co
2222
import { Toolbar } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/toolbar/toolbar'
2323
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/w/components/providers/workspace-permissions-provider'
2424
import { getBlock } from '@/blocks'
25-
import { useSocket } from '@/contexts/socket-context'
2625
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
2726
import { useWorkspacePermissions } from '@/hooks/use-workspace-permissions'
2827
import { useExecutionStore } from '@/stores/execution/store'
@@ -120,10 +119,9 @@ const WorkflowContent = React.memo(() => {
120119
collaborativeRemoveEdge: removeEdge,
121120
collaborativeUpdateBlockPosition,
122121
collaborativeUpdateParentId: updateParentId,
123-
isConnected,
124-
currentWorkflowId,
122+
collaborativeSetSubblockValue,
125123
} = useCollaborativeWorkflow()
126-
const { emitSubblockUpdate } = useSocket()
124+
127125
const { markAllAsRead } = useNotificationStore()
128126
const { resetLoaded: resetVariablesLoaded } = useVariablesStore()
129127

@@ -1484,11 +1482,9 @@ const WorkflowContent = React.memo(() => {
14841482
const handleSubBlockValueUpdate = (event: CustomEvent) => {
14851483
const { blockId, subBlockId, value } = event.detail
14861484
if (blockId && subBlockId) {
1487-
// Only emit the socket update, don't update the store again
1488-
// The store was already updated in the setValue function
1489-
if (isConnected && currentWorkflowId && activeWorkflowId === currentWorkflowId) {
1490-
emitSubblockUpdate(blockId, subBlockId, value)
1491-
}
1485+
// Use collaborative function to go through queue system
1486+
// This ensures 5-second timeout and error detection work
1487+
collaborativeSetSubblockValue(blockId, subBlockId, value)
14921488
}
14931489
}
14941490

@@ -1500,7 +1496,7 @@ const WorkflowContent = React.memo(() => {
15001496
handleSubBlockValueUpdate as EventListener
15011497
)
15021498
}
1503-
}, [emitSubblockUpdate, isConnected, currentWorkflowId, activeWorkflowId])
1499+
}, [collaborativeSetSubblockValue])
15041500

15051501
// Show skeleton UI while loading, then smoothly transition to real content
15061502
const showSkeletonUI = !isWorkflowReady

apps/sim/app/workspace/[workspaceId]/w/components/providers/workspace-permissions-provider.tsx

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ import type React from 'react'
44
import { createContext, useContext, useEffect, useMemo, useState } from 'react'
55
import { useParams } from 'next/navigation'
66
import { createLogger } from '@/lib/logs/console-logger'
7+
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
78
import { useUserPermissions, type WorkspaceUserPermissions } from '@/hooks/use-user-permissions'
89
import {
910
useWorkspacePermissions,
1011
type WorkspacePermissions,
1112
} from '@/hooks/use-workspace-permissions'
12-
import { usePresence } from '../../[workflowId]/hooks/use-presence'
1313

1414
const logger = createLogger('WorkspacePermissionsProvider')
1515

@@ -57,7 +57,16 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
5757

5858
// Manage offline mode state locally
5959
const [isOfflineMode, setIsOfflineMode] = useState(false)
60-
const [hasBeenConnected, setHasBeenConnected] = useState(false)
60+
61+
// Get operation error state from collaborative workflow
62+
const { hasOperationError } = useCollaborativeWorkflow()
63+
64+
// Set offline mode when there are operation errors
65+
useEffect(() => {
66+
if (hasOperationError) {
67+
setIsOfflineMode(true)
68+
}
69+
}, [hasOperationError])
6170

6271
// Fetch workspace permissions and loading state
6372
const {
@@ -74,26 +83,8 @@ export function WorkspacePermissionsProvider({ children }: WorkspacePermissionsP
7483
permissionsError
7584
)
7685

77-
// Get connection status and update offline mode accordingly
78-
const { isConnected } = usePresence()
79-
80-
useEffect(() => {
81-
if (isConnected) {
82-
// Mark that we've been connected at least once
83-
setHasBeenConnected(true)
84-
// On initial connection, allow going online
85-
if (!hasBeenConnected) {
86-
setIsOfflineMode(false)
87-
}
88-
// If we were previously connected and this is a reconnection, stay offline (user must refresh)
89-
} else if (hasBeenConnected) {
90-
const timeoutId = setTimeout(() => {
91-
setIsOfflineMode(true)
92-
}, 6000)
93-
return () => clearTimeout(timeoutId)
94-
}
95-
// If not connected and never been connected, stay in initial state (not offline mode)
96-
}, [isConnected, hasBeenConnected])
86+
// Note: Connection-based error detection removed - only rely on operation timeouts
87+
// The 5-second operation timeout system will handle all error cases
9788

9889
// Create connection-aware permissions that override user permissions when offline
9990
const userPermissions = useMemo((): WorkspaceUserPermissions & { isOfflineMode?: boolean } => {

apps/sim/contexts/socket-context.tsx

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,18 @@ interface SocketContextType {
3838
presenceUsers: PresenceUser[]
3939
joinWorkflow: (workflowId: string) => void
4040
leaveWorkflow: () => void
41-
emitWorkflowOperation: (operation: string, target: string, payload: any) => void
42-
emitSubblockUpdate: (blockId: string, subblockId: string, value: any) => void
41+
emitWorkflowOperation: (
42+
operation: string,
43+
target: string,
44+
payload: any,
45+
operationId?: string
46+
) => void
47+
emitSubblockUpdate: (
48+
blockId: string,
49+
subblockId: string,
50+
value: any,
51+
operationId?: string
52+
) => void
4353
emitCursorUpdate: (cursor: { x: number; y: number }) => void
4454
emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void
4555
// Event handlers for receiving real-time updates
@@ -51,6 +61,8 @@ interface SocketContextType {
5161
onUserLeft: (handler: (data: any) => void) => void
5262
onWorkflowDeleted: (handler: (data: any) => void) => void
5363
onWorkflowReverted: (handler: (data: any) => void) => void
64+
onOperationConfirmed: (handler: (data: any) => void) => void
65+
onOperationFailed: (handler: (data: any) => void) => void
5466
}
5567

5668
const SocketContext = createContext<SocketContextType>({
@@ -73,6 +85,8 @@ const SocketContext = createContext<SocketContextType>({
7385
onUserLeft: () => {},
7486
onWorkflowDeleted: () => {},
7587
onWorkflowReverted: () => {},
88+
onOperationConfirmed: () => {},
89+
onOperationFailed: () => {},
7690
})
7791

7892
export const useSocket = () => useContext(SocketContext)
@@ -103,6 +117,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
103117
userLeft?: (data: any) => void
104118
workflowDeleted?: (data: any) => void
105119
workflowReverted?: (data: any) => void
120+
operationConfirmed?: (data: any) => void
121+
operationFailed?: (data: any) => void
106122
}>({})
107123

108124
// Helper function to generate a fresh socket token
@@ -290,6 +306,18 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
290306
eventHandlers.current.workflowReverted?.(data)
291307
})
292308

309+
// Operation confirmation events
310+
socketInstance.on('operation-confirmed', (data) => {
311+
logger.debug('Operation confirmed', { operationId: data.operationId })
312+
eventHandlers.current.operationConfirmed?.(data)
313+
})
314+
315+
// Operation failure events
316+
socketInstance.on('operation-failed', (data) => {
317+
logger.warn('Operation failed', { operationId: data.operationId, error: data.error })
318+
eventHandlers.current.operationFailed?.(data)
319+
})
320+
293321
// Cursor update events
294322
socketInstance.on('cursor-update', (data) => {
295323
setPresenceUsers((prev) =>
@@ -444,8 +472,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
444472

445473
// Emit workflow operations (blocks, edges, subflows)
446474
const emitWorkflowOperation = useCallback(
447-
(operation: string, target: string, payload: any) => {
448-
if (!socket || !currentWorkflowId) return
475+
(operation: string, target: string, payload: any, operationId?: string) => {
476+
if (!socket || !currentWorkflowId) {
477+
return
478+
}
449479

450480
// Apply light throttling only to position updates for smooth collaborative experience
451481
const isPositionUpdate = operation === 'update-position' && target === 'block'
@@ -459,6 +489,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
459489
target,
460490
payload,
461491
timestamp: Date.now(),
492+
operationId, // Include operation ID for queue tracking
462493
})
463494

464495
// Check if we already have a pending timeout for this block
@@ -482,6 +513,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
482513
target,
483514
payload,
484515
timestamp: Date.now(),
516+
operationId, // Include operation ID for queue tracking
485517
})
486518
}
487519
},
@@ -490,14 +522,15 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
490522

491523
// Emit subblock value updates
492524
const emitSubblockUpdate = useCallback(
493-
(blockId: string, subblockId: string, value: any) => {
525+
(blockId: string, subblockId: string, value: any, operationId?: string) => {
494526
// Only emit if socket is connected and we're in a valid workflow room
495527
if (socket && currentWorkflowId) {
496528
socket.emit('subblock-update', {
497529
blockId,
498530
subblockId,
499531
value,
500532
timestamp: Date.now(),
533+
operationId, // Include operation ID for queue tracking
501534
})
502535
} else {
503536
logger.warn('Cannot emit subblock update: no socket connection or workflow room', {
@@ -570,6 +603,14 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
570603
eventHandlers.current.workflowReverted = handler
571604
}, [])
572605

606+
const onOperationConfirmed = useCallback((handler: (data: any) => void) => {
607+
eventHandlers.current.operationConfirmed = handler
608+
}, [])
609+
610+
const onOperationFailed = useCallback((handler: (data: any) => void) => {
611+
eventHandlers.current.operationFailed = handler
612+
}, [])
613+
573614
return (
574615
<SocketContext.Provider
575616
value={{
@@ -592,6 +633,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
592633
onUserLeft,
593634
onWorkflowDeleted,
594635
onWorkflowReverted,
636+
onOperationConfirmed,
637+
onOperationFailed,
595638
}}
596639
>
597640
{children}

0 commit comments

Comments
 (0)