Skip to content

Commit 5ccea17

Browse files
authored
feat: replay realtime.messages (#1526)
A new index was created on (inserted_at DESC, topic) WHERE private IS TRUE AND extension = 'broadcast'. The hardcoded limit is 25 for now.
1 parent 50891cd commit 5ccea17

File tree

19 files changed

+678
-81
lines changed

19 files changed

+678
-81
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ You can add your own by making a `POST` request to the server. You must change b
9494
"region": "us-west-1",
9595
"poll_interval_ms": 100,
9696
"poll_max_record_bytes": 1048576,
97-
"ssl_enforced": false
97+
"ssl_enforced": false
9898
}
9999
}
100100
]
@@ -284,6 +284,7 @@ This is the list of operational codes that can help you understand your deployme
284284
| UnknownErrorOnController | An error we are not handling correctly was triggered on a controller |
285285
| UnknownErrorOnChannel | An error we are not handling correctly was triggered on a channel |
286286
| PresenceRateLimitReached | Limit of presence events reached |
287+
| UnableToReplayMessages | An error occurred while replaying messages |
287288

288289
## License
289290

lib/realtime/api/message.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ defmodule Realtime.Api.Message do
88
@primary_key {:id, Ecto.UUID, autogenerate: true}
99
@schema_prefix "realtime"
1010

11+
@type t :: %__MODULE__{}
12+
1113
schema "messages" do
1214
field(:topic, :string)
1315
field(:extension, Ecto.Enum, values: [:broadcast, :presence])
@@ -39,7 +41,7 @@ defmodule Realtime.Api.Message do
3941
end
4042

4143
defp maybe_put_timestamp(changeset, field) do
42-
case Map.get(changeset.data, field) do
44+
case get_field(changeset, field) do
4345
nil -> put_timestamp(changeset, field)
4446
_ -> changeset
4547
end

lib/realtime/messages.ex

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,61 @@ defmodule Realtime.Messages do
33
Handles `realtime.messages` table operations
44
"""
55

6+
alias Realtime.Api.Message
7+
8+
import Ecto.Query, only: [from: 2]
9+
10+
@hard_limit 25
11+
@default_timeout 5_000
12+
13+
@doc """
Fetches the last `limit` messages for a given `topic` inserted at or after `since`.

`since` is a Unix timestamp in milliseconds. `limit` is clamped to the range
`1..25` (the hard limit of this module).

Automatically uses RPC if the database connection is not in the same node.

Only allowed for private channels.

## Return values

  * `{:ok, messages, message_ids}` - `messages` is ordered from the oldest to the
    newest entry and `message_ids` is a `MapSet` holding the ids of those messages
  * `{:error, :invalid_replay_params}` - `since` or `limit` is not an integer, or
    `since` is not a valid Unix time
  * `{:error, :failed_to_replay_messages}` - the lookup returned
    `{:error, :postgrex_exception}`

Any other non-matching result is returned unchanged.
"""
@spec replay(pid, String.t(), non_neg_integer, non_neg_integer) ::
        {:ok, [Message.t()], MapSet.t(String.t())} | {:error, term} | {:error, :rpc_error, term}
def replay(conn, topic, since, limit) when node(conn) == node() and is_integer(since) and is_integer(limit) do
  # Never fetch fewer than 1 or more than @hard_limit rows
  limit = max(min(limit, @hard_limit), 1)

  with {:ok, since} <- DateTime.from_unix(since, :millisecond),
       {:ok, messages} <- messages(conn, topic, since, limit) do
    # The query orders by newest first; callers receive the oldest message first
    {:ok, Enum.reverse(messages), MapSet.new(messages, & &1.id)}
  else
    {:error, :postgrex_exception} -> {:error, :failed_to_replay_messages}
    {:error, :invalid_unix_time} -> {:error, :invalid_replay_params}
    error -> error
  end
end

# The connection process lives on another node, so the same call is executed there
def replay(conn, topic, since, limit) when is_integer(since) and is_integer(limit) do
  Realtime.GenRpc.call(node(conn), __MODULE__, :replay, [conn, topic, since, limit], key: topic)
end

def replay(_, _, _, _), do: {:error, :invalid_replay_params}
40+
41+
# Loads at most `limit` private broadcast messages of `topic` inserted at or
# after `since`, ordered from the newest to the oldest entry.
defp messages(conn, topic, since, limit) do
  lower_bound = DateTime.to_naive(since)

  # Partitions in the future should be empty, so we avoid searching them.
  # The upper bound sits 1 minute ahead to account for any potential drift.
  upper_bound = NaiveDateTime.add(NaiveDateTime.utc_now(), 1, :minute)

  replay_query =
    from msg in Message,
      where: msg.topic == ^topic,
      where: msg.private == true,
      where: msg.extension == :broadcast,
      where: msg.inserted_at >= ^lower_bound,
      where: msg.inserted_at < ^upper_bound,
      order_by: [desc: msg.inserted_at],
      limit: ^limit

  Realtime.Repo.all(conn, replay_query, Message, timeout: @default_timeout)
end
60+
661
@doc """
762
Deletes messages older than 72 hours for a given tenant connection
863
"""

lib/realtime/tenants/batch_broadcast.ex

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ defmodule Realtime.Tenants.BatchBroadcast do
2929
@spec broadcast(
3030
auth_params :: map() | nil,
3131
tenant :: Tenant.t(),
32-
messages :: %{messages: list(%{topic: String.t(), payload: map(), event: String.t(), private: boolean()})},
32+
messages :: %{
33+
messages: list(%{id: String.t(), topic: String.t(), payload: map(), event: String.t(), private: boolean()})
34+
},
3335
super_user :: boolean()
3436
) :: :ok | {:error, atom()}
3537
def broadcast(auth_params, tenant, messages, super_user \\ false)
@@ -59,8 +61,8 @@ defmodule Realtime.Tenants.BatchBroadcast do
5961
# Handle events for public channel
6062
events
6163
|> Map.get(false, [])
62-
|> Enum.each(fn %{topic: sub_topic, payload: payload, event: event} ->
63-
send_message_and_count(tenant, events_per_second_rate, sub_topic, event, payload, true)
64+
|> Enum.each(fn message ->
65+
send_message_and_count(tenant, events_per_second_rate, message, true)
6466
end)
6567

6668
# Handle events for private channel
@@ -69,14 +71,14 @@ defmodule Realtime.Tenants.BatchBroadcast do
6971
|> Enum.group_by(fn event -> Map.get(event, :topic) end)
7072
|> Enum.each(fn {topic, events} ->
7173
if super_user do
72-
Enum.each(events, fn %{topic: sub_topic, payload: payload, event: event} ->
73-
send_message_and_count(tenant, events_per_second_rate, sub_topic, event, payload, false)
74+
Enum.each(events, fn message ->
75+
send_message_and_count(tenant, events_per_second_rate, message, false)
7476
end)
7577
else
7678
case permissions_for_message(tenant, auth_params, topic) do
7779
%Policies{broadcast: %BroadcastPolicies{write: true}} ->
78-
Enum.each(events, fn %{topic: sub_topic, payload: payload, event: event} ->
79-
send_message_and_count(tenant, events_per_second_rate, sub_topic, event, payload, false)
80+
Enum.each(events, fn message ->
81+
send_message_and_count(tenant, events_per_second_rate, message, false)
8082
end)
8183

8284
_ ->
@@ -91,15 +93,15 @@ defmodule Realtime.Tenants.BatchBroadcast do
9193

9294
def broadcast(_, nil, _, _), do: {:error, :tenant_not_found}
9395

94-
def changeset(payload, attrs) do
96+
defp changeset(payload, attrs) do
9597
payload
9698
|> cast(attrs, [])
9799
|> cast_embed(:messages, required: true, with: &message_changeset/2)
98100
end
99101

100-
def message_changeset(message, attrs) do
102+
defp message_changeset(message, attrs) do
101103
message
102-
|> cast(attrs, [:topic, :payload, :event, :private])
104+
|> cast(attrs, [:id, :topic, :payload, :event, :private])
103105
|> maybe_put_private_change()
104106
|> validate_required([:topic, :payload, :event])
105107
end
@@ -112,11 +114,19 @@ defmodule Realtime.Tenants.BatchBroadcast do
112114
end
113115

114116
@event_type "broadcast"
115-
defp send_message_and_count(tenant, events_per_second_rate, topic, event, payload, public?) do
116-
tenant_topic = Tenants.tenant_topic(tenant, topic, public?)
117-
payload = %{"payload" => payload, "event" => event, "type" => "broadcast"}
117+
defp send_message_and_count(tenant, events_per_second_rate, message, public?) do
118+
tenant_topic = Tenants.tenant_topic(tenant, message.topic, public?)
118119

119-
broadcast = %Phoenix.Socket.Broadcast{topic: topic, event: @event_type, payload: payload}
120+
payload = %{"payload" => message.payload, "event" => message.event, "type" => "broadcast"}
121+
122+
payload =
123+
if message[:id] do
124+
Map.put(payload, "meta", %{"id" => message.id})
125+
else
126+
payload
127+
end
128+
129+
broadcast = %Phoenix.Socket.Broadcast{topic: message.topic, event: @event_type, payload: payload}
120130

121131
GenCounter.add(events_per_second_rate.id)
122132
TenantBroadcaster.pubsub_broadcast(tenant.external_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)

lib/realtime/tenants/migrations.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ defmodule Realtime.Tenants.Migrations do
7474
RealtimeSendSetsTopicConfig,
7575
SubscriptionIndexBridgingDisabled,
7676
RunSubscriptionIndexBridgingDisabled,
77-
BroadcastSendErrorLogging
77+
BroadcastSendErrorLogging,
78+
CreateMessagesReplayIndex
7879
}
7980

8081
@migrations [
@@ -140,7 +141,8 @@ defmodule Realtime.Tenants.Migrations do
140141
{20_250_128_220_012, RealtimeSendSetsTopicConfig},
141142
{20_250_506_224_012, SubscriptionIndexBridgingDisabled},
142143
{20_250_523_164_012, RunSubscriptionIndexBridgingDisabled},
143-
{20_250_714_121_412, BroadcastSendErrorLogging}
144+
{20_250_714_121_412, BroadcastSendErrorLogging},
145+
{20_250_905_041_441, CreateMessagesReplayIndex}
144146
]
145147

146148
defstruct [:tenant_external_id, :settings]

lib/realtime/tenants/replication_connection.ex

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,13 @@ defmodule Realtime.Tenants.ReplicationConnection do
310310
{:ok, topic} <- get_or_error(to_broadcast, "topic", :topic_missing),
311311
{:ok, private} <- get_or_error(to_broadcast, "private", :private_missing),
312312
%Tenant{} = tenant <- Cache.get_tenant_by_external_id(tenant_id),
313-
broadcast_message = %{topic: topic, event: event, private: private, payload: Map.put_new(payload, "id", id)},
313+
broadcast_message = %{
314+
id: id,
315+
topic: topic,
316+
event: event,
317+
private: private,
318+
payload: Map.put_new(payload, "id", id)
319+
},
314320
:ok <- BatchBroadcast.broadcast(nil, tenant, %{messages: [broadcast_message]}, true) do
315321
inserted_at = NaiveDateTime.from_iso8601!(inserted_at)
316322
latency_inserted_at = NaiveDateTime.utc_now() |> NaiveDateTime.diff(inserted_at)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
defmodule Realtime.Tenants.Migrations.CreateMessagesReplayIndex do
  @moduledoc false

  use Ecto.Migration

  # Partial index on `messages` covering only private broadcast rows,
  # keyed by `inserted_at` (descending) and `topic`.
  def change do
    replay_index =
      index(:messages, [{:desc, :inserted_at}, :topic], where: "extension = 'broadcast' and private IS TRUE")

    create_if_not_exists(replay_index)
  end
end

lib/realtime_web/channels/payloads/broadcast.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ defmodule RealtimeWeb.Channels.Payloads.Broadcast do
99
embedded_schema do
1010
field :ack, :boolean, default: false
1111
field :self, :boolean, default: false
12+
embeds_one :replay, RealtimeWeb.Channels.Payloads.Broadcast.Replay
1213
end
1314

1415
# Casts the `ack` and `self` flags and the embedded `replay` configuration.
def changeset(broadcast, attrs) do
  broadcast
  |> cast(attrs, [:ack, :self], message: &Join.error_message/2)
  |> cast_embed(:replay, invalid_message: "unable to parse, expected a map")
end
1719
end
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
defmodule RealtimeWeb.Channels.Payloads.Broadcast.Replay do
  @moduledoc """
  Embedded schema for the `replay` entry of the broadcast section of the join payload.

  Both fields are integers: `limit` defaults to `10` and `since` defaults to `0`.
  """
  use Ecto.Schema
  import Ecto.Changeset
  alias RealtimeWeb.Channels.Payloads.Join

  @fields [:limit, :since]

  embedded_schema do
    field :limit, :integer, default: 10
    field :since, :integer, default: 0
  end

  # Cast errors are formatted by `Join.error_message/2`.
  def changeset(replay, attrs) do
    cast(replay, attrs, @fields, message: &Join.error_message/2)
  end
end

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,21 @@ defmodule RealtimeWeb.RealtimeChannel do
7272
{:ok, claims, confirm_token_ref} <- confirm_token(socket),
7373
socket = assign_authorization_context(socket, sub_topic, claims),
7474
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
75-
{:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket) do
75+
{:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket),
76+
{:ok, replayed_message_ids} <-
77+
maybe_replay_messages(params["config"], sub_topic, db_conn, socket.assigns.private?) do
7678
tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, !socket.assigns.private?)
7779

7880
# fastlane subscription
7981
metadata =
80-
MessageDispatcher.fastlane_metadata(transport_pid, serializer, topic, socket.assigns.log_level, tenant_id)
82+
MessageDispatcher.fastlane_metadata(
83+
transport_pid,
84+
serializer,
85+
topic,
86+
log_level,
87+
tenant_id,
88+
replayed_message_ids
89+
)
8190

8291
RealtimeWeb.Endpoint.subscribe(tenant_topic, metadata: metadata)
8392

@@ -198,13 +207,30 @@ defmodule RealtimeWeb.RealtimeChannel do
198207
{:error, :shutdown_in_progress} ->
199208
log_error(socket, "RealtimeRestarting", "Realtime is restarting, please standby")
200209

210+
{:error, :failed_to_replay_messages} ->
211+
log_error(socket, "UnableToReplayMessages", "Realtime was unable to replay messages")
212+
213+
{:error, :invalid_replay_params} ->
214+
log_error(socket, "UnableToReplayMessages", "Replay params are not valid")
215+
201216
{:error, error} ->
202217
log_error(socket, "UnknownErrorOnChannel", error)
203218
{:error, %{reason: "Unknown Error on Channel"}}
204219
end
205220
end
206221

207222
@impl true
223+
# Pushes every replayed message to the client as a "broadcast" event whose
# "meta" entry flags it as replayed and carries the message id.
def handle_info({:replay, messages}, socket) do
  Enum.each(messages, fn message ->
    push(socket, "broadcast", %{
      "payload" => message.payload,
      "event" => message.event,
      "type" => "broadcast",
      "meta" => %{"replayed" => true, "id" => message.id}
    })
  end)

  {:noreply, socket}
end
233+
208234
def handle_info(:update_rate_counter, socket) do
209235
count(socket)
210236

@@ -762,4 +788,25 @@ defmodule RealtimeWeb.RealtimeChannel do
762788
do: {:error, :private_only},
763789
else: :ok
764790
end
791+
792+
# Replay was requested on a channel that is not private: rejected.
defp maybe_replay_messages(%{"broadcast" => %{"replay" => _}}, _sub_topic, _db_conn, false = _private?),
  do: {:error, :invalid_replay_params}

# Replay was requested on a private channel: fetch the stored messages and
# hand back the ids of the messages that will be replayed.
defp maybe_replay_messages(%{"broadcast" => %{"replay" => replay_params}}, sub_topic, db_conn, true = _private?)
     when is_map(replay_params) do
  since = replay_params["since"]
  limit = replay_params["limit"] || 25

  case Realtime.Messages.replay(db_conn, sub_topic, since, limit) do
    {:ok, messages, message_ids} ->
      # Send to self because we can't write to the socket before finishing the join process
      send(self(), {:replay, messages})
      {:ok, message_ids}

    other ->
      other
  end
end

# No replay configuration given: nothing was replayed.
defp maybe_replay_messages(_, _, _, _), do: {:ok, MapSet.new()}
765812
end

0 commit comments

Comments
 (0)