Skip to content

Commit 0290beb

Browse files
committed
fixup! fix: ensure Watchdog kills ReplicationConnection on failing healthcheck
1 parent 00bff54 commit 0290beb

File tree

3 files changed

+11
-5
lines changed

3 files changed

+11
-5
lines changed

lib/realtime/tenants/replication_connection.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
7373
@doc """
7474
Starts the replication connection for a tenant and monitors a given pid to stop the ReplicationConnection.
7575
"""
76-
@spec start(Realtime.Api.Tenant.t(), pid()) :: {:ok, pid()} | {:error, any()}
76+
@spec start(Realtime.Api.Tenant.t(), pid(), query_timeout :: timeout) :: {:ok, pid()} | {:error, any()}
7777
def start(tenant, monitored_pid, query_timeout \\ @default_query_timeout) do
7878
Logger.info("Starting replication for Broadcast Changes")
7979
opts = %__MODULE__{tenant_id: tenant.external_id, monitored_pid: monitored_pid, query_timeout: query_timeout}
@@ -242,7 +242,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
242242
end)
243243

244244
if valid_tables and rows != [] do
245-
{:query, "SELECT 1", %{state | step: :start_replication_slot}}
245+
{:query, "SELECT 1", [timeout: state.query_timeout], %{state | step: :start_replication_slot}}
246246
else
247247
query =
248248
"DROP PUBLICATION IF EXISTS #{publication_name}; CREATE PUBLICATION #{publication_name} FOR TABLE #{@schema}.#{@table}"

lib/realtime/tenants/replication_connection/watchdog.ex

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Realtime.Tenants.ReplicationConnection.Watchdog do
22
@moduledoc """
33
Monitors ReplicationConnection health by performing periodic call checks.
4-
If the call times out, logs an error and shuts down, which cascades to ReplicationConnection.
4+
If the call times out, logs an error and terminates the ReplicationConnection process to trigger a restart.
55
"""
66
use GenServer
77
use Realtime.Logs
@@ -60,7 +60,13 @@ defmodule Realtime.Tenants.ReplicationConnection.Watchdog do
6060
:exit, {:timeout, _} ->
6161
log_error("ReplicationConnectionWatchdogTimeout", "ReplicationConnection is not responding")
6262

63-
ReplicationConnection.stop(state.tenant_id, state.parent_pid)
63+
case ReplicationConnection.stop(state.tenant_id, state.parent_pid) do
64+
:ok ->
65+
Logger.info("ReplicationConnection stopped successfully")
66+
67+
{:error, reason} ->
68+
log_error(ReplicationConnectionWatchdogTimeout, "Failed to stop ReplicationConnection: #{inspect(reason)}")
69+
end
6470

6571
{:stop, :watchdog_timeout, state}
6672
end

test/realtime/tenants/replication_connection_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
6666
# Let's make it not reply to health checks
6767
:sys.suspend(pid)
6868

69-
reason = assert_process_down(pid, 200)
69+
reason = assert_process_down(pid, 400)
7070
assert reason == :shutdown
7171
end)
7272

0 commit comments

Comments
 (0)