Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ defmodule Electric.Plug.ServeShapePlug do
all_params =
Map.merge(conn.query_params, conn.path_params)
|> Map.update("live", "false", &(&1 != "false"))
|> Map.update("experimental_live_sse", "false", &(&1 != "false"))

case Api.validate(api, all_params) do
{:ok, request} ->
Expand Down
212 changes: 207 additions & 5 deletions packages/sync-service/lib/electric/shapes/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Electric.Shapes.Api do
alias __MODULE__
alias __MODULE__.Request
alias __MODULE__.Response
alias __MODULE__.SseState

import Electric.Replication.LogOffset, only: [is_log_offset_lt: 2]

Expand All @@ -25,7 +26,9 @@ defmodule Electric.Shapes.Api do
required: true
],
allow_shape_deletion: [type: :boolean],
keepalive_interval: [type: :integer],
long_poll_timeout: [type: :integer],
sse_timeout: [type: :integer],
max_age: [type: :integer],
stack_ready_timeout: [type: :integer],
stale_age: [type: :integer],
Expand All @@ -48,12 +51,15 @@ defmodule Electric.Shapes.Api do
:stack_id,
:storage,
allow_shape_deletion: false,
keepalive_interval: 21_000,
long_poll_timeout: 20_000,
sse_timeout: 60_000,
max_age: 60,
stack_ready_timeout: 5_000,
stale_age: 300,
send_cache_headers?: true,
encoder: Electric.Shapes.Api.Encoder.JSON,
sse_encoder: Electric.Shapes.Api.Encoder.SSE,
configured: false
]

Expand Down Expand Up @@ -482,7 +488,7 @@ defmodule Electric.Shapes.Api do
if live? && Enum.take(log, 1) == [] do
request
|> update_attrs(%{ot_is_immediate_response: false})
|> hold_until_change()
|> handle_live_request()
else
up_to_date_lsn =
if live? do
Expand All @@ -495,9 +501,9 @@ defmodule Electric.Shapes.Api do
max(global_last_seen_lsn, chunk_end_offset.tx_offset)
end

body = Stream.concat([log, maybe_up_to_date(request, up_to_date_lsn)])
log_stream = Stream.concat(log, maybe_up_to_date(request, up_to_date_lsn))

%{response | chunked: true, body: encode_log(request, body)}
%{response | chunked: true, body: encode_log(request, log_stream)}
end

{:error, error} ->
Expand All @@ -511,6 +517,14 @@ defmodule Electric.Shapes.Api do
end
end

defp handle_live_request(%Request{params: %{experimental_live_sse: true}} = request) do
stream_sse_events(request)
end

defp handle_live_request(%Request{} = request) do
hold_until_change(request)
end

defp hold_until_change(%Request{} = request) do
%{
new_changes_ref: ref,
Expand Down Expand Up @@ -547,10 +561,183 @@ defmodule Electric.Shapes.Api do
end
end

defp stream_sse_events(%Request{} = request) do
%{
new_changes_ref: ref,
handle: shape_handle,
api: %{keepalive_interval: keepalive_interval, sse_timeout: sse_timeout},
params: %{offset: since_offset}
} = request

Logger.debug(
"Client #{inspect(self())} is streaming SSE for changes to #{shape_handle} since #{inspect(since_offset)}"
)

# Set up timer for SSE comment as keep-alive
keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)

# Set up timer for SSE timeout
timeout_ref = Process.send_after(self(), {:sse_timeout, ref}, sse_timeout)

# Stream changes as SSE events for the duration of the timer.
sse_event_stream =
Stream.resource(
fn ->
%SseState{
mode: :receive,
request: request,
stream: nil,
since_offset: since_offset,
last_message_time: System.monotonic_time(:millisecond),
keepalive_ref: keepalive_ref
}
end,
&next_sse_event/1,
fn %SseState{keepalive_ref: latest_keepalive_ref} ->
Process.cancel_timer(latest_keepalive_ref)
Process.cancel_timer(timeout_ref)
end
)

response = %{request.response | chunked: true, body: sse_event_stream}

%{response | trace_attrs: Map.put(response.trace_attrs || %{}, :ot_is_sse_response, true)}
end

defp next_sse_event(%SseState{mode: :receive} = state) do
%{
keepalive_ref: keepalive_ref,
last_message_time: last_message_time,
request:
%{
api:
%{
keepalive_interval: keepalive_interval
} = api,
handle: shape_handle,
new_changes_ref: ref
} = request,
since_offset: since_offset
} = state

receive do
{^ref, :new_changes, latest_log_offset} ->
updated_request =
%{request | last_offset: latest_log_offset}
|> determine_global_last_seen_lsn()
|> determine_log_chunk_offset()
|> determine_up_to_date()

# This is usually but not always the `latest_log_offset`
# as per `determine_log_chunk_offset/1`.
end_offset = updated_request.chunk_end_offset

case Shapes.get_merged_log_stream(updated_request.api, shape_handle,
since: since_offset,
up_to: end_offset
) do
{:ok, log} ->
Process.cancel_timer(keepalive_ref)

control_messages = maybe_up_to_date(updated_request, end_offset.tx_offset)
message_stream = Stream.concat(log, control_messages)
encoded_stream = encode_log(updated_request, message_stream)

current_time = System.monotonic_time(:millisecond)

new_keepalive_ref =
Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)

{[],
%{
state
| mode: :emit,
stream: encoded_stream,
since_offset: end_offset,
last_message_time: current_time,
keepalive_ref: new_keepalive_ref
}}

{:error, _error} ->
{[], state}
end

{^ref, :shape_rotation} ->
must_refetch = %{headers: %{control: "must-refetch"}}
message = encode_message(api, must_refetch)

{message, %{state | mode: :done}}

{:sse_keepalive, ^ref} ->
current_time = System.monotonic_time(:millisecond)
time_since_last_message = current_time - last_message_time

if time_since_last_message >= keepalive_interval do
new_keepalive_ref =
Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)

{[": keep-alive\n\n"],
%{state | last_message_time: current_time, keepalive_ref: new_keepalive_ref}}
else
# Not time to send a keep-alive yet, schedule for the remaining time
remaining_time = keepalive_interval - time_since_last_message
new_keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, remaining_time)

{[], %{state | keepalive_ref: new_keepalive_ref}}
end

{:sse_timeout, ^ref} ->
{[], %{state | mode: :done}}
end
end

defp next_sse_event(%SseState{mode: :emit} = state) do
%{stream: stream} = state

# Can change the number taken to adjust the grouping. Currently three
# because there's typically 3 elements per SSE -- the actual message
# and the "data: " and "\n\n" delimiters around it.
#
# The JSON encoder groups stream elements by 500. So perhaps this
# could be a larger number for more efficiency?
case StreamSplit.take_and_drop(stream, 3) do
{[], _tail} ->
{[], %{state | mode: :receive, stream: nil}}

{head, tail} ->
{head, %{state | stream: tail}}
end
end

defp next_sse_event(%SseState{mode: :done} = state), do: {:halt, state}

defp clean_up_change_listener(%Request{handle: shape_handle} = request)
when not is_nil(shape_handle) do
%{api: %{registry: registry}} = request
Registry.unregister(registry, shape_handle)
%{
api: %{
registry: registry,
sse_timeout: sse_timeout
},
params: %{
live: live?,
experimental_live_sse: live_sse?
}
} = request

# When handling SSE requests, the response body is a stream that listens for
# :new_changes events. If we unregister the shape_handle event listener immediately,
# we don't receive the events. So, in this case, we unregister the shape_handle
# listener after the sse_timeout, when we can be sure that the request is over.
if live? and live_sse? do
spawn(fn ->
:timer.sleep(sse_timeout)

Registry.unregister(registry, shape_handle)
end)
else
Registry.unregister(registry, shape_handle)
end

request
end

Expand Down Expand Up @@ -598,6 +785,10 @@ defmodule Electric.Shapes.Api do
def stack_id(%Api{stack_id: stack_id}), do: stack_id
def stack_id(%{api: %{stack_id: stack_id}}), do: stack_id

defp encode_log(%Request{api: api, params: %{live: true, experimental_live_sse: true}}, stream) do
encode_sse(api, :log, stream)
end

defp encode_log(%Request{api: api}, stream) do
encode(api, :log, stream)
end
Expand All @@ -607,6 +798,13 @@ defmodule Electric.Shapes.Api do
encode(api, :message, message)
end

def encode_message(
%Request{api: api, params: %{live: true, experimental_live_sse: true}},
message
) do
encode_sse(api, :message, message)
end

def encode_message(%Request{api: api}, message) do
encode(api, :message, message)
end
Expand All @@ -615,6 +813,10 @@ defmodule Electric.Shapes.Api do
apply(encoder, type, [message])
end

defp encode_sse(%Api{sse_encoder: sse_encoder}, type, message) when type in [:message, :log] do
apply(sse_encoder, type, [message])
end

def schema(%Response{
api: %Api{inspector: inspector},
shape_definition: %Shapes.Shape{} = shape
Expand Down
30 changes: 30 additions & 0 deletions packages/sync-service/lib/electric/shapes/api/encoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,36 @@ defmodule Electric.Shapes.Api.Encoder.JSON do
end
end

defmodule Electric.Shapes.Api.Encoder.SSE do
@behaviour Electric.Shapes.Api.Encoder

@impl Electric.Shapes.Api.Encoder
def log(item_stream) do
# Note that, unlike the JSON log encoder, this doesn't currently use
# `Stream.chunk_every/1`.
#
# This is because it's only handling live events and is usually used
# for small updates (the point of enabling SSE mode is to avoid request
# overhead when consuming small changes).

item_stream
|> Stream.flat_map(&message/1)
end

@impl Electric.Shapes.Api.Encoder
def message(message) do
["data: ", ensure_json(message), "\n\n"]
end

defp ensure_json(json) when is_binary(json) do
json
end

defp ensure_json(term) do
Jason.encode_to_iodata!(term)
end
end

defmodule Electric.Shapes.Api.Encoder.Term do
@behaviour Electric.Shapes.Api.Encoder

Expand Down
17 changes: 17 additions & 0 deletions packages/sync-service/lib/electric/shapes/api/params.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Electric.Shapes.Api.Params do
import Ecto.Changeset

@tmp_compaction_flag :experimental_compaction
@tmp_sse_flag :experimental_live_sse

@primary_key false
defmodule ColumnList do
Expand Down Expand Up @@ -50,6 +51,7 @@ defmodule Electric.Shapes.Api.Params do
field(:replica, Ecto.Enum, values: [:default, :full], default: :default)
field(:params, {:map, :string}, default: %{})
field(@tmp_compaction_flag, :boolean, default: false)
field(@tmp_sse_flag, :boolean, default: false)
end

@type t() :: %__MODULE__{}
Expand All @@ -61,6 +63,7 @@ defmodule Electric.Shapes.Api.Params do
|> cast_offset()
|> validate_handle_with_offset()
|> validate_live_with_offset()
|> validate_live_sse()
|> cast_root_table(api)
|> apply_action(:validate)
|> convert_error(api)
Expand Down Expand Up @@ -150,6 +153,20 @@ defmodule Electric.Shapes.Api.Params do
end
end

def validate_live_sse(%Ecto.Changeset{valid?: false} = changeset), do: changeset

def validate_live_sse(%Ecto.Changeset{} = changeset) do
live = get_field(changeset, :live)

if live do
changeset
else
validate_exclusion(changeset, @tmp_sse_flag, [true],
message: "can't be true unless live is also true"
)
end
end

def cast_root_table(%Ecto.Changeset{valid?: false} = changeset, _), do: changeset

def cast_root_table(%Ecto.Changeset{} = changeset, %Api{shape: nil} = api) do
Expand Down
Loading
Loading