Skip to content

Commit b11fc3c

Browse files
committed
sse: proper keep-alives and refactor SSE loop state.
1 parent e6b6c3e commit b11fc3c

File tree

2 files changed

+90
-22
lines changed

2 files changed

+90
-22
lines changed

packages/sync-service/lib/electric/shapes/api.ex

Lines changed: 68 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ defmodule Electric.Shapes.Api do
77
alias __MODULE__
88
alias __MODULE__.Request
99
alias __MODULE__.Response
10+
alias __MODULE__.SseState
1011

1112
import Electric.Replication.LogOffset, only: [is_log_offset_lt: 2]
1213

@@ -25,6 +26,7 @@ defmodule Electric.Shapes.Api do
2526
required: true
2627
],
2728
allow_shape_deletion: [type: :boolean],
29+
keepalive_interval: [type: :integer],
2830
long_poll_timeout: [type: :integer],
2931
sse_timeout: [type: :integer],
3032
max_age: [type: :integer],
@@ -49,6 +51,7 @@ defmodule Electric.Shapes.Api do
4951
:stack_id,
5052
:storage,
5153
allow_shape_deletion: false,
54+
keepalive_interval: 21_000,
5255
long_poll_timeout: 20_000,
5356
sse_timeout: 60_000,
5457
max_age: 60,
@@ -562,26 +565,37 @@ defmodule Electric.Shapes.Api do
562565
%{
563566
new_changes_ref: ref,
564567
handle: shape_handle,
565-
api: %{sse_timeout: sse_timeout},
568+
api: %{keepalive_interval: keepalive_interval, sse_timeout: sse_timeout},
566569
params: %{offset: since_offset}
567570
} = request
568571

569572
Logger.debug(
570573
"Client #{inspect(self())} is streaming SSE for changes to #{shape_handle} since #{inspect(since_offset)}"
571574
)
572575

576+
# Set up timer for SSE comment as keep-alive
577+
keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)
578+
573579
# Set up timer for SSE timeout
574-
timer_ref = Process.send_after(self(), {:sse_timeout, ref}, sse_timeout)
580+
timeout_ref = Process.send_after(self(), {:sse_timeout, ref}, sse_timeout)
575581

576582
# Stream changes as SSE events for the duration of the timer.
577583
sse_event_stream =
578584
Stream.resource(
579585
fn ->
580-
{request, since_offset}
586+
%SseState{
587+
mode: :receive,
588+
request: request
589+
stream: nil,
590+
since_offset: since_offset
591+
last_message_time: System.monotonic_time(:millisecond),
592+
keepalive_ref: keepalive_ref
593+
}
581594
end,
582595
&next_sse_event/1,
583-
fn _ ->
584-
Process.cancel_timer(timer_ref)
596+
fn %SseState{keepalive_ref: latest_keepalive_ref} ->
597+
Process.cancel_timer(latest_keepalive_ref)
598+
Process.cancel_timer(timeout_ref)
585599
end
586600
)
587601

@@ -590,12 +604,18 @@ defmodule Electric.Shapes.Api do
590604
%{response | trace_attrs: Map.put(response.trace_attrs || %{}, :ot_is_sse_response, true)}
591605
end
592606

593-
defp next_sse_event({%Request{} = request, since_offset}) do
607+
defp next_sse_event(%SseState{mode: :receive} = state) do
594608
%{
595-
api: api,
596-
handle: shape_handle,
597-
new_changes_ref: ref
598-
} = request
609+
request: %{
610+
api: %{
611+
keepalive_interval: keepalive_interval
612+
} = api,
613+
handle: shape_handle,
614+
new_changes_ref: ref
615+
} = request,
616+
since_offset: since_offset
617+
keepalive_ref: keepalive_ref
618+
} = state
599619

600620
receive do
601621
{^ref, :new_changes, latest_log_offset} ->
@@ -614,47 +634,73 @@ defmodule Electric.Shapes.Api do
614634
up_to: end_offset
615635
) do
616636
{:ok, log} ->
637+
Process.cancel_timer(keepalive_ref)
638+
617639
control_messages = maybe_up_to_date(updated_request, end_offset.tx_offset)
618640
message_stream = Stream.concat(log, control_messages)
619641
encoded_stream = encode_log(updated_request, message_stream)
620642

621-
{[], {:emit, encoded_stream, updated_request, end_offset}}
643+
current_time = System.monotonic_time(:millisecond)
644+
new_keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)
645+
646+
{[], %{state |
647+
mode: :emit,
648+
stream: encoded_stream,
649+
since_offset: end_offset,
650+
last_message_time: current_time,
651+
keepalive_ref: new_keepalive_ref
652+
}}
622653

623654
{:error, _error} ->
624-
{[], {request, since_offset}}
655+
{[], state}
625656
end
626657

627658
{^ref, :shape_rotation} ->
628659
must_refetch = %{headers: %{control: "must-refetch"}}
629660
message = encode_message(api, must_refetch)
630661

631-
{message, :done}
662+
{message, %{state | mode: :done}}
663+
664+
{:sse_keepalive, ^ref} ->
665+
current_time = System.monotonic_time(:millisecond)
666+
time_since_last_message = current_time - last_message_time
667+
668+
if time_since_last_message >= keepalive_interval do
669+
new_keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)
670+
671+
{[": keep-alive\n\n"], %{state | last_message_time: current_time, keepalive_ref: new_keepalive_ref}}
672+
else
673+
# Not time to send a keep-alive yet, schedule for the remaining time
674+
remaining_time = keepalive_interval - time_since_last_message
675+
new_keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, remaining_time)
676+
677+
{[], %{state | keepalive_ref: new_keepalive_ref}}
678+
end
632679

633680
{:sse_timeout, ^ref} ->
634-
{[], :done}
681+
{[], %{state | mode: :done}}
635682
end
636683
end
637684

638-
defp next_sse_event({:emit, stream, %Request{} = request, since_offset}) do
685+
defp next_sse_event(%SseState{mode: :emit} = state) do
686+
%{stream: stream} = state
687+
639688
# Can change the number taken to adjust the grouping. Currently three
640689
# because there's typically 3 elements per SSE -- the actual message
641690
# and the "data: " and "\n\n" delimiters around it.
642691
#
643692
# The JSON encoder groups stream elements by 500. So perhaps this
644693
# could be a larger number for more efficiency?
645694
case Electric.Utils.take_and_drop(stream, 3) do
646-
{[], []} ->
647-
{[], {request, since_offset}}
648-
649-
{head, []} ->
650-
{head, {request, since_offset}}
695+
{[], _tail} ->
696+
{[], %{state | mode: :receive, stream: nil}}
651697

652698
{head, tail} ->
653-
{head, {:emit, tail, request, since_offset}}
699+
{head, %{state | stream: tail}}
654700
end
655701
end
656702

657-
defp next_sse_event(:done), do: {:halt, :done}
703+
defp next_sse_event(%SseState{mode: :done}), do: {:halt, :done}
658704

659705
defp clean_up_change_listener(%Request{handle: shape_handle} = request)
660706
when not is_nil(shape_handle) do
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
defmodule Electric.Shapes.Api.SseState do
2+
alias Electric.Shapes.Api
3+
alias Electric.Replication.LogOffset
4+
5+
defstruct [
6+
:mode,
7+
:request,
8+
:stream,
9+
:since_offset,
10+
:last_message_time,
11+
:keepalive_ref
12+
]
13+
14+
@type t() :: %__MODULE__{
15+
mode: :receive | :emit | :done,
16+
request: Api.Request.t(),
17+
stream: Enumerable.t() | nil,
18+
since_offset: LogOffset.t(),
19+
last_message_time: pos_integer(),
20+
keepalive_ref: reference()
21+
}
22+
end

0 commit comments

Comments
 (0)