Skip to content

Commit a63d04e

Browse files
committed
feat(event_manager): collapse repeated parts in queue
Collapse repeated events in the queue and only emit the latest event but in the position of the earlier event. Keeping the earlier position is required to not get out of order parts
1 parent 64dbaeb commit a63d04e

File tree

1 file changed

+39
-6
lines changed

1 file changed

+39
-6
lines changed

lua/opencode/event_manager.lua

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
local state = require('opencode.state')
2+
local config = require('opencode.config')
23
local ThrottlingEmitter = require('opencode.throttling_emitter')
34

45
--- @class EventInstallationUpdated
@@ -136,7 +137,6 @@ function EventManager.new()
136137
captured_events = {},
137138
}, EventManager)
138139

139-
local config = require('opencode.config')
140140
local throttle_ms = config.ui.output.rendering.event_throttle_ms
141141
self.throttling_emitter = ThrottlingEmitter.new(function(events)
142142
self:_on_drained_events(events)
@@ -218,16 +218,49 @@ function EventManager:unsubscribe(event_name, callback)
218218
end
219219
end
220220

221-
---Callaback from ThrottlingEmitter when the events are now
222-
---ready to be processed
221+
---Callback from ThrottlingEmitter when the events are now ready to be processed.
222+
---Collapses parts that are duplicated, making sure to replace earlier parts with later
223+
---ones (but keeping the earlier position)
223224
---@param events any
224225
function EventManager:_on_drained_events(events)
225226
self:emit('custom.emit_events.started', {})
226227

227-
-- TODO: try collapsing events here
228+
local collapsed_events = {}
229+
local part_update_indices = {}
228230

229-
for _, event in ipairs(events) do
230-
self:emit(event.type, event.properties)
231+
for i, event in ipairs(events) do
232+
if event.type == 'message.part.updated' and event.properties.part then
233+
local part_id = event.properties.part.id
234+
if part_update_indices[part_id] then
235+
-- vim.notify('collapsing: ' .. part_id .. ' text: ' .. vim.inspect(event.properties.part.text))
236+
-- put this event in the earlier slot
237+
238+
-- move this newer part to the position of the original part
239+
collapsed_events[part_update_indices[part_id]] = event
240+
241+
-- clear out this parts now unneeded position
242+
collapsed_events[i] = nil
243+
else
244+
part_update_indices[part_id] = i
245+
collapsed_events[i] = event
246+
end
247+
else
248+
collapsed_events[i] = event
249+
end
250+
end
251+
252+
local actually_emitted = 0
253+
254+
for i = 1, #events do
255+
local event = collapsed_events[i]
256+
if event then
257+
actually_emitted = actually_emitted + 1
258+
self:emit(event.type, event.properties)
259+
end
260+
end
261+
262+
if config.debug.enabled then
263+
vim.notify('Drained ' .. #events .. ', actually emitted: ' .. actually_emitted)
231264
end
232265

233266
self:emit('custom.emit_events.finished', {})

0 commit comments

Comments
 (0)