@@ -26,6 +26,7 @@ defmodule Sequin.Runtime.SlotMessageStore.State do
2626 field :partition , non_neg_integer ( )
2727 field :produced_message_groups , Multiset . t ( ) , default: % { }
2828 field :persisted_message_groups , Multiset . t ( ) , default: % { }
29+ field :backfill_message_groups , Multiset . t ( ) , default: % { }
2930 field :unpersisted_cursor_tuples_for_table_reader_batches , Multiset . t ( ) , default: % { }
3031 field :setting_max_messages , non_neg_integer ( ) , default: @ default_setting_max_messages
3132 field :setting_system_max_memory_bytes , non_neg_integer ( ) | nil
@@ -48,17 +49,25 @@ defmodule Sequin.Runtime.SlotMessageStore.State do
4849
4950 @ spec setup_ets ( State . t ( ) ) :: :ok
5051 def setup_ets ( % State { } = state ) do
51- table_name = ordered_cursors_table ( state )
52+ cdc_table_name = ordered_cdc_cursors_table ( state )
53+ backfill_table_name = ordered_backfill_cursors_table ( state )
54+
55+ :ets . new ( cdc_table_name , [ :ordered_set , :named_table , :protected ] )
56+ :ets . new ( backfill_table_name , [ :ordered_set , :named_table , :protected ] )
5257
53- :ets . new ( table_name , [ :ordered_set , :named_table , :protected ] )
5458 :ok
5559 end
5660
57- @ spec ordered_cursors_table ( State . t ( ) ) :: atom ( )
58- defp ordered_cursors_table ( % State { } = state ) do
61+ @ spec ordered_cdc_cursors_table ( State . t ( ) ) :: atom ( )
62+ defp ordered_cdc_cursors_table ( % State { } = state ) do
5963 :"slot_message_store_state_ordered_cursors_consumer_#{ state . consumer . seq } _partition_#{ state . partition } "
6064 end
6165
66+ @ spec ordered_backfill_cursors_table ( State . t ( ) ) :: atom ( )
67+ defp ordered_backfill_cursors_table ( % State { } = state ) do
68+ :"slot_message_store_state_ordered_backfill_cursors_consumer_#{ state . consumer . seq } _partition_#{ state . partition } "
69+ end
70+
6271 @ spec validate_put_messages ( State . t ( ) , list ( message ( ) ) | Enumerable . t ( ) , keyword ( ) ) ::
6372 { :ok , non_neg_integer ( ) } | { :error , Error . t ( ) }
6473 def validate_put_messages ( % State { } = state , messages , opts \\ [ ] ) do
@@ -90,11 +99,17 @@ defmodule Sequin.Runtime.SlotMessageStore.State do
9099
91100 with { :ok , incoming_payload_size_bytes } <- validate_put_messages ( state , messages , opts ) do
92101 # Insert into ETS
93- ets_keys = Enum . map ( messages , fn msg -> { { msg . commit_lsn , msg . commit_idx } } end )
102+ { cdc_messages , backfill_messages } = Enum . split_with ( messages , & is_nil ( & 1 . table_reader_batch_id ) )
103+ cdc_ets_keys = Enum . map ( cdc_messages , fn msg -> { { msg . commit_lsn , msg . commit_idx } } end )
104+ backfill_ets_keys = Enum . map ( backfill_messages , fn msg -> { { msg . commit_lsn , msg . commit_idx } } end )
105+
106+ state
107+ |> ordered_cdc_cursors_table ( )
108+ |> :ets . insert ( cdc_ets_keys )
94109
95110 state
96- |> ordered_cursors_table ( )
97- |> :ets . insert ( ets_keys )
111+ |> ordered_backfill_cursors_table ( )
112+ |> :ets . insert ( backfill_ets_keys )
98113
99114 cursor_tuples_to_messages = Map . new ( messages , fn msg -> { { msg . commit_lsn , msg . commit_idx } , msg } end )
100115 ack_ids_to_cursor_tuples = Map . new ( messages , fn msg -> { msg . ack_id , { msg . commit_lsn , msg . commit_idx } } end )
@@ -137,12 +152,18 @@ defmodule Sequin.Runtime.SlotMessageStore.State do
137152 unpersisted_cursor_tuples_for_table_reader_batches =
138153 Multiset . union ( state . unpersisted_cursor_tuples_for_table_reader_batches , batch_id , MapSet . new ( cursor_tuples ) )
139154
155+ backfill_message_groups =
156+ Enum . reduce ( messages , state . backfill_message_groups , fn msg , acc ->
157+ Multiset . put ( acc , group_id ( msg ) , { msg . commit_lsn , msg . commit_idx } )
158+ end )
159+
140160 case put_messages ( state , messages , skip_limit_check?: true ) do
141161 { :ok , % State { } = state } ->
142162 { :ok ,
143163 % {
144164 state
145165 | table_reader_batch_id: batch_id ,
166+ backfill_message_groups: backfill_message_groups ,
146167 unpersisted_cursor_tuples_for_table_reader_batches: unpersisted_cursor_tuples_for_table_reader_batches
147168 } }
148169
@@ -157,12 +178,27 @@ defmodule Sequin.Runtime.SlotMessageStore.State do
157178 messages = Map . drop ( state . messages , cursor_tuples )
158179
159180 # Remove from ETS
160- table = ordered_cursors_table ( state )
181+ cdc_table = ordered_cdc_cursors_table ( state )
182+ backfill_table = ordered_backfill_cursors_table ( state )
161183
162184 Enum . each ( popped_messages , fn msg ->
163- :ets . delete ( table , { msg . commit_lsn , msg . commit_idx } )
185+ if is_nil ( msg . table_reader_batch_id ) do
186+ :ets . delete ( cdc_table , { msg . commit_lsn , msg . commit_idx } )
187+ else
188+ :ets . delete ( backfill_table , { msg . commit_lsn , msg . commit_idx } )
189+ end
164190 end )
165191
192+ # Remove from backfill_message_groups
193+ backfill_message_groups =
194+ Enum . reduce ( popped_messages , state . backfill_message_groups , fn msg , acc ->
195+ if is_nil ( msg . table_reader_batch_id ) do
196+ acc
197+ else
198+ Multiset . delete ( acc , group_id ( msg ) , { msg . commit_lsn , msg . commit_idx } )
199+ end
200+ end )
201+
166202 persisted_message_groups =
167203 Enum . reduce ( popped_messages , state . persisted_message_groups , fn msg , acc ->
168204 Multiset . delete ( acc , group_id ( msg ) , { msg . commit_lsn , msg . commit_idx } )
@@ -205,6 +241,7 @@ defmodule Sequin.Runtime.SlotMessageStore.State do
205241 payload_size_bytes: next_payload_size_bytes ,
206242 produced_message_groups: produced_message_groups ,
207243 persisted_message_groups: persisted_message_groups ,
244+ backfill_message_groups: backfill_message_groups ,
208245 unpersisted_cursor_tuples_for_table_reader_batches: unpersisted_cursor_tuples_for_table_reader_batches
209246 } }
210247 end
@@ -478,38 +515,100 @@ defmodule Sequin.Runtime.SlotMessageStore.State do
478515 % { message | data: nil }
479516 end
480517
481- # This function provides an optimized way to take the first N messages from a map,
482- # using ETS ordered set to maintain sort order
518+ # This function provides an optimized way to zipper merge two ordered ets sets: cdc and backfill cursor tuples.
519+ #
520+ # We prioritize CDC messages over backfill messages to ensure low latency CDC during backfills.
521+ #
522+ # We immediately switch to strictly ordered mode if any cdc messages have a conflict with backfill groups.
523+ # This is a simplified version of prioritization. We *could* prioritize CDC messages across groups
524+ # while still ensuring strict cursor tuple ordering within groups.
483525 defp sorted_message_stream ( % State { } = state ) do
484- table = ordered_cursors_table ( state )
526+ cdc_table = ordered_cdc_cursors_table ( state )
527+ backfill_table = ordered_backfill_cursors_table ( state )
485528
486- Stream . unfold ( :ets . first ( table ) , fn
487- :"$end_of_table" ->
529+ # Start in :prioritize_cdc mode
530+ Stream . unfold ( { :ets . first ( cdc_table ) , :ets . first ( backfill_table ) , :prioritize_cdc } , fn
531+ # Both tables are empty, end the stream
532+ { :"$end_of_table" , :"$end_of_table" , _ } ->
488533 nil
489534
490- cursor_tuple ->
491- next_cursor_tuple = :ets . next ( table , cursor_tuple )
492- { Map . get ( state . messages , cursor_tuple ) , next_cursor_tuple }
535+ # Backfill table is empty, stream CDC messages
536+ { cdc_cursor_tuple , :"$end_of_table" , mode } ->
537+ next_message = Map . get ( state . messages , cdc_cursor_tuple )
538+ next_accumulator = { :ets . next ( cdc_table , cdc_cursor_tuple ) , :"$end_of_table" , mode }
539+ { next_message , next_accumulator }
540+
541+ # CDC table is empty, stream backfill messages
542+ { :"$end_of_table" , backfill_cursor_tuple , mode } ->
543+ next_message = Map . get ( state . messages , backfill_cursor_tuple )
544+ next_accumulator = { :"$end_of_table" , :ets . next ( backfill_table , backfill_cursor_tuple ) , mode }
545+ { next_message , next_accumulator }
546+
547+ { cdc_cursor_tuple , backfill_cursor_tuple , :prioritize_cdc } ->
548+ # if any cdc messages have a conflict with backfill groups, we immediately switch to non prioritized mode
549+ if message_has_backfill_group_conflict? ( state , cdc_cursor_tuple ) do
550+ stream_strictly_ordered ( state , cdc_cursor_tuple , backfill_cursor_tuple )
551+ else
552+ next_message = Map . get ( state . messages , cdc_cursor_tuple )
553+ next_accumulator = { :ets . next ( cdc_table , cdc_cursor_tuple ) , backfill_cursor_tuple , :prioritize_cdc }
554+ { next_message , next_accumulator }
555+ end
556+
557+ { cdc_cursor_tuple , backfill_cursor_tuple , :strictly_ordered } ->
558+ stream_strictly_ordered ( state , cdc_cursor_tuple , backfill_cursor_tuple )
493559 end )
494560 end
495561
562+ defp message_has_backfill_group_conflict? ( % State { } = state , cursor_tuple ) do
563+ message = Map . get ( state . messages , cursor_tuple )
564+ not is_nil ( message ) and Multiset . member? ( state . backfill_message_groups , group_id ( message ) )
565+ end
566+
567+ # In strictly ordered mode, we stream the lowest cursor tuple from either ets set
568+ # For use in sorted_message_stream/1
569+ defp stream_strictly_ordered ( % State { } = state , cdc_cursor_tuple , backfill_cursor_tuple ) do
570+ if cdc_cursor_tuple < backfill_cursor_tuple do
571+ cdc_table = ordered_cdc_cursors_table ( state )
572+ next_message = Map . get ( state . messages , cdc_cursor_tuple )
573+ next_accumulator = { :ets . next ( cdc_table , cdc_cursor_tuple ) , backfill_cursor_tuple , :strictly_ordered }
574+ { next_message , next_accumulator }
575+ else
576+ backfill_table = ordered_backfill_cursors_table ( state )
577+ next_message = Map . get ( state . messages , backfill_cursor_tuple )
578+ next_accumulator = { cdc_cursor_tuple , :ets . next ( backfill_table , backfill_cursor_tuple ) , :strictly_ordered }
579+ { next_message , next_accumulator }
580+ end
581+ end
582+
496583 @ doc """
497- Counts messages that are out of sync between state.messages and ordered_cursors_table .
498- This includes both messages that exist in state.messages but not in the ETS table ,
499- and vice versa.
584+ Audits the state for consistency between state.messages and the ETS tables .
585+ This includes both messages that exist in state.messages but not in the ETS tables ,
586+ and vice- versa.
500587 """
501- def count_unsynced_messages ( % State { } = state ) do
502- table = ordered_cursors_table ( state )
503- message_keys = Map . keys ( state . messages )
588+ def audit_state ( % State { } = state ) do
589+ cdc_table = ordered_cdc_cursors_table ( state )
590+ backfill_table = ordered_backfill_cursors_table ( state )
591+ message_cursor_tuples = Map . keys ( state . messages )
504592
505- # Count keys in messages that aren't in ETS
506593 messages_not_in_ets =
507- Enum . count ( message_keys , fn { commit_lsn , commit_idx } ->
508- not :ets . member ( table , { commit_lsn , commit_idx } )
594+ Enum . count ( message_cursor_tuples , fn { commit_lsn , commit_idx } ->
595+ not :ets . member ( cdc_table , { commit_lsn , commit_idx } ) and not :ets . member ( backfill_table , { commit_lsn , commit_idx } )
509596 end )
510597
511- # Count keys in ETS that aren't in messages
512- ets_not_in_messages =
598+ cdc_ets_not_in_messages =
599+ :ets . foldl (
600+ fn { { commit_lsn , commit_idx } } = _cursor_tuple , acc ->
601+ if Map . has_key? ( state . messages , { commit_lsn , commit_idx } ) do
602+ acc
603+ else
604+ acc + 1
605+ end
606+ end ,
607+ 0 ,
608+ cdc_table
609+ )
610+
611+ backfill_ets_not_in_messages =
513612 :ets . foldl (
514613 fn { { commit_lsn , commit_idx } } = _cursor_tuple , acc ->
515614 if Map . has_key? ( state . messages , { commit_lsn , commit_idx } ) do
@@ -519,10 +618,14 @@ defmodule Sequin.Runtime.SlotMessageStore.State do
519618 end
520619 end ,
521620 0 ,
522- table
621+ backfill_table
523622 )
524623
525- messages_not_in_ets + ets_not_in_messages
624+ % {
625+ messages_not_in_ets: messages_not_in_ets ,
626+ cdc_ets_not_in_messages: cdc_ets_not_in_messages ,
627+ backfill_ets_not_in_messages: backfill_ets_not_in_messages
628+ }
526629 end
527630
528631 @ doc """
0 commit comments