Skip to content

Commit 17e8e91

Browse files
committed
refactor(event_manager): throttle event emission
Add new module, ThrottlingEmitter, that collects events and "drains" them every drain_interval_ms. This allows us to batch events which lets us process a group of them quickly. That, in turn, helps make sure we don't overwhelm neovim (and particularly treesitter) with a ton of events. Before this change, the behavior we'd see is that the neovim ui could get slow or hang for several/many seconds if we emitted too many events. Disabling treesitter seemed to eliminate that behavior but it also caused the buffers to lose their highlighting which wasn't a good user experience.
1 parent ce61b67 commit 17e8e91

File tree

4 files changed

+110
-48
lines changed

4 files changed

+110
-48
lines changed

lua/opencode/event_manager.lua

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
local state = require('opencode.state')
2+
local ThrottlingEmitter = require('opencode.throttling_emitter')
23

34
--- @class EventInstallationUpdated
45
--- @field type "installation.updated"
@@ -113,24 +114,34 @@ local state = require('opencode.state')
113114
--- | "custom.server_ready"
114115
--- | "custom.server_stopped"
115116
--- | "custom.restore_point.created"
117+
--- | "custom.emit_events.started"
118+
--- | "custom.emit_events.finished"
116119

117120
--- @class EventManager
118121
--- @field events table<string, function[]> Event listener registry
119122
--- @field server_subscription table|nil Subscription to server events
120123
--- @field is_started boolean Whether the event manager is started
121124
--- @field captured_events table[] List of captured events for debugging
125+
--- @field throttling_emitter ThrottlingEmitter Throttle instance for batching events
122126
local EventManager = {}
123127
EventManager.__index = EventManager
124128

125129
--- Create a new EventManager instance
126130
--- @return EventManager
127131
function EventManager.new()
128-
return setmetatable({
132+
local self = setmetatable({
129133
events = {},
130134
server_subscription = nil,
131135
is_started = false,
132136
captured_events = {},
133137
}, EventManager)
138+
139+
-- TODO: make drain delay configurable
140+
self.throttling_emitter = ThrottlingEmitter.new(function(events)
141+
self:_on_drained_events(events)
142+
end, 40)
143+
144+
return self
134145
end
135146

136147
--- Subscribe to an event with type-safe callbacks using function overloads
@@ -155,6 +166,8 @@ end
155166
--- @overload fun(self: EventManager, event_name: "custom.server_ready", callback: fun(data: ServerReadyEvent['properties']): nil)
156167
--- @overload fun(self: EventManager, event_name: "custom.server_stopped", callback: fun(data: ServerStoppedEvent['properties']): nil)
157168
--- @overload fun(self: EventManager, event_name: "custom.restore_point.created", callback: fun(data: RestorePointCreatedEvent['properties']): nil)
169+
--- @overload fun(self: EventManager, event_name: "custom.emit_events.started", callback: fun(): nil)
170+
--- @overload fun(self: EventManager, event_name: "custom.emit_events.finished", callback: fun(): nil)
158171
--- @param event_name OpencodeEventName The event name to listen for
159172
--- @param callback function Callback function to execute when event is triggered
160173
function EventManager:subscribe(event_name, callback)
@@ -186,6 +199,8 @@ end
186199
--- @overload fun(self: EventManager, event_name: "custom.server_ready", callback: fun(data: ServerReadyEvent['properties']): nil)
187200
--- @overload fun(self: EventManager, event_name: "custom.server_stopped", callback: fun(data: ServerStoppedEvent['properties']): nil)
188201
--- @overload fun(self: EventManager, event_name: "custom.restore_point.created", callback: fun(data: RestorePointCreatedEvent['properties']): nil)
202+
--- @overload fun(self: EventManager, event_name: "custom.emit_events.started", callback: fun(): nil)
203+
--- @overload fun(self: EventManager, event_name: "custom.emit_events.finished", callback: fun(): nil)
189204
--- @param event_name OpencodeEventName The event name
190205
--- @param callback function The callback function to remove
191206
function EventManager:unsubscribe(event_name, callback)
@@ -202,6 +217,21 @@ function EventManager:unsubscribe(event_name, callback)
202217
end
203218
end
204219

220+
---Callaback from ThrottlingEmitter when the events are now
221+
---ready to be processed
222+
---@param events any
223+
function EventManager:_on_drained_events(events)
224+
self:emit('custom.emit_events.started', {})
225+
226+
-- TODO: try collapsing events here
227+
228+
for _, event in ipairs(events) do
229+
self:emit(event.type, event.properties)
230+
end
231+
232+
self:emit('custom.emit_events.finished', {})
233+
end
234+
205235
--- Emit an event to all subscribers
206236
--- @param event_name OpencodeEventName The event name
207237
--- @param data any Data to pass to event listeners
@@ -217,7 +247,6 @@ function EventManager:emit(event_name, data)
217247
table.insert(self.captured_events, vim.deepcopy(event))
218248
end
219249

220-
-- schedule events to allow for similar pieces of state to be updated
221250
for _, callback in ipairs(listeners) do
222251
pcall(callback, data)
223252
end
@@ -267,6 +296,7 @@ function EventManager:stop()
267296
self.is_started = false
268297
self:_cleanup_server_subscription()
269298

299+
self.throttling_emitter:clear()
270300
self.events = {}
271301
end
272302

@@ -282,9 +312,7 @@ function EventManager:_subscribe_to_server_events(server)
282312
local api_client = state.api_client
283313

284314
local emitter = function(event)
285-
vim.schedule(function()
286-
self:emit(event.type, event.properties)
287-
end)
315+
self.throttling_emitter:enqueue(event)
288316
end
289317

290318
self.server_subscription = api_client:subscribe_to_events(nil, emitter)
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
local M = {}
2+
3+
--- @class ThrottlingEmitter
4+
--- @field queue table[] Queue of pending items to be processed
5+
--- @field drain_scheduled boolean Whether a drain is already scheduled
6+
--- @field process_fn fun(table): nil Function to process the queue of events
7+
--- @field drain_interval_ms number Interval between drains in milliseconds
8+
--- @field enqueue fun(self: ThrottlingEmitter, item: any) Enqueue an item for batch processing
9+
--- @field clear fun(self: ThrottlingEmitter) Clear the queue and cancel any pending drain
10+
local ThrottlingEmitter = {}
11+
ThrottlingEmitter.__index = ThrottlingEmitter
12+
13+
--- Create a new ThrottlingEmitter instance. This emitter collects events and
14+
--- then drains them every drain_interval_ms milliseconds. This is helpful to
15+
--- make sure we're not generating so many events that we don't overwhelm
16+
--- neovim, particularly treesitter.
17+
--- @param process_fn function Function to call for each item
18+
--- @param drain_interval_ms number? Interval between drains in milliseconds (default 10)
19+
--- @return ThrottlingEmitter
20+
function M.new(process_fn, drain_interval_ms)
21+
return setmetatable({
22+
queue = {},
23+
drain_scheduled = false,
24+
process_fn = process_fn,
25+
drain_interval_ms = drain_interval_ms or 40,
26+
}, ThrottlingEmitter)
27+
end
28+
29+
--- Enqueue an item for batch processing
30+
--- @param item any The item to enqueue
31+
function ThrottlingEmitter:enqueue(item)
32+
table.insert(self.queue, item)
33+
34+
if not self.drain_scheduled then
35+
self.drain_scheduled = true
36+
vim.defer_fn(function()
37+
self:_drain()
38+
end, self.drain_interval_ms)
39+
end
40+
end
41+
42+
--- Process all queued items
43+
function ThrottlingEmitter:_drain()
44+
self.drain_scheduled = false
45+
46+
local items_to_process = self.queue
47+
self.queue = {}
48+
49+
self.process_fn(items_to_process)
50+
51+
-- double check that items weren't added while processing
52+
if #self.queue > 0 and not self.drain_scheduled then
53+
self.drain_scheduled = true
54+
vim.defer_fn(function()
55+
self:_drain()
56+
end, self.drain_interval_ms)
57+
end
58+
-- end)
59+
end
60+
61+
--- Clear the queue and cancel any pending drain
62+
function ThrottlingEmitter:clear()
63+
self.queue = {}
64+
self.drain_scheduled = false
65+
end
66+
67+
return M

lua/opencode/ui/output_window.lua

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -105,27 +105,6 @@ function M.set_lines(lines, start_line, end_line)
105105
vim.api.nvim_set_option_value('modifiable', false, { buf = windows.output_buf })
106106
end
107107

108-
--- FIXME: remove debugging code
109-
---Set text in a specific line at character positions
110-
---@param line integer The line number (0-indexed)
111-
---@param start_col integer The starting column (0-indexed)
112-
---@param end_col integer The ending column (0-indexed)
113-
---@param text string The text to insert
114-
function M.set_text(line, start_col, end_col, text)
115-
if not M.mounted() then
116-
return
117-
end
118-
119-
local windows = state.windows
120-
if not windows or not windows.output_buf then
121-
return
122-
end
123-
124-
vim.api.nvim_set_option_value('modifiable', true, { buf = windows.output_buf })
125-
vim.api.nvim_buf_set_text(windows.output_buf, line, start_col, line, end_col, { text })
126-
vim.api.nvim_set_option_value('modifiable', false, { buf = windows.output_buf })
127-
end
128-
129108
---Clear output buf extmarks
130109
---@param start_line? integer Line to start clearing, defaults 0
131110
---@param end_line? integer Line to clear until, defaults to -1

lua/opencode/ui/renderer.lua

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ local M = {}
1010
M._subscriptions = {}
1111
M._prev_line_count = 0
1212
M._render_state = RenderState.new()
13-
M._disable_auto_scroll = false
1413
M._last_part_formatted = { part_id = nil, formatted_data = nil }
1514

1615
local trigger_on_data_rendered = require('opencode.util').debounce(function()
@@ -37,7 +36,6 @@ end, config.ui.output.rendering.markdown_debounce_ms or 250)
3736
function M.reset()
3837
M._prev_line_count = 0
3938
M._render_state:reset()
40-
M._disable_auto_scroll = false
4139
M._last_part_formatted = { part_id = nil, formatted_data = nil }
4240

4341
output_window.clear()
@@ -73,13 +71,14 @@ function M._setup_event_subscriptions(subscribe)
7371
state.event_manager[method](state.event_manager, 'session.compacted', M.on_session_compacted)
7472
state.event_manager[method](state.event_manager, 'session.error', M.on_session_error)
7573
state.event_manager[method](state.event_manager, 'message.updated', M.on_message_updated)
76-
state.event_manager[method](state.event_manager, 'message.part.updated', M.on_part_updated)
7774
state.event_manager[method](state.event_manager, 'message.removed', M.on_message_removed)
75+
state.event_manager[method](state.event_manager, 'message.part.updated', M.on_part_updated)
7876
state.event_manager[method](state.event_manager, 'message.part.removed', M.on_part_removed)
7977
state.event_manager[method](state.event_manager, 'permission.updated', M.on_permission_updated)
8078
state.event_manager[method](state.event_manager, 'permission.replied', M.on_permission_replied)
8179
state.event_manager[method](state.event_manager, 'file.edited', M.on_file_edited)
8280
state.event_manager[method](state.event_manager, 'custom.restore_point.created', M.on_restore_points)
81+
state.event_manager[method](state.event_manager, 'custom.emit_events.finished', M.on_emit_events_finished)
8382

8483
state[method]('is_opencode_focused', M.on_focus_changed)
8584
end
@@ -121,9 +120,6 @@ end
121120
function M._render_full_session_data(session_data)
122121
M.reset()
123122

124-
-- disable auto-scroll, makes loading a full session much faster
125-
M._disable_auto_scroll = true
126-
127123
local revert_index = nil
128124

129125
-- local event_manager = state.event_manager
@@ -146,9 +142,7 @@ function M._render_full_session_data(session_data)
146142
M._write_formatted_data(formatter._format_revert_message(state.messages, revert_index))
147143
end
148144

149-
-- re-enable Auto-scroll
150-
M._disable_auto_scroll = false
151-
M._scroll_to_bottom()
145+
M.scroll_to_bottom()
152146
end
153147

154148
---Render lines as the entire output buffer
@@ -169,17 +163,17 @@ function M.render_output(output_data)
169163
output_window.set_lines(output_data.lines)
170164
output_window.clear_extmarks()
171165
output_window.set_extmarks(output_data.extmarks)
172-
M._scroll_to_bottom()
166+
M.scroll_to_bottom()
167+
end
168+
169+
---Called when EventManager has finished emitting a batch of events
170+
function M.on_emit_events_finished()
171+
M.scroll_to_bottom()
173172
end
174173

175174
---Auto-scroll to bottom if user was already at bottom
176175
---Respects cursor position if user has scrolled up
177-
function M._scroll_to_bottom()
178-
-- if we're loading a full session, don't scroll incrementally
179-
if M._disable_auto_scroll then
180-
return
181-
end
182-
176+
function M.scroll_to_bottom()
183177
local ok, line_count = pcall(vim.api.nvim_buf_line_count, state.windows.output_buf)
184178
if not ok then
185179
return
@@ -448,8 +442,6 @@ function M.on_message_updated(message, revert_index)
448442
end
449443

450444
M._update_stats_from_message(msg)
451-
452-
M._scroll_to_bottom()
453445
end
454446

455447
---Event handler for message.part.updated events
@@ -526,8 +518,6 @@ function M.on_part_updated(properties, revert_index)
526518
M._rerender_part(text_part_id)
527519
end
528520
end
529-
530-
M._scroll_to_bottom()
531521
end
532522

533523
---Event handler for message.part.removed events
@@ -653,7 +643,6 @@ function M.on_permission_updated(permission)
653643
local part_id = M._find_part_by_call_id(permission.callID, permission.messageID)
654644
if part_id then
655645
M._rerender_part(part_id)
656-
M._scroll_to_bottom()
657646
end
658647
end
659648

@@ -672,7 +661,6 @@ function M.on_permission_replied(properties)
672661
local part_id = M._find_part_by_call_id(old_permission.callID, old_permission.messageID)
673662
if part_id then
674663
M._rerender_part(part_id)
675-
M._scroll_to_bottom()
676664
end
677665
end
678666
end

0 commit comments

Comments
 (0)