diff --git a/lib/sequin/runtime/slot_message_store.ex b/lib/sequin/runtime/slot_message_store.ex index eb6f6928a..ffce17ee6 100644 --- a/lib/sequin/runtime/slot_message_store.ex +++ b/lib/sequin/runtime/slot_message_store.ex @@ -1097,46 +1097,7 @@ defmodule Sequin.Runtime.SlotMessageStore do end) end - defp stream_messages_into_state(%State{} = state) do - %SinkConsumer{} = consumer = Repo.preload(state.consumer, :postgres_database) - %PostgresDatabase{} = database = consumer.postgres_database - - # Stream messages and stop when we reach max_memory_bytes - {time, persisted_messages} = - :timer.tc(fn -> - consumer - |> Consumers.stream_consumer_messages_for_consumer() - |> Stream.map(&Consumers.put_dynamic_fields(&1, consumer, database)) - |> Stream.filter(&(message_partition(&1, state.consumer.partition_count) == state.partition)) - |> Stream.reject(&State.message_exists?(state, &1)) - |> Enum.map(fn msg -> - %{msg | payload_size_bytes: :erlang.external_size(msg.data)} - end) - end) - - duration_ms = time / 1000 - - if duration_ms > 100 do - Logger.warning("[SlotMessageStore] Loaded messages from disk (duration=#{duration_ms}ms)", - consumer_id: state.consumer_id, - partition: state.partition, - message_count: length(persisted_messages), - duration_ms: duration_ms - ) - else - if persisted_messages != [] do - Logger.info("[SlotMessageStore] Loaded messages from disk", - consumer_id: state.consumer_id, - partition: state.partition, - message_count: length(persisted_messages), - duration_ms: duration_ms - ) - end - end - - # Now put the messages into state - {:ok, State.put_persisted_messages(state, persisted_messages)} - end + defp stream_messages_into_state(%State{} = state), do: {:ok, state} @decorate track_metrics("delete_messages_noop") defp delete_messages(%State{}, []), do: :ok