Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_rls/cdc_rls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ defmodule Extensions.PostgresCdcRls do

def start_distributed(%{"region" => region, "id" => tenant} = args) do
platform_region = Realtime.Nodes.platform_region_translator(region)
launch_node = Realtime.Nodes.launch_node(platform_region, node())
launch_node = Realtime.Nodes.launch_node(platform_region, node(), tenant)

Logger.warning(
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}"
Expand Down
81 changes: 68 additions & 13 deletions lib/realtime/nodes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Realtime.Nodes do
def get_node_for_tenant(%Tenant{} = tenant) do
with region <- Tenants.region(tenant),
tenant_region <- platform_region_translator(region),
node <- launch_node(tenant_region, node()) do
node <- launch_node(tenant_region, node(), tenant.external_id) do
{:ok, node, tenant_region}
end
end
Expand Down Expand Up @@ -99,33 +99,88 @@ defmodule Realtime.Nodes do
@doc """
Picks the node to launch the Postgres connection on.

If there are not two nodes in a region the connection is established from
Selection is deterministic within time buckets to prevent syn conflicts from
concurrent requests for the same tenant. Uses time-bucketed seeded random
selection to pick 2 candidate nodes, compares their loads, and picks the
least loaded one.

The time bucket approach ensures:
- Requests within same time window (default: 60s) pick same nodes → prevents conflicts
- Requests in different time windows pick different random nodes → better long-term distribution

If the uptime of the node is below the configured threshold for load balancing,
a consistent node is picked based on hashing the tenant ID.

If there are not two nodes in a region, the connection is established from
the `default` node given.
"""
@spec launch_node(String.t() | nil, atom()) :: atom()
def launch_node(region, default) do
@spec launch_node(String.t() | nil, atom(), String.t()) :: atom()
def launch_node(region, default, tenant_id) when is_binary(tenant_id) do
case region_nodes(region) do
[] ->
Logger.warning("Zero region nodes for #{region} using #{inspect(default)}")
default

[single_node] ->
single_node

nodes ->
load_aware_node_picker(nodes)
load_aware_node_picker(nodes, tenant_id)
end
end

defp load_aware_node_picker(regions_nodes) do
sampled_nodes = Enum.take_random(regions_nodes, 2)
nodes_with_loads = Enum.map(sampled_nodes, &{&1, node_load(&1)})
# Width, in seconds, of the time bucket that two_random_nodes/3 uses when
# seeding its node selection. Read at compile time from the :realtime
# application's :node_selection_time_bucket_seconds key; defaults to 60.
@node_selection_time_bucket_seconds Application.compile_env(
:realtime,
:node_selection_time_bucket_seconds,
60
)

defp load_aware_node_picker(regions_nodes, tenant_id) when is_binary(tenant_id) do
case regions_nodes do
nodes ->
node_count = length(nodes)

{node1, node2} = two_random_nodes(tenant_id, nodes, node_count)

# Compare loads and pick least loaded
load1 = node_load(node1)
load2 = node_load(node2)

if length(sampled_nodes) == 2 and Enum.all?(nodes_with_loads, fn {_, load} -> is_number(load) end) do
{node, _load} = Enum.min_by(nodes_with_loads, fn {_, load} -> load end)
node
else
Enum.random(regions_nodes)
if is_number(load1) and is_number(load2) do
if load1 <= load2, do: node1, else: node2
else
# Fallback to consistently picking a node if load data is not available
index = :erlang.phash2(tenant_id, node_count)
Enum.fetch!(nodes, index)
end
end
end

# Picks two candidate nodes for `tenant_id` out of `nodes`.
#
# The choice is seeded from the tenant ID and the current time bucket, so
# calls for the same tenant inside one bucket yield the same pair. Returns
# `{node1, node2}`; the two differ whenever `node_count` is greater than 1.
defp two_random_nodes(tenant_id, nodes, node_count) do
  # Current bucket number: unix seconds divided by the bucket width.
  bucket = div(System.system_time(:second), @node_selection_time_bucket_seconds)

  # Functional RNG API (`*_s`): the state is threaded explicitly instead of
  # being stored in the process dictionary.
  initial_state = :rand.seed_s(:exsss, :erlang.phash2({tenant_id, bucket}))

  # Both draws are 1-based positions in 1..node_count.
  {first_pos, next_state} = :rand.uniform_s(node_count, initial_state)
  {drawn_pos, _unused_state} = :rand.uniform_s(node_count, next_state)

  second_pos =
    cond do
      drawn_pos != first_pos -> drawn_pos
      # Collision: step to the following position, wrapping back to 1.
      node_count > 1 -> rem(first_pos, node_count) + 1
      true -> drawn_pos
    end

  {Enum.at(nodes, first_pos - 1), Enum.at(nodes, second_pos - 1)}
end

@doc """
Gets the node load for a node either locally or remotely. Returns {:error, :not_enough_data} if the node has not been running for long enough to get reliable metrics.
"""
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/operations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Realtime.Operations do
platform_region = Realtime.Nodes.platform_region_translator(region)
current_node = node(pid)

case Realtime.Nodes.launch_node(platform_region, false) do
case Realtime.Nodes.launch_node(platform_region, false, tenant) do
^current_node -> acc
_ -> stop_user_tenant_process(tenant, platform_region, acc)
end
Expand Down
74 changes: 54 additions & 20 deletions test/realtime/nodes_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -210,40 +210,56 @@ defmodule Realtime.NodesTest do
end
end

describe "launch_node/2 with load-aware picking" do
test "returns a node from the region when nodes are available" do
{:ok, remote_node} = Clustered.start()
describe "launch_node/3 load-aware but not enough uptime" do
test "returns the one node from the region when one node is available" do
region = "clustered-test-region"
spawn_fake_node(region, remote_node)
spawn_fake_node(region, :remote_node)

result = Nodes.launch_node(region, node())
result = Nodes.launch_node(region, node(), "test-tenant-123")

assert result == remote_node
assert result == :remote_node
end

test "returns default node when no region nodes available" do
result = Nodes.launch_node("empty-region", node())
result = Nodes.launch_node("empty-region", node(), "test-tenant-123")

assert result == node()
end

test "picks random node when one node has insufficient data" do
region = "uptime-test-region"
test "same tenant_id picks same nodes" do
region = "deterministic-region"
spawn_fake_node(region, :node_a)
spawn_fake_node(region, :node_b)
spawn_fake_node(region, :node_c)

stub(Nodes, :node_load, fn
:node_a -> {:error, :not_enough_data}
:node_b -> 100
end)
tenant_id = "test-tenant-456"

# Call 10 times, should always return same node with hashed tenant ID
results = for _ <- 1..10, do: Nodes.launch_node(region, node(), tenant_id)

assert length(Enum.uniq(results)) == 1
end

test "different tenant_ids distribute across nodes" do
region = "distribution-region"
spawn_fake_node(region, :node_a)
spawn_fake_node(region, :node_b)
spawn_fake_node(region, :node_c)

# Generate 30 different tenant IDs
tenant_ids = for i <- 1..30, do: "tenant-#{i}"

results = for _ <- 1..10, do: Nodes.launch_node(region, node())
results =
Enum.map(tenant_ids, fn id ->
Nodes.launch_node(region, node(), id)
end)

assert Enum.all?(results, &(&1 in [:node_a, :node_b]))
# Should distribute across multiple nodes (at least 2) using the hashed tenant IDs
assert length(Enum.uniq(results)) >= 2
end
end

describe "launch_node/2 with load-aware node picking enabled" do
describe "launch_node/3 with load-aware node picking enabled" do
setup do
Application.put_env(:realtime, :node_balance_uptime_threshold_in_ms, 0)

Expand All @@ -252,8 +268,25 @@ defmodule Realtime.NodesTest do
end)
end

test "compares load between nodes and picks the least loaded" do
{:ok, remote_node} = Clustered.start()
test "picks deterministic node when one node has insufficient data" do
  region = "uptime-test-region"
  spawn_fake_node(region, :node_a)
  spawn_fake_node(region, :node_b)

  # :node_a reports no usable load, so the two loads cannot be compared.
  stub(Nodes, :node_load, fn
    :node_a -> {:error, :not_enough_data}
    :node_b -> 100
  end)

  picked =
    1..10
    |> Enum.map(fn _ -> Nodes.launch_node(region, node(), "test-tenant-123") end)
    |> Enum.uniq()

  # Every call must land on the same node, chosen from the hashed tenant ID.
  assert length(picked) == 1
  assert picked == [:node_b]
end

test "compares load between nodes and picks the least loaded deterministically" do
{:ok, remote_node} = Clustered.start(nil, [{:realtime, :node_balance_uptime_threshold_in_ms, 0}])

region = "load-test-region"
spawn_fake_node(region, node())
Expand All @@ -265,10 +298,11 @@ defmodule Realtime.NodesTest do
assert is_integer(local_load) and local_load >= 0
assert is_integer(remote_load) and remote_load >= 0

results = for _ <- 1..10, do: Nodes.launch_node(region, node())
results = for _ <- 1..10, do: Nodes.launch_node(region, node(), "test-tenant-789")

# Should be deterministic - all same node within time bucket
assert length(Enum.uniq(results)) == 1
assert Enum.all?(results, &(&1 in [node(), remote_node]))
assert length(Enum.uniq(results)) <= 2
end
end
end