Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
name: Integration Tests
on:
pull_request:
paths:
- "lib/**"
- "test/**"
- "config/**"
- "priv/**"
- "assets/**"
- "rel/**"
- "mix.exs"
- "Dockerfile"
- "run.sh"
- "docker-compose.test.yml"

push:
branches:
- main

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
tests:
name: Tests
runs-on: blacksmith-8vcpu-ubuntu-2404

steps:
- uses: actions/checkout@v2
- name: Run integration test
run: docker compose -f docker-compose.tests.yml up --abort-on-container-exit --exit-code-from test-runner

83 changes: 83 additions & 0 deletions docker-compose.tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
services:
# Supabase Realtime service
test_db:
image: supabase/postgres:14.1.0.105
container_name: test-realtime-db
ports:
- "5532:5432"
volumes:
- ./dev/postgres:/docker-entrypoint-initdb.d/
command: postgres -c config_file=/etc/postgresql/postgresql.conf
environment:
POSTGRES_HOST: /var/run/postgresql
POSTGRES_PASSWORD: postgres
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 10s
timeout: 5s
retries: 5
test_realtime:
depends_on:
- test_db
build: .
container_name: test-realtime-server
ports:
- "4100:4100"
extra_hosts:
- "host.docker.internal:host-gateway"
environment:
PORT: 4100
DB_HOST: host.docker.internal
DB_PORT: 5532
DB_USER: postgres
DB_PASSWORD: postgres
DB_NAME: postgres
DB_ENC_KEY: 1234567890123456
DB_AFTER_CONNECT_QUERY: 'SET search_path TO _realtime'
API_JWT_SECRET: super-secret-jwt-token-with-at-least-32-characters-long
SECRET_KEY_BASE: UpNVntn3cDxHJpq99YMc1T1AQgQpc8kfYTuRgBiYa15BLrx8etQoXz3gZv1/u2oq
ERL_AFLAGS: -proto_dist inet_tcp
RLIMIT_NOFILE: 1000000
DNS_NODES: "''"
APP_NAME: realtime
RUN_JANITOR: true
JANITOR_INTERVAL: 60000
LOG_LEVEL: "info"
SEED_SELF_HOST: true
networks:
test-network:
aliases:
- realtime-dev.local
- realtime-dev.localhost
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:4100/"]
interval: 10s
timeout: 5s
retries: 5
start_period: 5s

# Deno test runner
test-runner:
image: denoland/deno:alpine-2.5.6
container_name: deno-test-runner
depends_on:
test_realtime:
condition: service_healthy
test_db:
condition: service_healthy
volumes:
- ./test/integration/tests.ts:/app/tests.ts:ro
working_dir: /app
command: >
sh -c "
echo 'Running tests...' &&
deno test tests.ts --allow-import --no-check --allow-read --allow-net --trace-leaks --allow-env=WS_NO_BUFFER_UTIL
"
networks:
- test-network
extra_hosts:
- "realtime-dev.localhost:host-gateway"

networks:
test-network:
driver: bridge
25 changes: 22 additions & 3 deletions lib/realtime_web/channels/realtime_channel/broadcast_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
alias Realtime.Tenants.Authorization.Policies
alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies

@type payload :: map | {String.t(), :json | :binary, binary}

@event_type "broadcast"
@spec handle(map(), Socket.t()) :: {:reply, :ok, Socket.t()} | {:noreply, Socket.t()}
@spec handle(payload, Socket.t()) :: {:reply, :ok, Socket.t()} | {:noreply, Socket.t()}
def handle(payload, %{assigns: %{private?: false}} = socket), do: handle(payload, nil, socket)

@spec handle(map(), pid() | nil, Socket.t()) :: {:reply, :ok, Socket.t()} | {:noreply, Socket.t()}
@spec handle(payload, pid() | nil, Socket.t()) :: {:reply, :ok, Socket.t()} | {:noreply, Socket.t()}
def handle(payload, db_conn, %{assigns: %{private?: true}} = socket) do
%{
assigns: %{
Expand Down Expand Up @@ -101,7 +103,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
end

defp send_message(tenant_id, self_broadcast, tenant_topic, payload) do
broadcast = %Phoenix.Socket.Broadcast{topic: tenant_topic, event: @event_type, payload: payload}
broadcast = build_broadcast(tenant_topic, payload)

if self_broadcast do
TenantBroadcaster.pubsub_broadcast(
Expand All @@ -123,6 +125,23 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
end
end

# No idea why Dialyzer is complaining here
@dialyzer {:nowarn_function, build_broadcast: 2}

# Message payload was built by V2 Serializer which was originally UserBroadcastPush
defp build_broadcast(topic, {user_event, user_payload_encoding, user_payload}) do
%RealtimeWeb.Socket.UserBroadcast{
topic: topic,
user_event: user_event,
user_payload_encoding: user_payload_encoding,
user_payload: user_payload
}
end

defp build_broadcast(topic, payload) do
%Phoenix.Socket.Broadcast{topic: topic, event: @event_type, payload: payload}
end

defp increment_rate_counter(%{assigns: %{policies: %Policies{broadcast: %BroadcastPolicies{write: false}}}} = socket) do
socket
end
Expand Down
124 changes: 85 additions & 39 deletions lib/realtime_web/channels/realtime_channel/message_dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,92 +4,138 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
"""

require Logger
alias Phoenix.Socket.Broadcast
alias RealtimeWeb.Socket.UserBroadcast

def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new()) do
{:rc_fastlane, fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids}
end

@presence_diff "presence_diff"

@doc """
This dispatch function caches encoded messages if fastlane is used
It also sends an :update_rate_counter to the subscriber and it can conditionally log
"""
@spec dispatch(list, pid, Phoenix.Socket.Broadcast.t()) :: :ok
def dispatch(subscribers, from, %Phoenix.Socket.Broadcast{event: event} = msg) do
# fastlane_pid is the actual socket transport pid
# This reduce caches the serialization and bypasses the channel process going straight to the
# transport process

message_id = message_id(msg.payload)
fastlane_pid is the actual socket transport pid
"""
@spec dispatch(list, pid, Broadcast.t() | UserBroadcast.t()) :: :ok
def dispatch(subscribers, from, %Broadcast{event: @presence_diff} = msg) do
{_cache, count} =
Enum.reduce(subscribers, {%{}, 0}, fn
{pid, _}, {cache, count} when pid == from ->
{cache, count}

{pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, replayed_message_ids}},
{_pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, _replayed_message_ids}},
{cache, count} ->
maybe_log(log_level, join_topic, msg, tenant_id)

cache = do_dispatch(msg, fastlane_pid, serializer, join_topic, cache, tenant_id, log_level)
{cache, count + 1}

{pid, _}, {cache, count} ->
send(pid, msg)
{cache, count}
end)

tenant_id = tenant_id(subscribers)
increment_presence_counter(tenant_id, msg.event, count)

:ok
end

def dispatch(subscribers, from, msg) do
message_id = message_id(msg)

_ =
Enum.reduce(subscribers, %{}, fn
{pid, _}, cache when pid == from ->
cache

{pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, replayed_message_ids}},
cache ->
if already_replayed?(message_id, replayed_message_ids) do
# skip already replayed message
{cache, count}
cache
else
if event != "presence_diff", do: send(pid, :update_rate_counter)
send(pid, :update_rate_counter)

maybe_log(log_level, join_topic, msg, tenant_id)

cache = do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
{cache, count + 1}
do_dispatch(msg, fastlane_pid, serializer, join_topic, cache, tenant_id, log_level)
end

{pid, _}, {cache, count} ->
{pid, _}, cache ->
send(pid, msg)
{cache, count}
cache
end)

tenant_id = tenant_id(subscribers)
increment_presence_counter(tenant_id, event, count)

:ok
end

defp increment_presence_counter(tenant_id, "presence_diff", count) when is_binary(tenant_id) do
tenant_id
|> Realtime.Tenants.presence_events_per_second_key()
|> Realtime.GenCounter.add(count)
defp maybe_log(:info, join_topic, msg, tenant_id) when is_struct(msg) do
log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}"
Logger.info(log, external_id: tenant_id, project: tenant_id)
end

defp increment_presence_counter(_tenant_id, _event, _count), do: :ok

defp maybe_log(:info, join_topic, msg, tenant_id) do
log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}"
defp maybe_log(:info, join_topic, msg, tenant_id) when is_binary(msg) do
log = "Received message on #{join_topic}. #{msg}"
Logger.info(log, external_id: tenant_id, project: tenant_id)
end

defp maybe_log(_level, _join_topic, _msg, _tenant_id), do: :ok

defp message_id(%{"meta" => %{"id" => id}}), do: id
defp message_id(_), do: nil

defp already_replayed?(nil, _replayed_message_ids), do: false
defp already_replayed?(message_id, replayed_message_ids), do: MapSet.member?(replayed_message_ids, message_id)

defp do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) do
defp do_dispatch(msg, fastlane_pid, serializer, join_topic, cache, tenant_id, log_level) do
case cache do
%{^serializer => encoded_msg} ->
%{^serializer => {:ok, encoded_msg}} ->
send(fastlane_pid, encoded_msg)
cache

%{^serializer => {:error, _reason}} ->
# We do nothing at this stage. It has been already logged depending on the log level
cache

%{} ->
# Use the original topic that was joined without the external_id
msg = %{msg | topic: join_topic}
encoded_msg = serializer.fastlane!(msg)
send(fastlane_pid, encoded_msg)
Map.put(cache, serializer, encoded_msg)

result =
case fastlane!(serializer, msg) do
{:ok, encoded_msg} ->
send(fastlane_pid, encoded_msg)
{:ok, encoded_msg}

{:error, reason} ->
maybe_log(log_level, join_topic, reason, tenant_id)
end

Map.put(cache, serializer, result)
end
end

defp tenant_id([{_pid, {:rc_fastlane, _, _, _, _, tenant_id, _}} | _]) do
tenant_id
# We have to convert because V1 does not know how to process UserBroadcast
defp fastlane!(Phoenix.Socket.V1.JSONSerializer = serializer, %UserBroadcast{} = msg) do
with {:ok, msg} <- UserBroadcast.convert_to_json_broadcast(msg) do
{:ok, serializer.fastlane!(msg)}
end
end

defp fastlane!(serializer, msg), do: {:ok, serializer.fastlane!(msg)}

defp tenant_id([{_pid, {:rc_fastlane, _, _, _, _, tenant_id, _}} | _]), do: tenant_id
defp tenant_id(_), do: nil

defp increment_presence_counter(tenant_id, "presence_diff", count) when is_binary(tenant_id) do
tenant_id
|> Realtime.Tenants.presence_events_per_second_key()
|> Realtime.GenCounter.add(count)
end

defp increment_presence_counter(_tenant_id, _event, _count), do: :ok

defp message_id(%Broadcast{payload: %{"meta" => %{"id" => id}}}), do: id
defp message_id(_), do: nil

defp already_replayed?(nil, _replayed_message_ids), do: false
defp already_replayed?(message_id, replayed_message_ids), do: MapSet.member?(replayed_message_ids, message_id)
end
4 changes: 2 additions & 2 deletions lib/realtime_web/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ defmodule RealtimeWeb.Endpoint do
# the expense of potentially higher memory being used.
active_n: 100,
# Skip validating UTF8 for faster frame processing.
# Currently all text frames as handled only with JSON which already requires UTF-8
# Currently all text frames are handled only with JSON which already requires UTF-8
validate_utf8: false,
serializer: [
{Phoenix.Socket.V1.JSONSerializer, "~> 1.0.0"},
{Phoenix.Socket.V2.JSONSerializer, "~> 2.0.0"}
{RealtimeWeb.Socket.V2Serializer, "~> 2.0.0"}
]
],
longpoll: [
Expand Down
3 changes: 3 additions & 0 deletions lib/realtime_web/plugs/auth_tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ defmodule RealtimeWeb.AuthTenant do
[] ->
nil

[""] ->
nil

[value | _] ->
[bearer, token] = value |> String.split(" ")
bearer = String.downcase(bearer)
Expand Down
Loading