diff --git a/lib/plausible/ingestion/write_buffer.ex b/lib/plausible/ingestion/write_buffer.ex index 8fc6b802f8c9..abb9bf7bf02a 100644 --- a/lib/plausible/ingestion/write_buffer.ex +++ b/lib/plausible/ingestion/write_buffer.ex @@ -66,6 +66,13 @@ defmodule Plausible.Ingestion.WriteBuffer do {:noreply, %{state | buffer: [], buffer_size: 0, timer: timer}} end + def handle_info({:EXIT, _from, _reason}, state) do + # We trap exits so terminate/2 can flush on shutdown. Linked ports and + # processes (e.g. the ClickHouse connection socket) deliver :EXIT messages + # here on disconnect, which we ignore to keep the buffer alive. + {:noreply, state} + end + @impl true def handle_call(:flush, _from, state) do %{timer: timer, flush_interval_ms: flush_interval_ms} = state diff --git a/test/plausible/ingestion/write_buffer_test.exs b/test/plausible/ingestion/write_buffer_test.exs new file mode 100644 index 000000000000..37362dcf83f3 --- /dev/null +++ b/test/plausible/ingestion/write_buffer_test.exs @@ -0,0 +1,25 @@ +defmodule Plausible.Ingestion.WriteBufferTest do + use ExUnit.Case, async: true + + alias Plausible.Ingestion.WriteBuffer + + @opts WriteBuffer.compile_time_prepare(Plausible.ClickhouseEventV2) + + test "survives :EXIT messages from linked processes while trapping exits" do + opts = + @opts + |> Map.take([:header, :insert_sql, :insert_opts]) + |> Map.to_list() + |> Keyword.put(:name, :"write_buffer_exit_#{System.unique_integer([:positive])}") + # large buffer so the empty buffer never flushes to ClickHouse during the test + |> Keyword.put(:flush_interval_ms, 60_000) + + {:ok, pid} = WriteBuffer.start_link(opts) + + send(pid, {:EXIT, self(), :normal}) + + # process is still alive and responsive after the :EXIT message + assert Process.alive?(pid) + assert WriteBuffer.flush(opts[:name]) == :ok + end +end