Skip to content

Commit 70339c7

Browse files
authored
fix: move DB setup to happen after Connect.init (#1533)
This change reduces the impact of slow DB setup impacting other tenants trying to connect at the same time that landed on the same partition
1 parent eeba306 commit 70339c7

File tree

8 files changed

+52
-109
lines changed

8 files changed

+52
-109
lines changed

lib/realtime/syn_handler.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ defmodule Realtime.SynHandler do
1010
@behaviour :syn_event_handler
1111

1212
@impl true
13-
def on_registry_process_updated(Connect, tenant_id, _pid, %{conn: conn}, :normal) when is_pid(conn) do
13+
def on_registry_process_updated(Connect, tenant_id, pid, %{conn: conn}, :normal) when is_pid(conn) do
1414
# Update that a database connection is ready
15-
Endpoint.local_broadcast(Connect.syn_topic(tenant_id), "ready", %{conn: conn})
15+
Endpoint.local_broadcast(Connect.syn_topic(tenant_id), "ready", %{pid: pid, conn: conn})
1616
end
1717

1818
def on_registry_process_updated(PostgresCdcRls, tenant_id, _pid, meta, _reason) do
@@ -38,7 +38,7 @@ defmodule Realtime.SynHandler do
3838
end
3939

4040
topic = topic(mod)
41-
Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", nil)
41+
Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", %{pid: pid, reason: reason})
4242

4343
:ok
4444
end

lib/realtime/tenants/connect.ex

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ defmodule Realtime.Tenants.Connect do
1919
alias Realtime.Tenants.Connect.GetTenant
2020
alias Realtime.Tenants.Connect.Piper
2121
alias Realtime.Tenants.Connect.RegisterProcess
22-
alias Realtime.Tenants.Connect.StartCounters
2322
alias Realtime.Tenants.Migrations
2423
alias Realtime.Tenants.ReplicationConnection
2524
alias Realtime.UsersCounter
@@ -83,14 +82,13 @@ defmodule Realtime.Tenants.Connect do
8382
| {:error, :tenant_database_connection_initializing}
8483
def get_status(tenant_id) do
8584
case :syn.lookup(__MODULE__, tenant_id) do
86-
{_pid, %{conn: nil}} ->
87-
wait_for_connection(tenant_id)
85+
{pid, %{conn: nil}} ->
86+
wait_for_connection(pid, tenant_id)
8887

8988
{_, %{conn: conn}} ->
9089
{:ok, conn}
9190

9291
:undefined ->
93-
Logger.warning("Connection process starting up")
9492
{:error, :tenant_database_connection_initializing}
9593

9694
error ->
@@ -101,7 +99,7 @@ defmodule Realtime.Tenants.Connect do
10199

102100
def syn_topic(tenant_id), do: "connect:#{tenant_id}"
103101

104-
defp wait_for_connection(tenant_id) do
102+
defp wait_for_connection(pid, tenant_id) do
105103
RealtimeWeb.Endpoint.subscribe(syn_topic(tenant_id))
106104

107105
# We do a lookup after subscribing because we could've missed a message while subscribing
@@ -112,9 +110,18 @@ defmodule Realtime.Tenants.Connect do
112110
_ ->
113111
# Wait for up to 5 seconds for the ready event
114112
receive do
115-
%{event: "ready", payload: %{conn: conn}} -> {:ok, conn}
113+
%{event: "ready", payload: %{pid: ^pid, conn: conn}} ->
114+
{:ok, conn}
115+
116+
%{event: "connect_down", payload: %{pid: ^pid, reason: {:shutdown, :tenant_db_too_many_connections}}} ->
117+
{:error, :tenant_db_too_many_connections}
118+
119+
%{event: "connect_down", payload: %{pid: ^pid, reason: _reason}} ->
120+
metadata = [external_id: tenant_id, project: tenant_id]
121+
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database", metadata)
122+
{:error, :tenant_database_unavailable}
116123
after
117-
5_000 -> {:error, :initializing}
124+
15_000 -> {:error, :initializing}
118125
end
119126
end
120127
after
@@ -139,16 +146,6 @@ defmodule Realtime.Tenants.Connect do
139146
{:error, {:already_started, _}} ->
140147
get_status(tenant_id)
141148

142-
{:error, {:shutdown, :tenant_db_too_many_connections}} ->
143-
{:error, :tenant_db_too_many_connections}
144-
145-
{:error, {:shutdown, :tenant_not_found}} ->
146-
{:error, :tenant_not_found}
147-
148-
{:error, :shutdown} ->
149-
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database", metadata)
150-
{:error, :tenant_database_unavailable}
151-
152149
{:error, error} ->
153150
log_error("UnableToConnectToTenantDatabase", error, metadata)
154151
{:error, :tenant_database_unavailable}
@@ -209,30 +206,33 @@ defmodule Realtime.Tenants.Connect do
209206
def init(%{tenant_id: tenant_id} = state) do
210207
Logger.metadata(external_id: tenant_id, project: tenant_id)
211208

209+
{:ok, state, {:continue, :db_connect}}
210+
end
211+
212+
@impl true
213+
def handle_continue(:db_connect, state) do
212214
pipes = [
213215
GetTenant,
214216
CheckConnection,
215-
StartCounters,
216217
RegisterProcess
217218
]
218219

219220
case Piper.run(pipes, state) do
220221
{:ok, acc} ->
221-
{:ok, acc, {:continue, :run_migrations}}
222+
{:noreply, acc, {:continue, :run_migrations}}
222223

223224
{:error, :tenant_not_found} ->
224-
{:stop, {:shutdown, :tenant_not_found}}
225+
{:stop, {:shutdown, :tenant_not_found}, state}
225226

226227
{:error, :tenant_db_too_many_connections} ->
227-
{:stop, {:shutdown, :tenant_db_too_many_connections}}
228+
{:stop, {:shutdown, :tenant_db_too_many_connections}, state}
228229

229230
{:error, error} ->
230231
log_error("UnableToConnectToTenantDatabase", error)
231-
{:stop, :shutdown}
232+
{:stop, :shutdown, state}
232233
end
233234
end
234235

235-
@impl true
236236
def handle_continue(:run_migrations, state) do
237237
%{tenant: tenant, db_conn_pid: db_conn_pid} = state
238238
Logger.warning("Tenant #{tenant.external_id} is initializing: #{inspect(node())}")
@@ -375,6 +375,7 @@ defmodule Realtime.Tenants.Connect do
375375

376376
## Private functions
377377
defp call_external_node(tenant_id, opts) do
378+
Logger.warning("Connection process starting up")
378379
rpc_timeout = Keyword.get(opts, :rpc_timeout, @rpc_timeout_default)
379380

380381
with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id),

lib/realtime/tenants/connect/check_connection.ex

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,14 @@ defmodule Realtime.Tenants.Connect.CheckConnection do
22
@moduledoc """
33
Check tenant database connection.
44
"""
5-
alias Realtime.Database
65

76
@behaviour Realtime.Tenants.Connect.Piper
87
@impl true
98
def run(acc) do
109
%{tenant: tenant} = acc
1110

12-
case Database.check_tenant_connection(tenant) do
11+
case Realtime.Database.check_tenant_connection(tenant) do
1312
{:ok, conn} ->
14-
Process.link(conn)
1513
db_conn_reference = Process.monitor(conn)
1614
{:ok, %{acc | db_conn_pid: conn, db_conn_reference: db_conn_reference}}
1715

lib/realtime/tenants/connect/start_counters.ex

Lines changed: 0 additions & 60 deletions
This file was deleted.

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.48.0",
7+
version: "2.48.1",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -653,8 +653,8 @@ defmodule Realtime.Integration.RtChannelTest do
653653
:syn.update_registry(Connect, tenant.external_id, fn _pid, meta -> %{meta | conn: nil} end)
654654
payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"}
655655
WebsocketClient.send_event(service_role_socket, topic, "broadcast", payload)
656-
# Waiting more than 5 seconds as this is the amount of time we will wait for the Connection to be ready
657-
refute_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 6000
656+
# Waiting more than 15 seconds as this is the amount of time we will wait for the Connection to be ready
657+
refute_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 16000
658658
end)
659659

660660
assert log =~ "UnableToHandleBroadcast"
@@ -831,7 +831,7 @@ defmodule Realtime.Integration.RtChannelTest do
831831

832832
refute_receive %Message{event: "presence_diff"}, 500
833833
# Waiting more than 5 seconds as this is the amount of time we will wait for the Connection to be ready
834-
refute_receive %Message{event: "phx_leave", topic: ^topic}, 6000
834+
refute_receive %Message{event: "phx_leave", topic: ^topic}, 16000
835835
end)
836836

837837
assert log =~ "UnableToHandlePresence"

test/realtime/syn_handler_test.exs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,32 +168,40 @@ defmodule Realtime.SynHandlerTest do
168168

169169
test "it handles :syn_conflict_resolution reason" do
170170
reason = :syn_conflict_resolution
171+
pid = self()
171172

172173
log =
173174
capture_log(fn ->
174-
assert SynHandler.on_process_unregistered(@mod, @name, self(), %{}, reason) == :ok
175+
assert SynHandler.on_process_unregistered(@mod, @name, pid, %{}, reason) == :ok
175176
end)
176177

177178
topic = "#{@topic}:#{@name}"
178179
event = "#{@topic}_down"
179180

180181
assert log =~ "#{@mod} terminated due to syn conflict resolution: #{inspect(@name)} #{inspect(self())}"
181-
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: nil}
182+
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: %{reason: ^reason, pid: ^pid}}
182183
end
183184

184185
test "it handles other reasons" do
185186
reason = :other_reason
187+
pid = self()
186188

187189
log =
188190
capture_log(fn ->
189-
assert SynHandler.on_process_unregistered(@mod, @name, self(), %{}, reason) == :ok
191+
assert SynHandler.on_process_unregistered(@mod, @name, pid, %{}, reason) == :ok
190192
end)
191193

192194
topic = "#{@topic}:#{@name}"
193195
event = "#{@topic}_down"
194196

195197
refute log =~ "#{@mod} terminated: #{inspect(@name)} #{node()}"
196-
assert_receive %Phoenix.Socket.Broadcast{topic: ^topic, event: ^event, payload: nil}, 500
198+
199+
assert_receive %Phoenix.Socket.Broadcast{
200+
topic: ^topic,
201+
event: ^event,
202+
payload: %{reason: ^reason, pid: ^pid}
203+
},
204+
500
197205
end
198206
end
199207
end

test/realtime/tenants/connect_test.exs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -78,31 +78,27 @@ defmodule Realtime.Tenants.ConnectTest do
7878
assert_receive {:ok, ^pid}
7979
end
8080

81-
test "more than 5 seconds passed error out", %{tenant: tenant} do
81+
test "more than 15 seconds passed error out", %{tenant: tenant} do
8282
parent = self()
8383

8484
# Let's slow down Connect starting
8585
expect(Database, :check_tenant_connection, fn t ->
86-
:timer.sleep(5500)
86+
Process.sleep(15500)
8787
call_original(Database, :check_tenant_connection, [t])
8888
end)
8989

9090
connect = fn -> send(parent, Connect.lookup_or_start_connection(tenant.external_id)) end
9191

92-
# Start an early connect
93-
spawn(connect)
94-
:timer.sleep(100)
95-
96-
# Start others
9792
spawn(connect)
9893
spawn(connect)
9994

100-
{:error, :tenant_database_unavailable} = Connect.lookup_or_start_connection(tenant.external_id)
95+
{:error, :initializing} = Connect.lookup_or_start_connection(tenant.external_id)
96+
# The above call waited 15 seconds
97+
assert_receive {:error, :initializing}
98+
assert_receive {:error, :initializing}
10199

102-
# Only one will succeed the others timed out waiting
103-
assert_receive {:error, :tenant_database_unavailable}
104-
assert_receive {:error, :tenant_database_unavailable}
105-
assert_receive {:ok, _pid}, 7000
100+
# This one will succeed
101+
{:ok, _pid} = Connect.lookup_or_start_connection(tenant.external_id)
106102
end
107103
end
108104

0 commit comments

Comments
 (0)