diff --git a/lib/sequin/consumers/consumers.ex b/lib/sequin/consumers/consumers.ex index e99fb5df5..fe34519d6 100644 --- a/lib/sequin/consumers/consumers.ex +++ b/lib/sequin/consumers/consumers.ex @@ -240,34 +240,6 @@ defmodule Sequin.Consumers do round(Sequin.Size.mb(consumer.max_memory_mb) * 0.8) end - @doc """ - Calculates the maximum memory bytes allowed for a consumer when a system-wide max is in place. - - Takes into account both: - - The consumer's configured max_memory_mb setting - - The system-wide memory limit divided among all non-disabled consumers - - ## Returns - The lower of: - * Consumer's max_memory_mb converted to bytes * 0.8 - * (system_max_memory_bytes / number of non-disabled consumers) * 0.8 - """ - @spec max_system_memory_bytes_for_consumer(SinkConsumer.t(), non_neg_integer(), non_neg_integer()) :: - non_neg_integer() - def max_system_memory_bytes_for_consumer(%SinkConsumer{} = consumer, consumer_count, system_max_memory_bytes) do - consumer_max_memory_bytes = max_memory_bytes_for_consumer(consumer) - - # Some tests may call this with 0 consumers actually persisted - consumer_count = max(consumer_count, 1) - - system_per_consumer_max_memory_bytes = round(div(system_max_memory_bytes, consumer_count) * 0.8) - - min( - consumer_max_memory_bytes, - system_per_consumer_max_memory_bytes - ) - end - def earliest_sink_consumer_inserted_at_for_account(account_id) do account_id |> SinkConsumer.where_account_id() diff --git a/lib/sequin/runtime/slot_message_store.ex b/lib/sequin/runtime/slot_message_store.ex index 7d86ca445..6ef82641a 100644 --- a/lib/sequin/runtime/slot_message_store.ex +++ b/lib/sequin/runtime/slot_message_store.ex @@ -903,15 +903,7 @@ defmodule Sequin.Runtime.SlotMessageStore do defp put_max_memory_bytes(%State{} = state) do consumer = state.consumer - max_memory_bytes = - if state.setting_system_max_memory_bytes do - # Must be self-hosted/dev. Check consumer's max_memory_mb setting - consumer_count = Consumers.count_non_disabled_sink_consumers() - - Consumers.max_system_memory_bytes_for_consumer(consumer, consumer_count, state.setting_system_max_memory_bytes) - else - Consumers.max_memory_bytes_for_consumer(consumer) - end + max_memory_bytes = Consumers.max_memory_bytes_for_consumer(consumer) %{state | max_memory_bytes: div(max_memory_bytes, consumer.partition_count)} end diff --git a/test/sequin/consumers_test.exs b/test/sequin/consumers_test.exs index ed6060692..eec1b5cc1 100644 --- a/test/sequin/consumers_test.exs +++ b/test/sequin/consumers_test.exs @@ -2249,33 +2249,6 @@ defmodule Sequin.ConsumersTest do end end - describe "max_system_memory_bytes_for_consumer/3" do - test "returns minimum between consumer's and system's per-consumer max memory" do - # Set up a consumer with 1000MB max memory - consumer = ConsumersFactory.sink_consumer(max_memory_mb: 1000) - - # Mock system having 2000MB total memory - system_max_bytes = Size.mb(2000) - - # With single consumer, returns consumer's max_bytes since it's lower - expected_consumer_max_bytes = round(Size.mb(1000) * 0.8) - assert Consumers.max_system_memory_bytes_for_consumer(consumer, 1, system_max_bytes) == expected_consumer_max_bytes - - # With 5 consumers sharing system memory - expected_shared_max_bytes = round(system_max_bytes / 5 * 0.8) - - # Each consumer (even with different memory limits) gets equal share of system memory - consumers = - for _ <- 1..4 do - ConsumersFactory.sink_consumer(max_memory_mb: Enum.random(1000..10_000)) - end - - Enum.each(consumers, fn consumer -> - assert Consumers.max_system_memory_bytes_for_consumer(consumer, 5, system_max_bytes) == expected_shared_max_bytes - end) - end - end - describe "where_wal_cursor_in/2" do test "finds events matching WAL cursors for a specific consumer" do # Create two consumers diff --git a/test/sequin/slot_message_store_test.exs b/test/sequin/slot_message_store_test.exs index 2158aa4ff..751a78513 100644 --- a/test/sequin/slot_message_store_test.exs +++ b/test/sequin/slot_message_store_test.exs @@ -742,18 +742,17 @@ defmodule Sequin.SlotMessageStoreTest do describe "SlotMessageStore load shedding behavior" do test "returns error when load_shedding_policy=pause_on_full" do - consumer = ConsumersFactory.insert_sink_consumer!(load_shedding_policy: :pause_on_full) + consumer = ConsumersFactory.insert_sink_consumer!(load_shedding_policy: :pause_on_full, max_memory_mb: 128) - start_supervised!( - {SlotMessageStoreSupervisor, consumer: consumer, test_pid: self(), setting_system_max_memory_bytes: 1} - ) + start_supervised!({SlotMessageStoreSupervisor, consumer: consumer, test_pid: self()}) # Create a message with a specific group_id to ensure consistent partitioning message = ConsumersFactory.consumer_message( message_kind: consumer.message_kind, consumer_id: consumer.id, - group_id: "test-group" + group_id: "test-group", + payload_size_bytes: 130 * 1000 * 1000 ) # Put message in store