Skip to content

Commit baca74d

Browse files
committed
[GS] Memoize consumers by table oid
1 parent 20ec3c8 commit baca74d

File tree

1 file changed

+51
-4
lines changed

1 file changed

+51
-4
lines changed

lib/sequin/runtime/message_handler.ex

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -222,15 +222,13 @@ defmodule Sequin.Runtime.MessageHandler do
222222
# We group_by consumer_id throughout because consumer as a key is slow!
223223
# So we need to do fast lookups by consumer_id
224224
consumers_by_id = Map.new(ctx.consumers, fn consumer -> {consumer.id, consumer} end)
225+
{consumers_by_table_oid, wildcard_consumer_ids} = build_consumer_lookups(ctx.consumers)
225226

226227
messages
227228
# First we get a list of consumer_ids that match the SlotProcessor.Message
228229
|> Stream.map(fn %SlotProcessor.Message{} = message ->
229230
matching_consumer_ids =
230-
ctx.consumers
231-
|> Stream.filter(&Consumers.matches_message?(&1, message))
232-
# Then we map to a list of consumer_ids
233-
|> Enum.map(& &1.id)
231+
matching_consumer_ids(message, consumers_by_id, consumers_by_table_oid, wildcard_consumer_ids)
234232

235233
{message, matching_consumer_ids}
236234
end)
@@ -258,6 +256,55 @@ defmodule Sequin.Runtime.MessageHandler do
258256
end)
259257
end
260258

259+
# Returns the ids of the consumers that match `message`, using the precomputed lookups.
#
# Candidate ids are the ids stored under `message.table_oid` in `consumers_by_table_oid`
# (none when the oid has no entry), followed by every id in `wildcard_consumer_ids`.
# Each candidate is resolved through `consumers_by_id` and kept only when
# `Consumers.matches_message?/2` accepts it, so the table_oid lookup narrows the
# candidate set but never replaces the full match check.
#
# Raises `KeyError` if a candidate id is missing from `consumers_by_id`.
defp matching_consumer_ids(
       %SlotProcessor.Message{} = message,
       consumers_by_id,
       consumers_by_table_oid,
       wildcard_consumer_ids
     ) do
  candidate_ids = Map.get(consumers_by_table_oid, message.table_oid, []) ++ wildcard_consumer_ids

  for consumer_id <- candidate_ids,
      Consumers.matches_message?(Map.fetch!(consumers_by_id, consumer_id), message) do
    consumer_id
  end
end
276+
277+
# Builds the memoized lookup structures used to match messages to consumers.
#
# Returns `{consumers_by_table_oid, wildcard_consumer_ids}` where:
#
#   * `consumers_by_table_oid` maps each table oid to the ids of the "simple" consumers
#     (see `simple_table_oid_consumer?/1`) that list that oid in `include_table_oids`.
#   * `wildcard_consumer_ids` is the list of ids of every other consumer, in input order.
#
# Each consumer's oids are de-duplicated before indexing so that a repeated oid in
# `include_table_oids` cannot register the same consumer id twice under one table,
# which would otherwise make that consumer show up twice as a match for one message.
defp build_consumer_lookups(consumers) do
  {simple_consumers, wildcard_consumers} = Enum.split_with(consumers, &simple_table_oid_consumer?/1)

  consumers_by_table_oid =
    for consumer <- simple_consumers,
        table_oid <- Enum.uniq(consumer.source.include_table_oids),
        reduce: %{} do
      acc ->
        # Prepend is O(1); the order of ids within a table's bucket is not relied upon here.
        Map.update(acc, table_oid, [consumer.id], fn ids -> [consumer.id | ids] end)
    end

  wildcard_consumer_ids = Enum.map(wildcard_consumers, & &1.id)

  {consumers_by_table_oid, wildcard_consumer_ids}
end
296+
297+
# Decides whether a consumer can be indexed purely by table oid.
#
# A consumer qualifies when its source has a list in `include_table_oids` and every
# other source filter (`exclude_table_oids`, `include_schemas`, `exclude_schemas`)
# is nil. A consumer without a source never qualifies.
defp simple_table_oid_consumer?(%SinkConsumer{source: nil}), do: false

defp simple_table_oid_consumer?(%SinkConsumer{source: %Consumers.Source{} = source}) do
  other_filters = [source.exclude_table_oids, source.include_schemas, source.exclude_schemas]

  is_list(source.include_table_oids) and Enum.all?(other_filters, &is_nil/1)
end
307+
261308
@decorate track_metrics("map_to_consumer_message")
262309
defp consumer_message(%SinkConsumer{} = consumer, %PostgresDatabase{} = database, %SlotProcessor.Message{} = message) do
263310
Consumers.consumer_message(consumer, database, message)

0 commit comments

Comments
 (0)