Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 1 addition & 40 deletions lib/sequin/runtime/slot_message_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1097,46 +1097,7 @@ defmodule Sequin.Runtime.SlotMessageStore do
end)
end

defp stream_messages_into_state(%State{} = state) do
%SinkConsumer{} = consumer = Repo.preload(state.consumer, :postgres_database)
%PostgresDatabase{} = database = consumer.postgres_database

# Stream messages and stop when we reach max_memory_bytes
{time, persisted_messages} =
:timer.tc(fn ->
consumer
|> Consumers.stream_consumer_messages_for_consumer()
|> Stream.map(&Consumers.put_dynamic_fields(&1, consumer, database))
|> Stream.filter(&(message_partition(&1, state.consumer.partition_count) == state.partition))
|> Stream.reject(&State.message_exists?(state, &1))
|> Enum.map(fn msg ->
%{msg | payload_size_bytes: :erlang.external_size(msg.data)}
end)
end)

duration_ms = time / 1000

if duration_ms > 100 do
Logger.warning("[SlotMessageStore] Loaded messages from disk (duration=#{duration_ms}ms)",
consumer_id: state.consumer_id,
partition: state.partition,
message_count: length(persisted_messages),
duration_ms: duration_ms
)
else
if persisted_messages != [] do
Logger.info("[SlotMessageStore] Loaded messages from disk",
consumer_id: state.consumer_id,
partition: state.partition,
message_count: length(persisted_messages),
duration_ms: duration_ms
)
end
end

# Now put the messages into state
{:ok, State.put_persisted_messages(state, persisted_messages)}
end
defp stream_messages_into_state(%State{} = state), do: {:ok, state}

@decorate track_metrics("delete_messages_noop")
defp delete_messages(%State{}, []), do: :ok
Expand Down
Loading