Skip to content

Commit b6bccde

Browse files
committed
Make PostHog.Sender a pool
1 parent 11f97df commit b6bccde

File tree

4 files changed

+230
-104
lines changed

4 files changed

+230
-104
lines changed

lib/posthog/registry.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,7 @@ defmodule PostHog.Registry do
1313

1414
def via(supervisor_name, server_name),
1515
do: {:via, Registry, {registry_name(supervisor_name), server_name}}
16+
17+
def via(supervisor_name, pool_name, index),
18+
do: {:via, Registry, {registry_name(supervisor_name), {pool_name, index}}}
1619
end

lib/posthog/sender.ex

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ defmodule PostHog.Sender do
44

55
defstruct [
66
:registry,
7+
:index,
78
:api_client,
89
:max_batch_time_ms,
910
:max_batch_events,
@@ -15,7 +16,7 @@ defmodule PostHog.Sender do
1516
name =
1617
opts
1718
|> Keyword.fetch!(:supervisor_name)
18-
|> PostHog.Registry.via(__MODULE__)
19+
|> PostHog.Registry.via(__MODULE__, opts[:index])
1920

2021
callers = Process.get(:"$callers", [])
2122
Process.flag(:trap_exit, true)
@@ -33,8 +34,16 @@ defmodule PostHog.Sender do
3334
PostHog.Test.remember_event(supervisor_name, event)
3435

3536
_ ->
36-
supervisor_name
37-
|> PostHog.Registry.via(__MODULE__)
37+
senders =
38+
supervisor_name
39+
|> PostHog.Registry.registry_name()
40+
|> Registry.select([{{{__MODULE__, :_}, :"$1", :"$2"}, [], [{{:"$2", :"$1"}}]}])
41+
42+
# Pick the first available sender, otherwise random busy one.
43+
senders
44+
|> Keyword.get_lazy(:available, fn ->
45+
senders |> Keyword.values() |> Enum.random()
46+
end)
3847
|> GenServer.cast({:event, event})
3948
end
4049
end
@@ -45,6 +54,7 @@ defmodule PostHog.Sender do
4554
def init({opts, callers}) do
4655
state = %__MODULE__{
4756
registry: PostHog.Registry.registry_name(opts[:supervisor_name]),
57+
index: Keyword.fetch!(opts, :index),
4858
api_client: Keyword.fetch!(opts, :api_client),
4959
max_batch_time_ms: Keyword.fetch!(opts, :max_batch_time_ms),
5060
max_batch_events: Keyword.fetch!(opts, :max_batch_events),
@@ -54,6 +64,9 @@ defmodule PostHog.Sender do
5464

5565
Process.put(:"$callers", callers)
5666

67+
{:available, nil} =
68+
Registry.update_value(state.registry, registry_key(state.index), fn _ -> :available end)
69+
5770
{:ok, state}
5871
end
5972

@@ -81,7 +94,9 @@ defmodule PostHog.Sender do
8194

8295
@impl GenServer
8396
def handle_continue(:send_batch, state) do
97+
Registry.update_value(state.registry, registry_key(state.index), fn _ -> :busy end)
8498
PostHog.API.post_batch(state.api_client, state.events)
99+
Registry.update_value(state.registry, registry_key(state.index), fn _ -> :available end)
85100
{:noreply, %{state | events: [], num_events: 0}}
86101
end
87102

@@ -91,4 +106,6 @@ defmodule PostHog.Sender do
91106
end
92107

93108
def terminate(_reason, _state), do: :ok
109+
110+
defp registry_key(index), do: {__MODULE__, index}
94111
end

lib/posthog/supervisor.ex

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,30 @@ defmodule PostHog.Supervisor do
2929
{Registry,
3030
keys: :unique,
3131
name: PostHog.Registry.registry_name(config.supervisor_name),
32-
meta: [config: config]},
32+
meta: [config: config]}
33+
] ++ senders(config)
34+
35+
Process.put(:"$callers", callers)
36+
37+
Supervisor.init(children, strategy: :one_for_one)
38+
end
39+
40+
defp senders(config) do
41+
pool_size = Map.get(config, :sender_pool_size, max(System.schedulers_online(), 2))
42+
43+
for index <- 1..pool_size do
44+
Supervisor.child_spec(
3345
{PostHog.Sender,
3446
[
3547
api_client: config.api_client,
3648
supervisor_name: config.supervisor_name,
3749
max_batch_time_ms: Map.get(config, :max_batch_time_ms, to_timeout(second: 10)),
3850
max_batch_events: Map.get(config, :max_batch_events, 100),
39-
test_mode: config.test_mode
40-
]}
41-
]
42-
43-
Process.put(:"$callers", callers)
44-
45-
Supervisor.init(children, strategy: :one_for_one)
51+
test_mode: config.test_mode,
52+
index: index
53+
]},
54+
id: {PostHog.Sender, index}
55+
)
56+
end
4657
end
4758
end

0 commit comments

Comments
 (0)