@@ -222,15 +222,13 @@ defmodule Sequin.Runtime.MessageHandler do
222222 # We group_by consumer_id throughput because consumer as a key is slow!
223223 # So we need to do fast lookups by consumer_id
224224 consumers_by_id = Map . new ( ctx . consumers , fn consumer -> { consumer . id , consumer } end )
225+ { consumers_by_table_oid , wildcard_consumer_ids } = build_consumer_lookups ( ctx . consumers )
225226
226227 messages
227228 # First we get a list of consumer_ids that match the SlotProcessor.Message
228229 |> Stream . map ( fn % SlotProcessor.Message { } = message ->
229230 matching_consumer_ids =
230- ctx . consumers
231- |> Stream . filter ( & Consumers . matches_message? ( & 1 , message ) )
232- # Then we map to a list of consumer_ids
233- |> Enum . map ( & & 1 . id )
231+ matching_consumer_ids ( message , consumers_by_id , consumers_by_table_oid , wildcard_consumer_ids )
234232
235233 { message , matching_consumer_ids }
236234 end )
@@ -258,6 +256,55 @@ defmodule Sequin.Runtime.MessageHandler do
258256 end )
259257 end
260258
259+ # Uses memoized lookups to find matching consumer IDs for a message.
260+ # For consumers with simple include_table_oids config, we do O(1) lookup by table_oid.
261+ # For consumers with complex filters (wildcard group), we still need to check matches_message?.
262+ defp matching_consumer_ids (
263+ % SlotProcessor.Message { } = message ,
264+ consumers_by_id ,
265+ consumers_by_table_oid ,
266+ wildcard_consumer_ids
267+ ) do
268+ # Get consumers from the memoized table_oid lookup
269+ simple_consumer_ids = Map . get ( consumers_by_table_oid , message . table_oid , [ ] )
270+
271+ Enum . filter ( simple_consumer_ids ++ wildcard_consumer_ids , fn consumer_id ->
272+ consumer = Map . fetch! ( consumers_by_id , consumer_id )
273+ Consumers . matches_message? ( consumer , message )
274+ end )
275+ end
276+
277+ # Builds memoized consumer lookup structures for performance optimization.
278+ # Consumers with only include_table_oids are grouped by table_oid for O(1) lookup.
279+ # All other consumers (with schema filters or exclude rules) go into a wildcard list.
280+ defp build_consumer_lookups ( consumers ) do
281+ { simple_consumers , wildcard_consumers } = Enum . split_with ( consumers , & simple_table_oid_consumer? / 1 )
282+
283+ consumers_by_table_oid =
284+ Enum . reduce ( simple_consumers , % { } , fn consumer , acc ->
285+ table_oids = consumer . source . include_table_oids
286+
287+ Enum . reduce ( table_oids , acc , fn table_oid , inner_acc ->
288+ Map . update ( inner_acc , table_oid , [ consumer . id ] , fn ids -> [ consumer . id | ids ] end )
289+ end )
290+ end )
291+
292+ wildcard_consumer_ids = Enum . map ( wildcard_consumers , & & 1 . id )
293+
294+ { consumers_by_table_oid , wildcard_consumer_ids }
295+ end
296+
297+ # Returns true if the consumer has a simple source configuration with only include_table_oids
298+ # and no other filtering rules (no schema filters, no exclude rules).
299+ defp simple_table_oid_consumer? ( % SinkConsumer { source: nil } ) , do: false
300+
301+ defp simple_table_oid_consumer? ( % SinkConsumer { source: % Consumers.Source { } = source } ) do
302+ is_list ( source . include_table_oids ) and
303+ is_nil ( source . exclude_table_oids ) and
304+ is_nil ( source . include_schemas ) and
305+ is_nil ( source . exclude_schemas )
306+ end
307+
261308 @ decorate track_metrics ( "map_to_consumer_message" )
262309 defp consumer_message ( % SinkConsumer { } = consumer , % PostgresDatabase { } = database , % SlotProcessor.Message { } = message ) do
263310 Consumers . consumer_message ( consumer , database , message )
0 commit comments