Skip to content

Commit a5c226c

Browse files
committed
Don't factor in system max memory in individual SMS sizes
We let you set a max memory for Sequin (defaults to ~80% of avail memory) or for individual slot message stores. Prior behavior was to do min( max_memory/count_sms , sms_max ). This change removes that - we'll let an SMS fill up to his max, so long as the system can support it.
1 parent 3bdcea4 commit a5c226c

File tree

4 files changed

+5
-69
lines changed

4 files changed

+5
-69
lines changed

lib/sequin/consumers/consumers.ex

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -240,34 +240,6 @@ defmodule Sequin.Consumers do
240240
round(Sequin.Size.mb(consumer.max_memory_mb) * 0.8)
241241
end
242242

243-
@doc """
244-
Calculates the maximum memory bytes allowed for a consumer when a system-wide max is in place.
245-
246-
Takes into account both:
247-
- The consumer's configured max_memory_mb setting
248-
- The system-wide memory limit divided among all non-disabled consumers
249-
250-
## Returns
251-
The lower of:
252-
* Consumer's max_memory_mb converted to bytes * 0.8
253-
* (system_max_memory_bytes / number of non-disabled consumers) * 0.8
254-
"""
255-
@spec max_system_memory_bytes_for_consumer(SinkConsumer.t(), non_neg_integer(), non_neg_integer()) ::
256-
non_neg_integer()
257-
def max_system_memory_bytes_for_consumer(%SinkConsumer{} = consumer, consumer_count, system_max_memory_bytes) do
258-
consumer_max_memory_bytes = max_memory_bytes_for_consumer(consumer)
259-
260-
# Some tests may call this with 0 consumers actually persisted
261-
consumer_count = max(consumer_count, 1)
262-
263-
system_per_consumer_max_memory_bytes = round(div(system_max_memory_bytes, consumer_count) * 0.8)
264-
265-
min(
266-
consumer_max_memory_bytes,
267-
system_per_consumer_max_memory_bytes
268-
)
269-
end
270-
271243
def earliest_sink_consumer_inserted_at_for_account(account_id) do
272244
account_id
273245
|> SinkConsumer.where_account_id()

lib/sequin/runtime/slot_message_store.ex

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -903,15 +903,7 @@ defmodule Sequin.Runtime.SlotMessageStore do
903903
defp put_max_memory_bytes(%State{} = state) do
904904
consumer = state.consumer
905905

906-
max_memory_bytes =
907-
if state.setting_system_max_memory_bytes do
908-
# Must be self-hosted/dev. Check consumer's max_memory_mb setting
909-
consumer_count = Consumers.count_non_disabled_sink_consumers()
910-
911-
Consumers.max_system_memory_bytes_for_consumer(consumer, consumer_count, state.setting_system_max_memory_bytes)
912-
else
913-
Consumers.max_memory_bytes_for_consumer(consumer)
914-
end
906+
max_memory_bytes = Consumers.max_memory_bytes_for_consumer(consumer)
915907

916908
%{state | max_memory_bytes: div(max_memory_bytes, consumer.partition_count)}
917909
end

test/sequin/consumers_test.exs

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2249,33 +2249,6 @@ defmodule Sequin.ConsumersTest do
22492249
end
22502250
end
22512251

2252-
describe "max_system_memory_bytes_for_consumer/3" do
2253-
test "returns minimum between consumer's and system's per-consumer max memory" do
2254-
# Set up a consumer with 1000MB max memory
2255-
consumer = ConsumersFactory.sink_consumer(max_memory_mb: 1000)
2256-
2257-
# Mock system having 2000MB total memory
2258-
system_max_bytes = Size.mb(2000)
2259-
2260-
# With single consumer, returns consumer's max_bytes since it's lower
2261-
expected_consumer_max_bytes = round(Size.mb(1000) * 0.8)
2262-
assert Consumers.max_system_memory_bytes_for_consumer(consumer, 1, system_max_bytes) == expected_consumer_max_bytes
2263-
2264-
# With 5 consumers sharing system memory
2265-
expected_shared_max_bytes = round(system_max_bytes / 5 * 0.8)
2266-
2267-
# Each consumer (even with different memory limits) gets equal share of system memory
2268-
consumers =
2269-
for _ <- 1..4 do
2270-
ConsumersFactory.sink_consumer(max_memory_mb: Enum.random(1000..10_000))
2271-
end
2272-
2273-
Enum.each(consumers, fn consumer ->
2274-
assert Consumers.max_system_memory_bytes_for_consumer(consumer, 5, system_max_bytes) == expected_shared_max_bytes
2275-
end)
2276-
end
2277-
end
2278-
22792252
describe "where_wal_cursor_in/2" do
22802253
test "finds events matching WAL cursors for a specific consumer" do
22812254
# Create two consumers

test/sequin/slot_message_store_test.exs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -742,18 +742,17 @@ defmodule Sequin.SlotMessageStoreTest do
742742

743743
describe "SlotMessageStore load shedding behavior" do
744744
test "returns error when load_shedding_policy=pause_on_full" do
745-
consumer = ConsumersFactory.insert_sink_consumer!(load_shedding_policy: :pause_on_full)
745+
consumer = ConsumersFactory.insert_sink_consumer!(load_shedding_policy: :pause_on_full, max_memory_mb: 128)
746746

747-
start_supervised!(
748-
{SlotMessageStoreSupervisor, consumer: consumer, test_pid: self(), setting_system_max_memory_bytes: 1}
749-
)
747+
start_supervised!({SlotMessageStoreSupervisor, consumer: consumer, test_pid: self()})
750748

751749
# Create a message with a specific group_id to ensure consistent partitioning
752750
message =
753751
ConsumersFactory.consumer_message(
754752
message_kind: consumer.message_kind,
755753
consumer_id: consumer.id,
756-
group_id: "test-group"
754+
group_id: "test-group",
755+
payload_size_bytes: 130 * 1000 * 1000
757756
)
758757

759758
# Put message in store

0 commit comments

Comments
 (0)