Skip to content

Commit 1e14743

Browse files
fix(sockets): move debounce to server side (#1265)
* fix(sockets): move debounce to server side * remove comments / unused onBlur
1 parent 8510312 commit 1e14743

File tree

9 files changed

+378
-493
lines changed

9 files changed

+378
-493
lines changed

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/long-input.tsx

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import { useSubBlockValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/c
1414
import { useWand } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-wand'
1515
import type { SubBlockConfig } from '@/blocks/types'
1616
import { useTagSelection } from '@/hooks/use-tag-selection'
17-
import { useOperationQueueStore } from '@/stores/operation-queue/store'
1817

1918
const logger = createLogger('LongInput')
2019

@@ -382,11 +381,6 @@ export function LongInput({
382381
onScroll={handleScroll}
383382
onWheel={handleWheel}
384383
onKeyDown={handleKeyDown}
385-
onBlur={() => {
386-
try {
387-
useOperationQueueStore.getState().flushDebouncedForBlock(blockId)
388-
} catch {}
389-
}}
390384
onFocus={() => {
391385
setShowEnvVars(false)
392386
setShowTags(false)

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/short-input.tsx

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import { useSubBlockValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/c
1414
import { useWand } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-wand'
1515
import type { SubBlockConfig } from '@/blocks/types'
1616
import { useTagSelection } from '@/hooks/use-tag-selection'
17-
import { useOperationQueueStore } from '@/stores/operation-queue/store'
1817

1918
const logger = createLogger('ShortInput')
2019

@@ -396,9 +395,6 @@ export function ShortInput({
396395
onBlur={() => {
397396
setIsFocused(false)
398397
setShowEnvVars(false)
399-
try {
400-
useOperationQueueStore.getState().flushDebouncedForBlock(blockId)
401-
} catch {}
402398
}}
403399
onDrop={handleDrop}
404400
onDragOver={handleDragOver}

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/hooks/use-sub-block-value.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ interface UseSubBlockValueOptions {
1818
/**
1919
* Custom hook to get and set values for a sub-block in a workflow.
2020
* Handles complex object values properly by using deep equality comparison.
21-
* Includes automatic debouncing and explicit streaming mode for AI generation.
21+
* Supports explicit streaming mode for AI generation.
2222
*
2323
* @param blockId The ID of the block containing the sub-block
2424
* @param subBlockId The ID of the sub-block
@@ -181,7 +181,7 @@ export function useSubBlockValue<T = any>(
181181
}
182182
}
183183

184-
// Emit immediately - let the operation queue handle debouncing and deduplication
184+
// Emit immediately; the client queue coalesces same-key ops and the server debounces
185185
emitValue(valueCopy)
186186

187187
if (triggerWorkflowUpdate) {

apps/sim/contexts/socket-context.tsx

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -514,16 +514,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
514514
`URL workflow changed from ${currentWorkflowId} to ${urlWorkflowId}, switching rooms`
515515
)
516516

517-
try {
518-
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
519-
// Flush debounced updates for the old workflow before switching rooms
520-
if (currentWorkflowId) {
521-
useOperationQueueStore.getState().flushDebouncedForWorkflow(currentWorkflowId)
522-
} else {
523-
useOperationQueueStore.getState().flushAllDebounced()
524-
}
525-
} catch {}
526-
527517
// Leave current workflow first if we're in one
528518
if (currentWorkflowId) {
529519
logger.info(`Leaving current workflow ${currentWorkflowId} before joining ${urlWorkflowId}`)
@@ -583,7 +573,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
583573
logger.info(`Leaving workflow: ${currentWorkflowId}`)
584574
try {
585575
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
586-
useOperationQueueStore.getState().flushDebouncedForWorkflow(currentWorkflowId)
587576
useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId)
588577
} catch {}
589578
socket.emit('leave-workflow')

apps/sim/socket-server/handlers/subblocks.ts

Lines changed: 152 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,16 @@ import type { RoomManager } from '@/socket-server/rooms/manager'
88

99
const logger = createLogger('SubblocksHandlers')
1010

11+
type PendingSubblock = {
12+
latest: { blockId: string; subblockId: string; value: any; timestamp: number }
13+
timeout: NodeJS.Timeout
14+
// Map operationId -> socketId to emit confirmations/failures to correct clients
15+
opToSocket: Map<string, string>
16+
}
17+
18+
// Keyed by `${workflowId}:${blockId}:${subblockId}`
19+
const pendingSubblockUpdates = new Map<string, PendingSubblock>()
20+
1121
export function setupSubblocksHandlers(
1222
socket: AuthenticatedSocket,
1323
deps: HandlerDependencies | RoomManager
@@ -46,106 +56,44 @@ export function setupSubblocksHandlers(
4656
userPresence.lastActivity = Date.now()
4757
}
4858

49-
// First, verify that the workflow still exists in the database
50-
const workflowExists = await db
51-
.select({ id: workflow.id })
52-
.from(workflow)
53-
.where(eq(workflow.id, workflowId))
54-
.limit(1)
55-
56-
if (workflowExists.length === 0) {
57-
logger.warn(`Ignoring subblock update: workflow ${workflowId} no longer exists`, {
58-
socketId: socket.id,
59-
blockId,
60-
subblockId,
61-
})
62-
roomManager.cleanupUserFromRoom(socket.id, workflowId)
63-
return
64-
}
65-
66-
let updateSuccessful = false
67-
await db.transaction(async (tx) => {
68-
const [block] = await tx
69-
.select({ subBlocks: workflowBlocks.subBlocks })
70-
.from(workflowBlocks)
71-
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
72-
.limit(1)
73-
74-
if (!block) {
75-
// Block was deleted - this is a normal race condition in collaborative editing
76-
logger.debug(
77-
`Ignoring subblock update for deleted block: ${workflowId}/${blockId}.${subblockId}`
78-
)
79-
return
80-
}
81-
82-
const subBlocks = (block.subBlocks as any) || {}
83-
84-
if (!subBlocks[subblockId]) {
85-
// Create new subblock with minimal structure
86-
subBlocks[subblockId] = {
87-
id: subblockId,
88-
type: 'unknown', // Will be corrected by next collaborative update
89-
value: value,
90-
}
91-
} else {
92-
// Preserve existing id and type, only update value
93-
subBlocks[subblockId] = {
94-
...subBlocks[subblockId],
95-
value: value,
59+
// Server-side debounce/coalesce by workflowId+blockId+subblockId
60+
const debouncedKey = `${workflowId}:${blockId}:${subblockId}`
61+
const existing = pendingSubblockUpdates.get(debouncedKey)
62+
if (existing) {
63+
clearTimeout(existing.timeout)
64+
existing.latest = { blockId, subblockId, value, timestamp }
65+
if (operationId) existing.opToSocket.set(operationId, socket.id)
66+
existing.timeout = setTimeout(async () => {
67+
await flushSubblockUpdate(workflowId, existing, roomManager)
68+
pendingSubblockUpdates.delete(debouncedKey)
69+
}, 25)
70+
} else {
71+
const opToSocket = new Map<string, string>()
72+
if (operationId) opToSocket.set(operationId, socket.id)
73+
const timeout = setTimeout(async () => {
74+
const pending = pendingSubblockUpdates.get(debouncedKey)
75+
if (pending) {
76+
await flushSubblockUpdate(workflowId, pending, roomManager)
77+
pendingSubblockUpdates.delete(debouncedKey)
9678
}
97-
}
98-
99-
await tx
100-
.update(workflowBlocks)
101-
.set({
102-
subBlocks: subBlocks,
103-
updatedAt: new Date(),
104-
})
105-
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
106-
107-
updateSuccessful = true
108-
})
109-
110-
// Only broadcast to other clients if the update was successful
111-
if (updateSuccessful) {
112-
socket.to(workflowId).emit('subblock-update', {
113-
blockId,
114-
subblockId,
115-
value,
116-
timestamp,
117-
senderId: socket.id,
118-
userId: session.userId,
119-
})
120-
121-
// Emit confirmation if operationId is provided
122-
if (operationId) {
123-
socket.emit('operation-confirmed', {
124-
operationId,
125-
serverTimestamp: Date.now(),
126-
})
127-
}
128-
129-
logger.debug(`Subblock update in workflow ${workflowId}: ${blockId}.${subblockId}`)
130-
} else if (operationId) {
131-
// Block was deleted - notify client that operation completed (but didn't update anything)
132-
socket.emit('operation-failed', {
133-
operationId,
134-
error: 'Block no longer exists',
135-
retryable: false, // No point retrying for deleted blocks
79+
}, 25)
80+
pendingSubblockUpdates.set(debouncedKey, {
81+
latest: { blockId, subblockId, value, timestamp },
82+
timeout,
83+
opToSocket,
13684
})
13785
}
13886
} catch (error) {
13987
logger.error('Error handling subblock update:', error)
14088

14189
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
14290

143-
// Emit operation-failed for queue-tracked operations
91+
// Best-effort failure for the single operation if provided
14492
if (operationId) {
14593
socket.emit('operation-failed', {
14694
operationId,
14795
error: errorMessage,
148-
retryable: true, // Subblock updates are generally retryable
96+
retryable: true,
14997
})
15098
}
15199

@@ -159,3 +107,119 @@ export function setupSubblocksHandlers(
159107
}
160108
})
161109
}
110+
111+
async function flushSubblockUpdate(
112+
workflowId: string,
113+
pending: PendingSubblock,
114+
roomManager: RoomManager
115+
) {
116+
const { blockId, subblockId, value, timestamp } = pending.latest
117+
try {
118+
// Verify workflow still exists
119+
const workflowExists = await db
120+
.select({ id: workflow.id })
121+
.from(workflow)
122+
.where(eq(workflow.id, workflowId))
123+
.limit(1)
124+
125+
if (workflowExists.length === 0) {
126+
pending.opToSocket.forEach((socketId, opId) => {
127+
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
128+
if (sock) {
129+
sock.emit('operation-failed', {
130+
operationId: opId,
131+
error: 'Workflow not found',
132+
retryable: false,
133+
})
134+
}
135+
})
136+
return
137+
}
138+
139+
let updateSuccessful = false
140+
await db.transaction(async (tx) => {
141+
const [block] = await tx
142+
.select({ subBlocks: workflowBlocks.subBlocks })
143+
.from(workflowBlocks)
144+
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
145+
.limit(1)
146+
147+
if (!block) {
148+
return
149+
}
150+
151+
const subBlocks = (block.subBlocks as any) || {}
152+
if (!subBlocks[subblockId]) {
153+
subBlocks[subblockId] = { id: subblockId, type: 'unknown', value }
154+
} else {
155+
subBlocks[subblockId] = { ...subBlocks[subblockId], value }
156+
}
157+
158+
await tx
159+
.update(workflowBlocks)
160+
.set({ subBlocks, updatedAt: new Date() })
161+
.where(and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId)))
162+
163+
updateSuccessful = true
164+
})
165+
166+
if (updateSuccessful) {
167+
// Broadcast to other clients (exclude senders to avoid overwriting their local state)
168+
const senderSocketIds = new Set(pending.opToSocket.values())
169+
const io = (roomManager as any).io
170+
if (io) {
171+
// Get all sockets in the room
172+
const roomSockets = io.sockets.adapter.rooms.get(workflowId)
173+
if (roomSockets) {
174+
roomSockets.forEach((socketId: string) => {
175+
// Only emit to sockets that didn't send any of the coalesced ops
176+
if (!senderSocketIds.has(socketId)) {
177+
const sock = io.sockets.sockets.get(socketId)
178+
if (sock) {
179+
sock.emit('subblock-update', {
180+
blockId,
181+
subblockId,
182+
value,
183+
timestamp,
184+
})
185+
}
186+
}
187+
})
188+
}
189+
}
190+
191+
// Confirm all coalesced operationIds
192+
pending.opToSocket.forEach((socketId, opId) => {
193+
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
194+
if (sock) {
195+
sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() })
196+
}
197+
})
198+
199+
logger.debug(`Flushed subblock update ${workflowId}: ${blockId}.${subblockId}`)
200+
} else {
201+
pending.opToSocket.forEach((socketId, opId) => {
202+
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
203+
if (sock) {
204+
sock.emit('operation-failed', {
205+
operationId: opId,
206+
error: 'Block no longer exists',
207+
retryable: false,
208+
})
209+
}
210+
})
211+
}
212+
} catch (error) {
213+
logger.error('Error flushing subblock update:', error)
214+
pending.opToSocket.forEach((socketId, opId) => {
215+
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
216+
if (sock) {
217+
sock.emit('operation-failed', {
218+
operationId: opId,
219+
error: error instanceof Error ? error.message : 'Unknown error',
220+
retryable: true,
221+
})
222+
}
223+
})
224+
}
225+
}

0 commit comments

Comments
 (0)