Skip to content

Commit 14ce221

Browse files
authored
fix: Handle in-flight requests during stack shutdown more gracefully (#3310)
Closes #3300 @balegas this error was a symptom of what we have discussed about races during shutdown - we have requests in-flight during the shutdown that will start erroring as resources are no longer present. I have looked into the logs of the occurrences and have pinpointed two causes and added handlers for them: - After a live request timeout, we check the global last processed LSN to attach to an up to date message - but if during that 20 second wait the stack goes down (and is still down by the end of the 20 seconds), it fails either with table not present or no value for the LSN - I had originally fixed this by doing another `hold_until_stack_ready` after a live timeout, but @alco had removed that in his PR that moved the shape status out of the connection subsystem - however this is still an issue in how we do multitenancy/cloud so I'm putting it back in without a timeout (no longer wakes connections either) - Added a test to cover it as the test that was there was not actually doing the right thing - An in-flight request might reach `await_snapshot_start`, which if it finds a shape but not shape process, it enters a retry loop as it expects the process to exist shortly - which is not the case if the stack is shutting down and eventually errors with tables not found - I am now catching argument errors/tables not existing and returning a "prettier" error that will be turned into a 500 but without a Sentry error - ultimately this is an issue we have to deal with for what to do with in-flight requests
1 parent d86d7ac commit 14ce221

File tree

8 files changed

+143
-41
lines changed

8 files changed

+143
-41
lines changed

.changeset/metal-peaches-change.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@core/sync-service': patch
3+
---
4+
5+
Handle requests during stack shutdown more gracefully

packages/sync-service/lib/electric/lsn_tracker.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ defmodule Electric.LsnTracker do
1919
stack_id
2020
|> table()
2121
|> :ets.insert({:last_processed_lsn, lsn})
22+
23+
:ok
2224
end
2325

2426
def set_last_processed_lsn(lsn, stack_id) when is_integer(lsn) do

packages/sync-service/lib/electric/shape_cache.ex

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,10 +161,11 @@ defmodule Electric.ShapeCache do
161161
{:error, :unknown}
162162

163163
true ->
164-
server = Electric.Shapes.Consumer.name(stack_id, shape_handle)
165-
166164
try do
167-
GenServer.call(server, :await_snapshot_start, 15_000)
165+
Electric.Shapes.Consumer.await_snapshot_start(
166+
%{stack_id: stack_id, shape_handle: shape_handle},
167+
15_000
168+
)
168169
catch
169170
:exit, {:timeout, {GenServer, :call, _}} ->
170171
# Please note that :await_snapshot_start can also return a timeout error as well
@@ -181,6 +182,9 @@ defmodule Electric.ShapeCache do
181182
await_snapshot_start(shape_handle, opts)
182183
end
183184
end
185+
rescue
186+
ArgumentError ->
187+
{:error, %RuntimeError{message: "Shape meta tables not found"}}
184188
end
185189

186190
@impl Electric.ShapeCacheBehaviour

packages/sync-service/lib/electric/shape_cache/shape_status.ex

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ defmodule Electric.ShapeCache.ShapeStatusBehaviour do
2020
@type stack_ref() :: atom() | stack_id() | [stack_id: stack_id()]
2121
end
2222

23-
@callback initialize_from_storage(stack_ref(), Storage.t()) :: :ok | {:error, term()}
23+
@callback initialize_from_storage(stack_ref(), Electric.ShapeCache.Storage.storage()) ::
24+
:ok | {:error, term()}
2425
@callback terminate(stack_ref(), String.t()) :: :ok | {:error, term()}
2526
@callback list_shapes(stack_ref()) :: [{shape_handle(), Shape.t()}]
2627
@callback count_shapes(stack_ref()) :: non_neg_integer()
@@ -521,10 +522,7 @@ defmodule Electric.ShapeCache.ShapeStatus do
521522
end
522523

523524
defp backup_file_path(backup_dir) do
524-
case backup_dir do
525-
nil -> nil
526-
dir -> dir |> Path.join(@backup_file) |> String.to_charlist()
527-
end
525+
backup_dir |> Path.join(@backup_file) |> String.to_charlist()
528526
end
529527

530528
def backup_dir(storage) do

packages/sync-service/lib/electric/shapes/api.ex

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ defmodule Electric.Shapes.Api do
366366

367367
defp hold_until_stack_ready(%Api{} = api, opts \\ []) do
368368
stack_id = stack_id(api)
369-
opts = [timeout: api.stack_ready_timeout] ++ opts
369+
opts = Keyword.put_new(opts, :timeout, api.stack_ready_timeout)
370370

371371
case Electric.StatusMonitor.wait_until_active(stack_id, opts) do
372372
:ok ->
@@ -384,7 +384,7 @@ defmodule Electric.Shapes.Api do
384384
hold_until_stack_ready(api, block_on_conn_sleeping: true)
385385

386386
{:error, message} ->
387-
Logger.warning("Stack not ready after #{api.stack_ready_timeout}ms. Reason: #{message}")
387+
Logger.warning("Stack not ready after #{opts[:timeout]}ms. Reason: #{message}")
388388
{:error, Response.error(api, message, status: 503)}
389389
end
390390
end
@@ -746,7 +746,7 @@ defmodule Electric.Shapes.Api do
746746
%{
747747
new_changes_ref: ref,
748748
handle: shape_handle,
749-
api: %{long_poll_timeout: long_poll_timeout}
749+
api: %{long_poll_timeout: long_poll_timeout} = api
750750
} = request
751751

752752
Logger.debug("Client #{inspect(self())} is waiting for changes to #{shape_handle}")
@@ -772,12 +772,21 @@ defmodule Electric.Shapes.Api do
772772
error = Api.Error.must_refetch()
773773
Response.error(request, error.message, status: error.status)
774774
after
775-
# If we timeout, return an up-to-date message
775+
# If we timeout, check that the stack is still up and
776+
# return an up-to-date message
776777
long_poll_timeout ->
777-
request
778-
|> update_attrs(%{ot_is_long_poll_timeout: true})
779-
|> determine_global_last_seen_lsn()
780-
|> no_change_response()
778+
request = update_attrs(request, %{ot_is_long_poll_timeout: true})
779+
780+
case Electric.StatusMonitor.status(api.stack_id) do
781+
%{shape: :up} ->
782+
request
783+
|> determine_global_last_seen_lsn()
784+
|> no_change_response()
785+
786+
_ ->
787+
message = Electric.StatusMonitor.timeout_message(api.stack_id)
788+
Response.error(request, message, status: 503)
789+
end
781790
end
782791
end
783792

@@ -821,7 +830,7 @@ defmodule Electric.Shapes.Api do
821830

822831
response = %{request.response | chunked: true, body: sse_event_stream}
823832

824-
%{response | trace_attrs: Map.put(response.trace_attrs || %{}, :ot_is_sse_response, true)}
833+
%{response | trace_attrs: Map.put(response.trace_attrs, :ot_is_sse_response, true)}
825834
end
826835

827836
defp next_sse_event(%SseState{mode: :receive} = state) do

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,16 @@ defmodule Electric.Shapes.Consumer do
3434
GenServer.call(consumer, :initial_state, 30_000)
3535
end
3636

37-
def await_snapshot_start(consumer) when is_pid(consumer) do
38-
GenServer.call(consumer, :await_snapshot_start, 30_000)
37+
@spec await_snapshot_start(pid() | map()) :: :started | {:error, any()}
38+
@spec await_snapshot_start(pid() | map(), timeout()) :: :started | {:error, any()}
39+
def await_snapshot_start(consumer, timeout \\ 30_000)
40+
41+
def await_snapshot_start(consumer, timeout) when is_pid(consumer) do
42+
GenServer.call(consumer, :await_snapshot_start, timeout)
3943
end
4044

41-
def await_snapshot_start(consumer) do
42-
GenServer.call(name(consumer), :await_snapshot_start, 30_000)
45+
def await_snapshot_start(consumer, timeout) do
46+
GenServer.call(name(consumer), :await_snapshot_start, timeout)
4347
end
4448

4549
def subscribe_materializer(consumer) do

packages/sync-service/test/electric/shape_cache_test.exs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,76 @@ defmodule Electric.ShapeCacheTest do
822822
"Shape terminated before snapshot was ready"
823823
]
824824
end
825+
826+
test "should wait for consumer to come up", ctx do
827+
Support.TestUtils.patch_snapshotter(fn parent, shape_handle, _, _ ->
828+
GenServer.cast(parent, {:pg_snapshot_known, shape_handle, @pg_snapshot_xmin_100})
829+
GenServer.cast(parent, {:snapshot_started, shape_handle})
830+
end)
831+
832+
%{shape_cache_opts: opts} = with_shape_cache(ctx)
833+
834+
start_consumer_delay = 500
835+
836+
test_pid = self()
837+
838+
Repatch.patch(
839+
Electric.Shapes.DynamicConsumerSupervisor,
840+
:start_shape_consumer,
841+
[mode: :shared],
842+
fn a, b ->
843+
send(test_pid, :about_to_start_consumer)
844+
845+
Process.sleep(start_consumer_delay)
846+
Repatch.real(Electric.Shapes.DynamicConsumerSupervisor.start_shape_consumer(a, b))
847+
end
848+
)
849+
850+
Repatch.allow(self(), opts[:server])
851+
852+
creation_task = Task.async(fn -> ShapeCache.get_or_create_shape_handle(@shape, opts) end)
853+
854+
{shape_handle, _} =
855+
receive do
856+
:about_to_start_consumer -> ShapeCache.get_or_create_shape_handle(@shape, opts)
857+
end
858+
859+
wait_task = Task.async(fn -> ShapeCache.await_snapshot_start(shape_handle, opts) end)
860+
861+
# should delay in responding
862+
refute Task.yield(wait_task, 10)
863+
Task.await(creation_task)
864+
assert :started = Task.await(wait_task, start_consumer_delay)
865+
end
866+
867+
test "should stop waiting for consumer to come up if shape tables missing", ctx do
868+
Support.TestUtils.patch_snapshotter(fn _, _, _, _ -> nil end)
869+
%{shape_cache_opts: opts} = with_shape_cache(ctx)
870+
871+
Repatch.patch(
872+
Electric.Shapes.DynamicConsumerSupervisor,
873+
:start_shape_consumer,
874+
[mode: :shared],
875+
fn _, _ -> Process.sleep(:infinity) end
876+
)
877+
878+
Repatch.allow(self(), opts[:server])
879+
880+
start_supervised({Task, fn -> ShapeCache.get_or_create_shape_handle(@shape, opts) end})
881+
882+
Process.sleep(10)
883+
884+
{shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape, opts)
885+
886+
wait_task = Task.async(fn -> ShapeCache.await_snapshot_start(shape_handle, opts) end)
887+
888+
# should delay in responding
889+
refute Task.yield(wait_task, 10)
890+
stop_supervised(ctx[:shape_status_owner])
891+
892+
assert {:error, %RuntimeError{message: "Shape meta tables not found"}} =
893+
Task.await(wait_task, 500)
894+
end
825895
end
826896

827897
describe "after restart" do

packages/sync-service/test/electric/shapes/api_test.exs

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
defmodule Electric.Shapes.ApiTest do
22
use ExUnit.Case, async: true
3+
use Repatch.ExUnit
34
use Support.Mock
45

56
alias Electric.Postgres.Lsn
@@ -1194,29 +1195,38 @@ defmodule Electric.Shapes.ApiTest do
11941195
[]
11951196
end)
11961197

1197-
lsn = Lsn.from_integer(:rand.uniform(1_000_000))
1198-
Electric.LsnTracker.set_last_processed_lsn(lsn, ctx.stack_id)
1198+
stack_id = ctx.stack_id
11991199

1200-
assert {:ok, request} =
1201-
Api.validate(
1202-
ctx.api,
1203-
%{
1204-
table: "public.users",
1205-
offset: "#{@test_offset}",
1206-
handle: @test_shape_handle,
1207-
live: true
1208-
}
1209-
)
1200+
status_task =
1201+
start_supervised!({
1202+
Task,
1203+
fn ->
1204+
set_status_to_active(ctx)
1205+
Process.sleep(:infinity)
1206+
end
1207+
})
12101208

1211-
response = Api.serve_shape_response(request)
1212-
assert response.status == 200
1209+
req_task =
1210+
Task.async(fn ->
1211+
assert {:ok, request} =
1212+
Api.validate(
1213+
ctx.api,
1214+
%{
1215+
table: "public.users",
1216+
offset: "#{@test_offset}",
1217+
handle: @test_shape_handle,
1218+
live: true
1219+
}
1220+
)
12131221

1214-
assert [
1215-
%{
1216-
headers: %{control: "up-to-date", global_last_seen_lsn: "#{Lsn.to_integer(lsn)}"}
1217-
}
1218-
] ==
1219-
response_body(response)
1222+
Process.exit(status_task, :kill)
1223+
Electric.StatusMonitor.wait_for_messages_to_be_processed(stack_id)
1224+
Process.sleep(50)
1225+
1226+
Api.serve_shape_response(request)
1227+
end)
1228+
1229+
assert %{status: 503} = Task.await(req_task)
12201230
end
12211231
end
12221232

0 commit comments

Comments
 (0)