Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 0 additions & 28 deletions lib/sequin/consumers/consumers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -240,34 +240,6 @@ defmodule Sequin.Consumers do
round(Sequin.Size.mb(consumer.max_memory_mb) * 0.8)
end

@doc """
Calculates the maximum memory bytes allowed for a consumer when a system-wide max is in place.

Takes into account both:
- The consumer's configured max_memory_mb setting
- The system-wide memory limit divided among all non-disabled consumers

## Returns
The lower of:
* Consumer's max_memory_mb converted to bytes * 0.8
* (system_max_memory_bytes / number of non-disabled consumers) * 0.8
"""
@spec max_system_memory_bytes_for_consumer(SinkConsumer.t(), non_neg_integer(), non_neg_integer()) ::
non_neg_integer()
def max_system_memory_bytes_for_consumer(%SinkConsumer{} = consumer, consumer_count, system_max_memory_bytes) do
consumer_max_memory_bytes = max_memory_bytes_for_consumer(consumer)

# Some tests may call this with 0 consumers actually persisted
consumer_count = max(consumer_count, 1)

system_per_consumer_max_memory_bytes = round(div(system_max_memory_bytes, consumer_count) * 0.8)

min(
consumer_max_memory_bytes,
system_per_consumer_max_memory_bytes
)
end

def earliest_sink_consumer_inserted_at_for_account(account_id) do
account_id
|> SinkConsumer.where_account_id()
Expand Down
10 changes: 1 addition & 9 deletions lib/sequin/runtime/slot_message_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -903,15 +903,7 @@ defmodule Sequin.Runtime.SlotMessageStore do
defp put_max_memory_bytes(%State{} = state) do
consumer = state.consumer

max_memory_bytes =
if state.setting_system_max_memory_bytes do
# Must be self-hosted/dev. Check consumer's max_memory_mb setting
consumer_count = Consumers.count_non_disabled_sink_consumers()

Consumers.max_system_memory_bytes_for_consumer(consumer, consumer_count, state.setting_system_max_memory_bytes)
else
Consumers.max_memory_bytes_for_consumer(consumer)
end
max_memory_bytes = Consumers.max_memory_bytes_for_consumer(consumer)

%{state | max_memory_bytes: div(max_memory_bytes, consumer.partition_count)}
end
Expand Down
27 changes: 0 additions & 27 deletions test/sequin/consumers_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2249,33 +2249,6 @@ defmodule Sequin.ConsumersTest do
end
end

describe "max_system_memory_bytes_for_consumer/3" do
test "returns minimum between consumer's and system's per-consumer max memory" do
# Set up a consumer with 1000MB max memory
consumer = ConsumersFactory.sink_consumer(max_memory_mb: 1000)

# Mock system having 2000MB total memory
system_max_bytes = Size.mb(2000)

# With single consumer, returns consumer's max_bytes since it's lower
expected_consumer_max_bytes = round(Size.mb(1000) * 0.8)
assert Consumers.max_system_memory_bytes_for_consumer(consumer, 1, system_max_bytes) == expected_consumer_max_bytes

# With 5 consumers sharing system memory
expected_shared_max_bytes = round(system_max_bytes / 5 * 0.8)

# Each consumer (even with different memory limits) gets equal share of system memory
consumers =
for _ <- 1..4 do
ConsumersFactory.sink_consumer(max_memory_mb: Enum.random(1000..10_000))
end

Enum.each(consumers, fn consumer ->
assert Consumers.max_system_memory_bytes_for_consumer(consumer, 5, system_max_bytes) == expected_shared_max_bytes
end)
end
end

describe "where_wal_cursor_in/2" do
test "finds events matching WAL cursors for a specific consumer" do
# Create two consumers
Expand Down
9 changes: 4 additions & 5 deletions test/sequin/slot_message_store_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -742,18 +742,17 @@ defmodule Sequin.SlotMessageStoreTest do

describe "SlotMessageStore load shedding behavior" do
test "returns error when load_shedding_policy=pause_on_full" do
consumer = ConsumersFactory.insert_sink_consumer!(load_shedding_policy: :pause_on_full)
consumer = ConsumersFactory.insert_sink_consumer!(load_shedding_policy: :pause_on_full, max_memory_mb: 128)

start_supervised!(
{SlotMessageStoreSupervisor, consumer: consumer, test_pid: self(), setting_system_max_memory_bytes: 1}
)
start_supervised!({SlotMessageStoreSupervisor, consumer: consumer, test_pid: self()})

# Create a message with a specific group_id to ensure consistent partitioning
message =
ConsumersFactory.consumer_message(
message_kind: consumer.message_kind,
consumer_id: consumer.id,
group_id: "test-group"
group_id: "test-group",
payload_size_bytes: 130 * 1000 * 1000
)

# Put message in store
Expand Down
Loading