Skip to content

Commit e2e5333

Browse files
committed
⚡️ Remove State.pop_blocked_messages
This behavior is O(messages) whenever a message fails. We don't need this anymore, as we automatically flush stale in-memory messages on an interval.
1 parent 504f253 commit e2e5333

File tree

4 files changed

+2
-109
lines changed

4 files changed

+2
-109
lines changed

lib/sequin/runtime/slot_message_store.ex

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -667,11 +667,8 @@ defmodule Sequin.Runtime.SlotMessageStore do
667667

668668
state = State.put_persisted_messages(state, messages)
669669

670-
with {newly_blocked_messages, state} <- State.pop_blocked_messages(state),
671-
:ok <- handle_discarded_messages(state, discarded_messages),
672-
# Now put the blocked messages into persisted_messages
673-
state = State.put_persisted_messages(state, newly_blocked_messages),
674-
:ok <- upsert_messages(state, messages ++ newly_blocked_messages) do
670+
with :ok <- handle_discarded_messages(state, discarded_messages),
671+
:ok <- upsert_messages(state, messages) do
675672
maybe_finish_table_reader_batch(prev_state, state)
676673
{:reply, :ok, state}
677674
else

lib/sequin/runtime/slot_message_store_state.ex

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -207,22 +207,6 @@ defmodule Sequin.Runtime.SlotMessageStore.State do
207207
}}
208208
end
209209

210-
@spec pop_blocked_messages(State.t()) :: {list(message()), State.t()}
211-
def pop_blocked_messages(%State{} = state) do
212-
execute_timed(:pop_blocked_messages, fn ->
213-
blocked_cursor_tuples =
214-
state.messages
215-
|> Stream.filter(fn {cursor_tuple, msg} ->
216-
is_message_group_persisted?(state, msg.group_id) and
217-
not Multiset.value_member?(state.persisted_message_groups, msg.group_id, cursor_tuple)
218-
end)
219-
|> Stream.map(fn {cursor_tuple, _msg} -> cursor_tuple end)
220-
|> Enum.to_list()
221-
222-
pop_messages(state, blocked_cursor_tuples)
223-
end)
224-
end
225-
226210
@spec pop_all_messages(State.t()) :: {list(message()), State.t()}
227211
def pop_all_messages(%State{} = state) do
228212
pop_messages(state, Map.keys(state.messages))
@@ -539,16 +523,4 @@ defmodule Sequin.Runtime.SlotMessageStore.State do
539523
end)
540524
|> Enum.to_list()
541525
end
542-
543-
defp incr_counter(name, amount \\ 1) do
544-
current = Process.get(name, 0)
545-
Process.put(name, current + amount)
546-
end
547-
548-
defp execute_timed(name, fun) do
549-
{time, result} = :timer.tc(fun, :millisecond)
550-
incr_counter(:"#{name}_total_ms", time)
551-
incr_counter(:"#{name}_count")
552-
result
553-
end
554526
end

test/sequin/slot_message_store_state_test.exs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -196,32 +196,6 @@ defmodule Sequin.Runtime.SlotMessageStoreStateTest do
196196
end
197197
end
198198

199-
describe "pop_blocked_messages/2" do
200-
test "pops messages that should be moved to persisted_message_groups in persisted_message_groups", %{state: state} do
201-
# Add two messages with different group_ids
202-
msg1 = ConsumersFactory.consumer_message(group_id: "group1")
203-
msg2 = ConsumersFactory.consumer_message(group_id: "group2")
204-
205-
{:ok, state} = State.put_messages(state, [msg1, msg2])
206-
207-
# Add a persisted message that shares group_id with msg1
208-
persisted_msg = ConsumersFactory.consumer_message(group_id: "group1")
209-
state = State.put_persisted_messages(state, [persisted_msg])
210-
211-
# Call pop_blocked_messages/2
212-
{popped_messages, updated_state} = State.pop_blocked_messages(state)
213-
214-
# Verify we get back only msg1 which shares group_id with the persisted message
215-
assert length(popped_messages) == 1
216-
[popped_msg] = popped_messages
217-
assert_cursor_tuple_matches(msg1, popped_msg)
218-
assert popped_msg.group_id == "group1"
219-
220-
# Verify msg2 is still in state since it wasn't blocked
221-
assert_message_in_state(msg2, updated_state)
222-
end
223-
end
224-
225199
describe "end to end test with put and pop" do
226200
test "popped messages are removed from state", %{state: state} do
227201
now = DateTime.utc_now()

test/sequin/slot_message_store_test.exs

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -221,50 +221,6 @@ defmodule Sequin.SlotMessageStoreTest do
221221
assert Enum.all?(messages, fn msg -> msg.not_visible_until == not_visible_until end)
222222
end
223223

224-
test "when a message fails, non-persisted messages of the same group_id are persisted", %{consumer: consumer} do
225-
# Create two messages with same group_id
226-
group_id = "test-group"
227-
228-
message1 =
229-
ConsumersFactory.consumer_message(
230-
group_id: group_id,
231-
message_kind: consumer.message_kind,
232-
consumer_id: consumer.id
233-
)
234-
235-
message2 =
236-
ConsumersFactory.consumer_message(
237-
group_id: group_id,
238-
message_kind: consumer.message_kind,
239-
consumer_id: consumer.id
240-
)
241-
242-
# Put both messages in store
243-
:ok = SlotMessageStore.put_messages(consumer, [message1, message2])
244-
245-
# Get and fail first message
246-
{:ok, [delivered]} = SlotMessageStore.produce(consumer, 1, self())
247-
248-
meta = %{
249-
ack_id: delivered.ack_id,
250-
deliver_count: 1,
251-
group_id: delivered.group_id,
252-
last_delivered_at: DateTime.utc_now(),
253-
not_visible_until: DateTime.add(DateTime.utc_now(), 60)
254-
}
255-
256-
:ok = SlotMessageStore.messages_failed(consumer, [meta])
257-
258-
# Verify both messages are now persisted
259-
messages = SlotMessageStore.peek_messages(consumer, 2)
260-
assert length(messages) == 2
261-
assert Enum.all?(messages, &(&1.group_id == group_id))
262-
263-
persisted_messages = Consumers.list_consumer_messages_for_consumer(consumer)
264-
assert length(persisted_messages) == 2
265-
assert Enum.all?(persisted_messages, &(&1.group_id == group_id))
266-
end
267-
268224
test "if the pid changes between calls of produce_messages, produced_messages are available for deliver", %{
269225
consumer: consumer
270226
} do
@@ -389,12 +345,6 @@ defmodule Sequin.SlotMessageStoreTest do
389345

390346
# Put second message (nil group_id)
391347
:ok = SlotMessageStore.put_messages(consumer, [message2])
392-
393-
# Verify only messages with non-nil group_ids are blocked
394-
Enum.each(SlotMessageStore.peek(consumer), fn state ->
395-
{blocked_messages, _state} = State.pop_blocked_messages(state)
396-
assert length(blocked_messages) == 0
397-
end)
398348
end
399349

400350
test "duplicate messages don't accumulate payload size", %{consumer: consumer} do

0 commit comments

Comments
 (0)