@@ -9,6 +9,7 @@ defmodule Electric.Shapes.Api do
99 alias __MODULE__
1010 alias __MODULE__ . Request
1111 alias __MODULE__ . Response
12+ alias __MODULE__ . SseState
1213
1314 import Electric.Replication.LogOffset , only: [ is_log_offset_lt: 2 ]
1415
@@ -27,7 +28,9 @@ defmodule Electric.Shapes.Api do
2728 required: true
2829 ] ,
2930 allow_shape_deletion: [ type: :boolean ] ,
31+ keepalive_interval: [ type: :integer ] ,
3032 long_poll_timeout: [ type: :integer ] ,
33+ sse_timeout: [ type: :integer ] ,
3134 max_age: [ type: :integer ] ,
3235 stack_ready_timeout: [ type: :integer ] ,
3336 stale_age: [ type: :integer ] ,
@@ -50,12 +53,15 @@ defmodule Electric.Shapes.Api do
5053 :stack_id ,
5154 :storage ,
5255 allow_shape_deletion: false ,
56+ keepalive_interval: 21_000 ,
5357 long_poll_timeout: 20_000 ,
58+ sse_timeout: 60_000 ,
5459 max_age: 60 ,
5560 stack_ready_timeout: 5_000 ,
5661 stale_age: 300 ,
5762 send_cache_headers?: true ,
5863 encoder: Electric.Shapes.Api.Encoder.JSON ,
64+ sse_encoder: Electric.Shapes.Api.Encoder.SSE ,
5965 configured: false
6066 ]
6167
@@ -65,7 +71,6 @@ defmodule Electric.Shapes.Api do
6571 # Aliasing for pattern matching
6672 @ before_all_offset LogOffset . before_all ( )
6773 @ offset_out_of_bounds % { offset: [ "out of bounds for this shape" ] }
68- @ must_refetch [ % { headers: % { control: "must-refetch" } } ]
6974
7075 # Need to implement Access behaviour because we use that to extract config
7176 # when using shapes api
@@ -320,10 +325,14 @@ defmodule Electric.Shapes.Api do
320325
321326 # TODO: discuss returning a 307 redirect rather than a 409, the client
322327 # will have to detect this and throw out old data
328+
329+ % { params: % { experimental_live_sse: in_sse? } } = request
330+ error = Api.Error . must_refetch ( experimental_live_sse: in_sse? )
331+
323332 { :error ,
324- Response . error ( request , @ must_refetch ,
333+ Response . error ( request , error . message ,
325334 handle: active_shape_handle ,
326- status: 409
335+ status: error . status
327336 ) }
328337 end
329338
@@ -489,17 +498,21 @@ defmodule Electric.Shapes.Api do
489498 handle: shape_handle ,
490499 chunk_end_offset: chunk_end_offset ,
491500 global_last_seen_lsn: global_last_seen_lsn ,
492- params: % { offset: offset , live: live? } ,
501+ params: % { offset: offset , live: live? , experimental_live_sse: in_sse? } ,
493502 api: api ,
494503 response: response
495504 } = request
496505
497- case Shapes . get_merged_log_stream ( api , shape_handle , since: offset , up_to: chunk_end_offset ) do
506+ case Shapes . get_merged_log_stream ( api , shape_handle ,
507+ since: offset ,
508+ up_to: chunk_end_offset ,
509+ experimental_live_sse: in_sse?
510+ ) do
498511 { :ok , log } ->
499512 if live? && Enum . take ( log , 1 ) == [ ] do
500513 request
501514 |> update_attrs ( % { ot_is_immediate_response: false } )
502- |> hold_until_change ( )
515+ |> handle_live_request ( )
503516 else
504517 up_to_date_lsn =
505518 if live? do
@@ -512,9 +525,9 @@ defmodule Electric.Shapes.Api do
512525 max ( global_last_seen_lsn , chunk_end_offset . tx_offset )
513526 end
514527
515- body = Stream . concat ( [ log , maybe_up_to_date ( request , up_to_date_lsn ) ] )
528+ log_stream = Stream . concat ( log , maybe_up_to_date ( request , up_to_date_lsn ) )
516529
517- % { response | chunked: true , body: encode_log ( request , body ) }
530+ % { response | chunked: true , body: encode_log ( request , log_stream ) }
518531 end
519532
520533 { :error , % Api.Error { } = error } ->
@@ -523,10 +536,11 @@ defmodule Electric.Shapes.Api do
523536 { :error , :unknown } ->
524537 # the shape has been deleted between the request validation and the attempt
525538 # to return the log stream
526- Response . error ( request , @ must_refetch , status: 409 )
539+ error = Api.Error . must_refetch ( experimental_live_sse: in_sse? )
540+ Response . error ( request , error . message , status: error . status )
527541
528542 { :error , % SnapshotError { type: :schema_changed } } ->
529- error = Api.Error . must_refetch ( )
543+ error = Api.Error . must_refetch ( experimental_live_sse: in_sse? )
530544 Logger . warning ( "Schema changed while creating snapshot for #{ shape_handle } " )
531545 Response . error ( request , error . message , status: error . status )
532546
@@ -563,12 +577,20 @@ defmodule Electric.Shapes.Api do
563577 end
564578 end
565579
580+ defp handle_live_request ( % Request { params: % { experimental_live_sse: true } } = request ) do
581+ stream_sse_events ( request )
582+ end
583+
584+ defp handle_live_request ( % Request { } = request ) do
585+ hold_until_change ( request )
586+ end
587+
566588 defp hold_until_change ( % Request { } = request ) do
567589 % {
568590 new_changes_ref: ref ,
569591 last_offset: last_offset ,
570592 handle: shape_handle ,
571- params: % { shape_definition: shape_def } ,
593+ params: % { shape_definition: shape_def , experimental_live_sse: in_sse? } ,
572594 api: % { long_poll_timeout: long_poll_timeout } = api
573595 } = request
574596
@@ -603,13 +625,16 @@ defmodule Electric.Shapes.Api do
603625 |> do_serve_shape_log ( )
604626
605627 { ^ ref , :shape_rotation , new_handle } ->
606- Response . error ( request , @ must_refetch ,
628+ error = Api.Error . must_refetch ( experimental_live_sse: in_sse? )
629+
630+ Response . error ( request , error . message ,
607631 handle: new_handle ,
608- status: 409
632+ status: error . status
609633 )
610634
611635 { ^ ref , :shape_rotation } ->
612- Response . error ( request , @ must_refetch , status: 409 )
636+ error = Api.Error . must_refetch ( experimental_live_sse: in_sse? )
637+ Response . error ( request , error . message , status: error . status )
613638 after
614639 # If we timeout, return an up-to-date message
615640 long_poll_timeout ->
@@ -628,6 +653,160 @@ defmodule Electric.Shapes.Api do
628653 end
629654 end
630655
656+ defp stream_sse_events ( % Request { } = request ) do
657+ % {
658+ new_changes_ref: ref ,
659+ handle: shape_handle ,
660+ api: % { keepalive_interval: keepalive_interval , sse_timeout: sse_timeout } ,
661+ params: % { offset: since_offset }
662+ } = request
663+
664+ Logger . debug (
665+ "Client #{ inspect ( self ( ) ) } is streaming SSE for changes to #{ shape_handle } since #{ inspect ( since_offset ) } "
666+ )
667+
668+ # Set up timer for SSE comment as keep-alive
669+ keepalive_ref = Process . send_after ( self ( ) , { :sse_keepalive , ref } , keepalive_interval )
670+
671+ # Set up timer for SSE timeout
672+ timeout_ref = Process . send_after ( self ( ) , { :sse_timeout , ref } , sse_timeout )
673+
674+ # Stream changes as SSE events for the duration of the timer.
675+ sse_event_stream =
676+ Stream . resource (
677+ fn ->
678+ % SseState {
679+ mode: :receive ,
680+ request: request ,
681+ stream: nil ,
682+ since_offset: since_offset ,
683+ last_message_time: System . monotonic_time ( :millisecond ) ,
684+ keepalive_ref: keepalive_ref
685+ }
686+ end ,
687+ & next_sse_event / 1 ,
688+ fn % SseState { keepalive_ref: latest_keepalive_ref } ->
689+ Process . cancel_timer ( latest_keepalive_ref )
690+ Process . cancel_timer ( timeout_ref )
691+ end
692+ )
693+
694+ response = % { request . response | chunked: true , body: sse_event_stream }
695+
696+ % { response | trace_attrs: Map . put ( response . trace_attrs || % { } , :ot_is_sse_response , true ) }
697+ end
698+
699+ defp next_sse_event ( % SseState { mode: :receive } = state ) do
700+ % {
701+ keepalive_ref: keepalive_ref ,
702+ last_message_time: last_message_time ,
703+ request:
704+ % {
705+ api: % {
706+ keepalive_interval: keepalive_interval
707+ } ,
708+ handle: shape_handle ,
709+ new_changes_ref: ref
710+ } = request ,
711+ since_offset: since_offset
712+ } = state
713+
714+ receive do
715+ { ^ ref , :new_changes , latest_log_offset } ->
716+ updated_request =
717+ % { request | last_offset: latest_log_offset }
718+ |> determine_global_last_seen_lsn ( )
719+ |> determine_log_chunk_offset ( )
720+ |> determine_up_to_date ( )
721+
722+ # This is usually but not always the `latest_log_offset`
723+ # as per `determine_log_chunk_offset/1`.
724+ end_offset = updated_request . chunk_end_offset
725+
726+ in_sse? = true
727+
728+ case Shapes . get_merged_log_stream (
729+ updated_request . api ,
730+ shape_handle ,
731+ since: since_offset ,
732+ up_to: end_offset ,
733+ experimental_live_sse: in_sse?
734+ ) do
735+ { :ok , log } ->
736+ Process . cancel_timer ( keepalive_ref )
737+
738+ control_messages = maybe_up_to_date ( updated_request , end_offset . tx_offset )
739+ message_stream = Stream . concat ( log , control_messages )
740+ encoded_stream = encode_log ( updated_request , message_stream )
741+
742+ current_time = System . monotonic_time ( :millisecond )
743+
744+ new_keepalive_ref =
745+ Process . send_after ( self ( ) , { :sse_keepalive , ref } , keepalive_interval )
746+
747+ { [ ] ,
748+ % {
749+ state
750+ | mode: :emit ,
751+ stream: encoded_stream ,
752+ since_offset: end_offset ,
753+ last_message_time: current_time ,
754+ keepalive_ref: new_keepalive_ref
755+ } }
756+
757+ { :error , _error } ->
758+ { [ ] , state }
759+ end
760+
761+ { ^ ref , :shape_rotation } ->
762+ must_refetch = % { headers: % { control: "must-refetch" } }
763+ message = encode_message ( request , must_refetch )
764+
765+ { message , % { state | mode: :done } }
766+
767+ { :sse_keepalive , ^ ref } ->
768+ current_time = System . monotonic_time ( :millisecond )
769+ time_since_last_message = current_time - last_message_time
770+
771+ if time_since_last_message >= keepalive_interval do
772+ new_keepalive_ref =
773+ Process . send_after ( self ( ) , { :sse_keepalive , ref } , keepalive_interval )
774+
775+ { [ ": keep-alive\n \n " ] ,
776+ % { state | last_message_time: current_time , keepalive_ref: new_keepalive_ref } }
777+ else
778+ # Not time to send a keep-alive yet, schedule for the remaining time
779+ remaining_time = keepalive_interval - time_since_last_message
780+ new_keepalive_ref = Process . send_after ( self ( ) , { :sse_keepalive , ref } , remaining_time )
781+
782+ { [ ] , % { state | keepalive_ref: new_keepalive_ref } }
783+ end
784+
785+ { :sse_timeout , ^ ref } ->
786+ { [ ] , % { state | mode: :done } }
787+ end
788+ end
789+
790+ defp next_sse_event ( % SseState { mode: :emit } = state ) do
791+ % { stream: stream } = state
792+
793+ # Can change the number taken to adjust the grouping. Currently three
794+ # because there's typically 3 elements per SSE -- the actual message
795+ # and the "data: " and "\n\n" delimiters around it.
796+ #
797+ # The JSON encoder groups stream elements by 500. So perhaps this
798+ # could be a larger number for more efficiency?
799+ case StreamSplit . take_and_drop ( stream , 3 ) do
800+ { [ ] , _tail } ->
801+ { [ ] , % { state | mode: :receive , stream: nil } }
802+
803+ { head , tail } ->
804+ { head , % { state | stream: tail } }
805+ end
806+ end
807+
808+ defp next_sse_event ( % SseState { mode: :done } = state ) , do: { :halt , state }
809+
631810 defp no_change_response ( % Request { } = request ) do
632811 % { response: response , global_last_seen_lsn: global_last_seen_lsn } =
633812 update_attrs ( request , % { ot_is_empty_response: true } )
@@ -671,16 +850,35 @@ defmodule Electric.Shapes.Api do
671850 def stack_id ( % Api { stack_id: stack_id } ) , do: stack_id
672851 def stack_id ( % { api: % { stack_id: stack_id } } ) , do: stack_id
673852
853+ defp encode_log ( % Request { api: api , params: % { live: true , experimental_live_sse: true } } , stream ) do
854+ encode_sse ( api , :log , stream )
855+ end
856+
674857 defp encode_log ( % Request { api: api } , stream ) do
675858 encode ( api , :log , stream )
676859 end
677860
678- @ spec encode_message ( Api . t ( ) | Request . t ( ) , term ( ) ) :: Enum . t ( )
679- def encode_message ( % Request { api: api } , message ) do
861+ # Error messages are encoded normally, even when using SSE
862+ # because they are returned on the original fetch request
863+ # with a status code that is not 2xx.
864+ @ spec encode_error_message ( Api . t ( ) | Request . t ( ) , term ( ) ) :: Enum . t ( )
865+ def encode_error_message ( % Api { } = api , message ) do
866+ encode ( api , :message , message )
867+ end
868+
869+ def encode_error_message ( % Request { api: api } , message ) do
680870 encode ( api , :message , message )
681871 end
682872
683- def encode_message ( % Api { } = api , message ) do
873+ @ spec encode_message ( Request . t ( ) , term ( ) ) :: Enum . t ( )
874+ def encode_message (
875+ % Request { api: api , params: % { live: true , experimental_live_sse: true } } ,
876+ message
877+ ) do
878+ encode_sse ( api , :message , message )
879+ end
880+
881+ def encode_message ( % Request { api: api } , message ) do
684882 encode ( api , :message , message )
685883 end
686884
@@ -689,6 +887,10 @@ defmodule Electric.Shapes.Api do
689887 apply ( encoder , type , [ message ] )
690888 end
691889
890+ defp encode_sse ( % Api { sse_encoder: sse_encoder } , type , message ) when type in [ :message , :log ] do
891+ apply ( sse_encoder , type , [ message ] )
892+ end
893+
692894 def schema ( % Response {
693895 api: % Api { inspector: inspector } ,
694896 shape_definition: % Shapes.Shape { } = shape
0 commit comments