Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Lint workflow: runs static checks (formatter, Credo, dependency audits,
# Sobelow, Dialyzer) and the dev seeds against a Postgres instance started
# with docker compose. Triggered by pull requests touching source/config/build
# paths and by pushes to main.
name: Lint
on:
  pull_request:
    paths:
      - "lib/**"
      - "test/**"
      - "config/**"
      - "priv/**"
      - "assets/**"
      - "rel/**"
      - "mix.exs"
      - "Dockerfile"
      - "run.sh"

  push:
    branches:
      - main

# One in-flight run per PR (or per ref for pushes); a newer run cancels the older one.
concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

jobs:
  tests:
    name: Lint
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4
      - name: Setup elixir
        id: beam
        uses: erlef/setup-beam@v1
        with:
          otp-version: "27.x" # Define the OTP version [required]
          elixir-version: "1.17.x" # Define the elixir version [required]
      - name: Cache Mix
        uses: actions/cache@v4
        with:
          path: |
            deps
            _build
          # Key on the versions resolved by the setup-beam step. This workflow
          # defines no `env.elixir` / `env.otp`, so using those would expand to
          # empty strings and the cache would survive toolchain upgrades.
          key: ${{ github.workflow }}-${{ runner.os }}-mix-${{ steps.beam.outputs.elixir-version }}-${{ steps.beam.outputs.otp-version }}-${{ hashFiles('**/mix.lock') }}
          restore-keys: |
            ${{ github.workflow }}-${{ runner.os }}-mix-${{ steps.beam.outputs.elixir-version }}-${{ steps.beam.outputs.otp-version }}-

      - name: Install dependencies
        run: mix deps.get
      - name: Set up Postgres
        run: docker compose -f docker-compose.dbs.yml up -d
      - name: Run main database migrations
        run: mix ecto.migrate --log-migrator-sql
      - name: Run database tenant migrations
        run: mix ecto.migrate --migrations-path lib/realtime/tenants/repo/migrations
      - name: Run format check
        run: mix format --check-formatted
      - name: Credo checks
        run: mix credo
      - name: Run hex audit
        run: mix hex.audit
      - name: Run mix_audit
        run: mix deps.audit
      - name: Run sobelow
        run: mix sobelow --config .sobelow-conf
      - name: Retrieve PLT Cache
        uses: actions/cache@v4
        id: plt-cache
        with:
          path: priv/plts
          key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-plts-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
      # Only rebuild the PLTs when the cache above missed.
      - name: Create PLTs
        if: steps.plt-cache.outputs.cache-hit != 'true'
        run: |
          mkdir -p priv/plts
          mix dialyzer.build
      - name: Run dialyzer
        run: mix dialyzer
      - name: Run dev seeds
        run: DB_ENC_KEY="1234567890123456" mix ecto.setup
46 changes: 14 additions & 32 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ on:
branches:
- main

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

env:
MIX_ENV: test

jobs:
tests:
name: Tests
Expand All @@ -32,44 +39,19 @@ jobs:
- name: Cache Mix
uses: actions/cache@v4
with:
path: deps
key: ${{ runner.os }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
path: |
deps
_build
key: ${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-${{ hashFiles('**/mix.lock') }}
restore-keys: |
${{ runner.os }}-mix-
${{ github.workflow }}-${{ runner.os }}-mix-${{ env.elixir }}-${{ env.otp }}-

- name: Pull postgres image quietly in background (used by test/support/containers.ex)
run: docker pull supabase/postgres:15.8.1.040 > /dev/null 2>&1 &
- name: Install dependencies
run: mix deps.get
- name: Set up Postgres
run: docker compose -f docker-compose.dbs.yml up -d
- name: Run main database migrations
run: mix ecto.migrate --log-migrator-sql
- name: Run database tenant migrations
run: mix ecto.migrate --migrations-path lib/realtime/tenants/repo/migrations
- name: Run format check
run: mix format --check-formatted
- name: Credo checks
run: mix credo
- name: Run hex audit
run: mix hex.audit
- name: Run mix_audit
run: mix deps.audit
- name: Run sobelow
run: mix sobelow --config .sobelow-conf
- name: Retrieve PLT Cache
uses: actions/cache@v4
id: plt-cache
with:
path: priv/plts
key: ${{ runner.os }}-${{ steps.beam.outputs.otp-version }}-${{ steps.beam.outputs.elixir-version }}-plts-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
- name: Create PLTs
if: steps.plt-cache.outputs.cache-hit != 'true'
run: |
mkdir -p priv/plts
mix dialyzer.build
- name: Run dialyzer
run: mix dialyzer
- name: Run dev seeds
run: DB_ENC_KEY="1234567890123456" mix ecto.setup
- name: Start epmd
run: epmd -daemon
- name: Run tests
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ PORT ?= 4000
# Common commands

dev: ## Start a dev server
ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=fra DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5369 GEN_RPC_TCP_CLIENT_PORT=5469 iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server
ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=us-east-1 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5369 GEN_RPC_TCP_CLIENT_PORT=5469 iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server

dev.orange: ## Start another dev server (orange) on port 4001
ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=4001 MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5469 GEN_RPC_TCP_CLIENT_PORT=5369 iex --name [email protected] --cookie cookie -S mix phx.server
ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=4001 MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=eu-west-1 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5469 GEN_RPC_TCP_CLIENT_PORT=5369 iex --name [email protected] --cookie cookie -S mix phx.server

seed: ## Seed the database
DB_ENC_KEY="1234567890123456" FLY_ALLOC_ID=123e4567-e89b-12d3-a456-426614174000 mix run priv/repo/dev_seeds.exs
Expand Down
4 changes: 3 additions & 1 deletion config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ config :phoenix, :plug_init_mode, :runtime
# Disable caching to ensure the rendered spec is refreshed
config :open_api_spex, :cache_adapter, OpenApiSpex.Plug.NoneCache

config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []}
# Disabled but can print to stdout with:
# config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []}
config :opentelemetry, traces_exporter: :none

config :mix_test_watch, clear: true
6 changes: 4 additions & 2 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.se
janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4))
platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly
broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "pg2") |> String.to_atom()
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom()
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))
users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5)

no_channel_timeout_in_ms =
if config_env() == :test,
Expand Down Expand Up @@ -126,7 +127,8 @@ config :realtime,
no_channel_timeout_in_ms: no_channel_timeout_in_ms,
platform: platform,
pubsub_adapter: pubsub_adapter,
broadcast_pool_size: broadcast_pool_size
broadcast_pool_size: broadcast_pool_size,
users_scope_shards: users_scope_shards

if config_env() != :test && run_janitor? do
config :realtime,
Expand Down
39 changes: 29 additions & 10 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
alias Realtime.Adapters.Changes.NewRecord
alias Realtime.Adapters.Changes.UpdatedRecord
alias Realtime.Database
alias Realtime.RateCounter
alias Realtime.Tenants

def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

Expand All @@ -26,6 +28,12 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
tenant_id = args["id"]
Logger.metadata(external_id: tenant_id, project: tenant_id)

%Realtime.Api.Tenant{} = Tenants.Cache.get_tenant_by_external_id(tenant_id)

rate_counter_args = Tenants.db_events_per_second_rate(tenant_id, 4000)

RateCounter.new(rate_counter_args)

state = %{
backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp),
db_host: args["db_host"],
Expand All @@ -41,7 +49,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
retry_ref: nil,
retry_count: 0,
slot_name: args["slot_name"] <> slot_name_suffix(),
tenant_id: tenant_id
tenant_id: tenant_id,
rate_counter_args: rate_counter_args
}

{:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{})
Expand Down Expand Up @@ -74,7 +83,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
max_record_bytes: max_record_bytes,
max_changes: max_changes,
conn: conn,
tenant_id: tenant_id
tenant_id: tenant_id,
rate_counter_args: rate_counter_args
} = state
) do
cancel_timer(poll_ref)
Expand All @@ -84,7 +94,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
{time, list_changes} = :timer.tc(Replications, :list_changes, args)
record_list_changes_telemetry(time, tenant_id)

case handle_list_changes_result(list_changes, tenant_id) do
case handle_list_changes_result(list_changes, tenant_id, rate_counter_args) do
{:ok, row_count} ->
Backoff.reset(backoff)

Expand Down Expand Up @@ -177,20 +187,29 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
rows: [_ | _] = rows,
num_rows: rows_count
}},
tenant_id
tenant_id,
rate_counter_args
) do
for row <- rows,
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
topic = "realtime:postgres:" <> tenant_id
case RateCounter.get(rate_counter_args) do
{:ok, %{limit: %{triggered: true}}} ->
:ok

RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher)
_ ->
Realtime.GenCounter.add(rate_counter_args.id, rows_count)

for row <- rows,
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
topic = "realtime:postgres:" <> tenant_id

RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
end
end

{:ok, rows_count}
end

defp handle_list_changes_result({:ok, _}, _), do: {:ok, 0}
defp handle_list_changes_result({:error, reason}, _), do: {:error, reason}
defp handle_list_changes_result({:ok, _}, _, _), do: {:ok, 0}
defp handle_list_changes_result({:error, reason}, _, _), do: {:error, reason}

def generate_record([
{"wal",
Expand Down
3 changes: 1 addition & 2 deletions lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ defmodule Realtime.Application do
Realtime.PromEx.set_metrics_tags()
:ets.new(Realtime.Tenants.Connect, [:named_table, :set, :public])
:syn.set_event_handler(Realtime.SynHandler)

:ok = :syn.add_node_to_scopes([:users, RegionNodes, Realtime.Tenants.Connect])
:ok = :syn.add_node_to_scopes([RegionNodes, Realtime.Tenants.Connect | Realtime.UsersCounter.scopes()])

region = Application.get_env(:realtime, :region)
:syn.join(RegionNodes, region, self(), node: node())
Expand Down
6 changes: 5 additions & 1 deletion lib/realtime/gen_rpc/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ defmodule Realtime.GenRpcPubSub.Worker do
def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, pubsub, name: worker)

@impl true
def init(pubsub), do: {:ok, pubsub}
# GenServer init callback: tunes the worker process's memory flags and keeps
# the given `pubsub` value as the server state.
def init(pubsub) do
# Store queued messages outside the process heap, so a long mailbox is not
# part of this process's garbage collections.
# NOTE(review): presumably chosen because workers can accumulate many
# forwarded broadcast messages — confirm against load profile.
Process.flag(:message_queue_data, :off_heap)
# Allow at most 100 generational collections before a full-sweep GC is forced.
Process.flag(:fullsweep_after, 100)
{:ok, pubsub}
end

@impl true
def handle_info({:ftl, topic, message, dispatcher}, pubsub) do
Expand Down
8 changes: 4 additions & 4 deletions lib/realtime/monitoring/latency.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Realtime.Latency do
use Realtime.Logs

alias Realtime.Nodes
alias Realtime.Rpc
alias Realtime.GenRpc

defmodule Payload do
@moduledoc false
Expand All @@ -33,7 +33,7 @@ defmodule Realtime.Latency do
}
end

@every 5_000
@every 15_000
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
Expand Down Expand Up @@ -76,7 +76,7 @@ defmodule Realtime.Latency do
Task.Supervisor.async(Realtime.TaskSupervisor, fn ->
{latency, response} =
:timer.tc(fn ->
Rpc.call(n, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout)
GenRpc.call(n, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout)
end)

latency_ms = latency / 1_000
Expand All @@ -85,7 +85,7 @@ defmodule Realtime.Latency do
from_node = Nodes.short_node_id_from_name(Node.self())

case response do
{:badrpc, reason} ->
{:error, :rpc_error, reason} ->
log_error(
"RealtimeNodeDisconnected",
"Unable to connect to #{short_name} from #{region}: #{reason}"
Expand Down
25 changes: 22 additions & 3 deletions lib/realtime/monitoring/prom_ex/plugins/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,21 @@ defmodule Realtime.PromEx.Plugins.Tenant do
event_name: [:realtime, :tenants, :payload, :size],
measurement: :size,
description: "Tenant payload size",
tags: [:tenant],
tags: [:tenant, :message_type],
unit: :byte,
reporter_options: [
buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000]
buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000]
]
),
distribution(
[:realtime, :payload, :size],
event_name: [:realtime, :tenants, :payload, :size],
measurement: :size,
description: "Payload size",
tags: [:message_type],
unit: :byte,
reporter_options: [
buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000]
buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000]
]
)
]
Expand Down Expand Up @@ -157,20 +158,38 @@ defmodule Realtime.PromEx.Plugins.Tenant do
description: "Sum of messages sent on a Realtime Channel.",
tags: [:tenant]
),
sum(
[:realtime, :channel, :global, :events],
event_name: [:realtime, :rate_counter, :channel, :events],
measurement: :sum,
description: "Global sum of messages sent on a Realtime Channel."
),
sum(
[:realtime, :channel, :presence_events],
event_name: [:realtime, :rate_counter, :channel, :presence_events],
measurement: :sum,
description: "Sum of presence messages sent on a Realtime Channel.",
tags: [:tenant]
),
sum(
[:realtime, :channel, :global, :presence_events],
event_name: [:realtime, :rate_counter, :channel, :presence_events],
measurement: :sum,
description: "Global sum of presence messages sent on a Realtime Channel."
),
sum(
[:realtime, :channel, :db_events],
event_name: [:realtime, :rate_counter, :channel, :db_events],
measurement: :sum,
description: "Sum of db messages sent on a Realtime Channel.",
tags: [:tenant]
),
sum(
[:realtime, :channel, :global, :db_events],
event_name: [:realtime, :rate_counter, :channel, :db_events],
measurement: :sum,
description: "Global sum of db messages sent on a Realtime Channel."
),
sum(
[:realtime, :channel, :joins],
event_name: [:realtime, :rate_counter, :channel, :joins],
Expand Down
Loading
Loading