Skip to content

Commit a72a835

Browse files
authored
fix: move message queue data to off-heap for gen_rpc pub sub workers (#1548)
1 parent d309c55 commit a72a835

File tree

4 files changed

+16
-3
lines changed

4 files changed

+16
-3
lines changed

config/runtime.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.se
6868
janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4))
6969
platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly
7070
broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
71-
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "pg2") |> String.to_atom()
71+
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom()
7272
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))
7373

7474
no_channel_timeout_in_ms =

lib/realtime/gen_rpc/pub_sub.ex

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,10 @@ defmodule Realtime.GenRpcPubSub.Worker do
6565
def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, pubsub, name: worker)
6666

6767
@impl true
68-
def init(pubsub), do: {:ok, pubsub}
68+
def init(pubsub) do
69+
Process.flag(:message_queue_data, :off_heap)
70+
{:ok, pubsub}
71+
end
6972

7073
@impl true
7174
def handle_info({:ftl, topic, message, dispatcher}, pubsub) do

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.51.5",
7+
version: "2.51.6",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,12 @@
11
Application.put_env(:phoenix_pubsub, :test_adapter, {Realtime.GenRpcPubSub, []})
22
Code.require_file("../../deps/phoenix_pubsub/test/shared/pubsub_test.exs", __DIR__)
3+
4+
defmodule Realtime.GenRpcPubSubTest do
5+
use ExUnit.Case, async: true
6+
7+
test "it sets off_heap message_queue_data flag on the workers" do
8+
assert Realtime.PubSubElixir.Realtime.PubSub.Adapter_1
9+
|> Process.whereis()
10+
|> Process.info(:message_queue_data) == {:message_queue_data, :off_heap}
11+
end
12+
end

0 commit comments

Comments
 (0)