@@ -9,6 +9,7 @@ defmodule Electric.Shapes.Api do
99 alias __MODULE__
1010 alias __MODULE__.Request
1111 alias __MODULE__.Response
12+ alias __MODULE__.SseState
1213
1314 import Electric.Replication.LogOffset, only: [is_log_offset_lt: 2]
1415
@@ -27,7 +28,9 @@ defmodule Electric.Shapes.Api do
2728 required: true
2829 ],
2930 allow_shape_deletion: [type: :boolean],
31+ keepalive_interval: [type: :integer],
3032 long_poll_timeout: [type: :integer],
33+ sse_timeout: [type: :integer],
3134 max_age: [type: :integer],
3235 stack_ready_timeout: [type: :integer],
3336 stale_age: [type: :integer],
@@ -50,12 +53,15 @@ defmodule Electric.Shapes.Api do
5053 :stack_id,
5154 :storage,
5255 allow_shape_deletion: false,
56+ keepalive_interval: 21_000,
5357 long_poll_timeout: 20_000,
58+ sse_timeout: 60_000,
5459 max_age: 60,
5560 stack_ready_timeout: 5_000,
5661 stale_age: 300,
5762 send_cache_headers?: true,
5863 encoder: Electric.Shapes.Api.Encoder.JSON,
64+ sse_encoder: Electric.Shapes.Api.Encoder.SSE,
5965 configured: false
6066 ]
6167
@@ -320,8 +326,18 @@ defmodule Electric.Shapes.Api do
320326
321327 # TODO: discuss returning a 307 redirect rather than a 409, the client
322328 # will have to detect this and throw out old data
329+
330+ # In SSE mode we send the must refetch object as an event
331+ # instead of a singleton array containing that object
332+ must_refetch =
333+ if request.params.experimental_live_sse do
334+ hd(@must_refetch)
335+ else
336+ @must_refetch
337+ end
338+
323339 {:error,
324- Response.error(request, @ must_refetch,
340+ Response.error(request, must_refetch,
325341 handle: active_shape_handle,
326342 status: 409
327343 )}
@@ -499,7 +515,7 @@ defmodule Electric.Shapes.Api do
499515 if live? && Enum.take(log, 1) == [] do
500516 request
501517 |> update_attrs(%{ot_is_immediate_response: false})
502- |> hold_until_change ()
518+ |> handle_live_request ()
503519 else
504520 up_to_date_lsn =
505521 if live? do
@@ -512,9 +528,9 @@ defmodule Electric.Shapes.Api do
512528 max(global_last_seen_lsn, chunk_end_offset.tx_offset)
513529 end
514530
515- body = Stream.concat([ log, maybe_up_to_date(request, up_to_date_lsn)] )
531+ log_stream = Stream.concat(log, maybe_up_to_date(request, up_to_date_lsn))
516532
517- %{response | chunked: true, body: encode_log(request, body )}
533+ %{response | chunked: true, body: encode_log(request, log_stream )}
518534 end
519535
520536 {:error, %Api.Error{} = error} ->
@@ -563,6 +579,14 @@ defmodule Electric.Shapes.Api do
563579 end
564580 end
565581
582+ defp handle_live_request(%Request{params: %{experimental_live_sse: true}} = request) do
583+ stream_sse_events(request)
584+ end
585+
586+ defp handle_live_request(%Request{} = request) do
587+ hold_until_change(request)
588+ end
589+
566590 defp hold_until_change(%Request{} = request) do
567591 %{
568592 new_changes_ref: ref,
@@ -604,6 +628,155 @@ defmodule Electric.Shapes.Api do
604628 end
605629 end
606630
631+ defp stream_sse_events(%Request{} = request) do
632+ %{
633+ new_changes_ref: ref,
634+ handle: shape_handle,
635+ api: %{keepalive_interval: keepalive_interval, sse_timeout: sse_timeout},
636+ params: %{offset: since_offset}
637+ } = request
638+
639+ Logger.debug(
640+ "Client #{inspect(self())} is streaming SSE for changes to #{shape_handle} since #{inspect(since_offset)}"
641+ )
642+
643+ # Set up timer for SSE comment as keep-alive
644+ keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)
645+
646+ # Set up timer for SSE timeout
647+ timeout_ref = Process.send_after(self(), {:sse_timeout, ref}, sse_timeout)
648+
649+ # Stream changes as SSE events for the duration of the timer.
650+ sse_event_stream =
651+ Stream.resource(
652+ fn ->
653+ %SseState{
654+ mode: :receive,
655+ request: request,
656+ stream: nil,
657+ since_offset: since_offset,
658+ last_message_time: System.monotonic_time(:millisecond),
659+ keepalive_ref: keepalive_ref
660+ }
661+ end,
662+ &next_sse_event/1,
663+ fn %SseState{keepalive_ref: latest_keepalive_ref} ->
664+ Process.cancel_timer(latest_keepalive_ref)
665+ Process.cancel_timer(timeout_ref)
666+ end
667+ )
668+
669+ response = %{request.response | chunked: true, body: sse_event_stream}
670+
671+ %{response | trace_attrs: Map.put(response.trace_attrs || %{}, :ot_is_sse_response, true)}
672+ end
673+
674+ defp next_sse_event(%SseState{mode: :receive} = state) do
675+ %{
676+ keepalive_ref: keepalive_ref,
677+ last_message_time: last_message_time,
678+ request:
679+ %{
680+ api: %{
681+ keepalive_interval: keepalive_interval
682+ },
683+ handle: shape_handle,
684+ new_changes_ref: ref
685+ } = request,
686+ since_offset: since_offset
687+ } = state
688+
689+ receive do
690+ {^ref, :new_changes, latest_log_offset} ->
691+ updated_request =
692+ %{request | last_offset: latest_log_offset}
693+ |> determine_global_last_seen_lsn()
694+ |> determine_log_chunk_offset()
695+ |> determine_up_to_date()
696+
697+ # This is usually but not always the `latest_log_offset`
698+ # as per `determine_log_chunk_offset/1`.
699+ end_offset = updated_request.chunk_end_offset
700+
701+ case Shapes.get_merged_log_stream(updated_request.api, shape_handle,
702+ since: since_offset,
703+ up_to: end_offset
704+ ) do
705+ {:ok, log} ->
706+ Process.cancel_timer(keepalive_ref)
707+
708+ control_messages = maybe_up_to_date(updated_request, end_offset.tx_offset)
709+ message_stream = Stream.concat(log, control_messages)
710+ encoded_stream = encode_log(updated_request, message_stream)
711+
712+ current_time = System.monotonic_time(:millisecond)
713+
714+ new_keepalive_ref =
715+ Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)
716+
717+ {[],
718+ %{
719+ state
720+ | mode: :emit,
721+ stream: encoded_stream,
722+ since_offset: end_offset,
723+ last_message_time: current_time,
724+ keepalive_ref: new_keepalive_ref
725+ }}
726+
727+ {:error, _error} ->
728+ {[], state}
729+ end
730+
731+ {^ref, :shape_rotation} ->
732+ must_refetch = %{headers: %{control: "must-refetch"}}
733+ message = encode_message(request, must_refetch)
734+
735+ {message, %{state | mode: :done}}
736+
737+ {:sse_keepalive, ^ref} ->
738+ current_time = System.monotonic_time(:millisecond)
739+ time_since_last_message = current_time - last_message_time
740+
741+ if time_since_last_message >= keepalive_interval do
742+ new_keepalive_ref =
743+ Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)
744+
745+ {[": keep-alive\n\n"],
746+ %{state | last_message_time: current_time, keepalive_ref: new_keepalive_ref}}
747+ else
748+ # Not time to send a keep-alive yet, schedule for the remaining time
749+ remaining_time = keepalive_interval - time_since_last_message
750+ new_keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, remaining_time)
751+
752+ {[], %{state | keepalive_ref: new_keepalive_ref}}
753+ end
754+
755+ {:sse_timeout, ^ref} ->
756+ {[], %{state | mode: :done}}
757+ end
758+ end
759+
760+ defp next_sse_event(%SseState{mode: :emit} = state) do
761+ %{stream: stream} = state
762+
763+ # Can change the number taken to adjust the grouping. Currently three
764+ # because there's typically 3 elements per SSE -- the actual message
765+ # and the "data: " and "\n\n" delimiters around it.
766+ #
767+ # The JSON encoder groups stream elements by 500. So perhaps this
768+ # could be a larger number for more efficiency?
769+ case StreamSplit.take_and_drop(stream, 3) do
770+ {[], _tail} ->
771+ {[], %{state | mode: :receive, stream: nil}}
772+
773+ {head, tail} ->
774+ {head, %{state | stream: tail}}
775+ end
776+ end
777+
778+ defp next_sse_event(%SseState{mode: :done} = state), do: {:halt, state}
779+
607780 defp no_change_response(%Request{} = request) do
608781 %{response: response, global_last_seen_lsn: global_last_seen_lsn} =
609782 update_attrs(request, %{ot_is_empty_response: true})
@@ -647,16 +820,35 @@ defmodule Electric.Shapes.Api do
647820 def stack_id(%Api{stack_id: stack_id}), do: stack_id
648821 def stack_id(%{api: %{stack_id: stack_id}}), do: stack_id
649822
823+ defp encode_log(%Request{api: api, params: %{live: true, experimental_live_sse: true}}, stream) do
824+ encode_sse(api, :log, stream)
825+ end
826+
650827 defp encode_log(%Request{api: api}, stream) do
651828 encode(api, :log, stream)
652829 end
653830
654- @spec encode_message(Api.t() | Request.t(), term()) :: Enum.t()
655- def encode_message(%Request{api: api}, message) do
831+ # Error messages are encoded normally, even when using SSE
832+ # because they are returned on the original fetch request
833+ # with a status code that is not 2xx.
834+ @spec encode_error_message(Api.t() | Request.t(), term()) :: Enum.t()
835+ def encode_error_message(%Api{} = api, message) do
836+ encode(api, :message, message)
837+ end
838+
839+ def encode_error_message(%Request{api: api}, message) do
656840 encode(api, :message, message)
657841 end
658842
659- def encode_message(%Api{} = api, message) do
843+ @spec encode_message(Request.t(), term()) :: Enum.t()
844+ def encode_message(
845+ %Request{api: api, params: %{live: true, experimental_live_sse: true}},
846+ message
847+ ) do
848+ encode_sse(api, :message, message)
849+ end
850+
851+ def encode_message(%Request{api: api}, message) do
660852 encode(api, :message, message)
661853 end
662854
@@ -665,6 +857,10 @@ defmodule Electric.Shapes.Api do
665857 apply(encoder, type, [message])
666858 end
667859
860+ defp encode_sse(%Api{sse_encoder: sse_encoder}, type, message) when type in [:message, :log] do
861+ apply(sse_encoder, type, [message])
862+ end
863+
668864 def schema(%Response{
669865 api: %Api{inspector: inspector},
670866 shape_definition: %Shapes.Shape{} = shape
0 commit comments