Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# The directory Mix will write compiled artifacts to.
/_build/
sdk_compliance_adapter/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/
sdk_compliance_adapter/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/
Expand All @@ -31,4 +33,4 @@ posthog-*.tar

# Local Config
config/integration.exs
config/dev.override.exs
config/dev.override.exs
5 changes: 5 additions & 0 deletions .sampo/changesets/doughty-duchess-otso.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
hex/posthog: minor
---

Add config for retries and `gzip` compression
16 changes: 14 additions & 2 deletions lib/posthog/api/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ defmodule PostHog.API.Client do
"""
@callback client(api_key :: String.t(), cloud :: String.t()) :: t()

@doc """
Creates a struct with additional options (e.g., gzip compression).
"""
@callback client(api_key :: String.t(), cloud :: String.t(), opts :: keyword()) :: t()

@optional_callbacks [client: 3]

@doc """
Sends an API request.

Expand All @@ -46,9 +53,14 @@ defmodule PostHog.API.Client do
response()

@impl __MODULE__
def client(api_key, api_host) do
def client(api_key, api_host), do: client(api_key, api_host, [])

@impl __MODULE__
def client(api_key, api_host, opts) do
gzip = Keyword.get(opts, :gzip, false)

client =
Req.new(base_url: api_host)
Req.new(base_url: api_host, compress_body: gzip)
|> Req.Request.put_private(:api_key, api_key)

%__MODULE__{client: client, module: __MODULE__}
Expand Down
21 changes: 20 additions & 1 deletion lib/posthog/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ defmodule PostHog.Config do
default: [],
doc:
"List of OTP app names of your applications. Stacktrace entries that belong to these apps will be marked as \"in_app\"."
],
max_retries: [
type: :non_neg_integer,
default: 3,
doc:
"Maximum number of retry attempts for failed batch requests."
],
gzip: [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, why make it configurable?

In general, the idea behind api_client_module option was to provide an avenue to override the whole HTTP client and therefore any option. Otherwise the top-level configuration is doomed to eventually include most options that usually come with HTTP: retries, timeouts, logging, telemetry, middleware, proxies.

If the only reason to configure gzip is to let cpu-sensitive folks disable it, I imagine they would be motivated enough to just define their own client!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100% agree. This is me trying to get to 100% on this arbitrary set of "assumptions" we made and that all clients should fulfill. This was my first dumb way to implement it, but I think I'm tending towards just implementing a client (on the compliance adapter folder) that demonstrates that this can be supported by the implementation (by implementing your own client like one would in Elixir) rather than enabling it as a config.

I won't be merging this soon, I'll iterate on it slowly.

Question for you: how do you feel with gzip being turned on by default in something like Elixir?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, let me know if you want me to take a stab at any of this!

Answering your question, I don't know much about gzip to be honest. Are there any trade offs? Looking at Req code, it calls built-in :zlib module, which is probably a C NIF under the hood.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would never reject a PR from you, but I can also work on this eventually, don't worry.

For gzip, there's always the fear that some people already using the SDK could be running very close to their max CPU and turning gzip on could bring them to 100% which isn't really ideal - gzip is notoriously for using CPU. We do need to turn it on to make it compliant and improve our ingestion pipeline, so I probably will do it anyway even if in the end I reach the conclusion that it needs a v3.

type: :boolean,
default: false,
doc: "Enable gzip compression for request bodies."
]
] ++ @shared_schema

Expand Down Expand Up @@ -144,7 +155,15 @@ defmodule PostHog.Config do
def validate(options) do
with {:ok, validated} <- NimbleOptions.validate(options, @compiled_configuration_schema) do
config = Map.new(validated)
client = config.api_client_module.client(config.api_key, config.api_host)

client_opts = [gzip: config[:gzip] || false]

client =
if function_exported?(config.api_client_module, :client, 3) do
config.api_client_module.client(config.api_key, config.api_host, client_opts)
else
config.api_client_module.client(config.api_key, config.api_host)
end
global_properties = Map.merge(config.global_properties, @system_global_properties)

final_config =
Expand Down
55 changes: 54 additions & 1 deletion lib/posthog/sender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ defmodule PostHog.Sender do
@moduledoc false
use GenServer

@retryable_statuses [408, 500, 502, 503, 504]
@initial_retry_delay_ms 1_000
@max_retry_delay_ms 30_000

defstruct [
:registry,
:index,
:api_client,
:max_batch_time_ms,
:max_batch_events,
:max_retries,
events: [],
num_events: 0
]
Expand Down Expand Up @@ -58,6 +63,7 @@ defmodule PostHog.Sender do
api_client: Keyword.fetch!(opts, :api_client),
max_batch_time_ms: Keyword.fetch!(opts, :max_batch_time_ms),
max_batch_events: Keyword.fetch!(opts, :max_batch_events),
max_retries: Keyword.get(opts, :max_retries, 3),
events: [],
num_events: 0
}
Expand Down Expand Up @@ -99,7 +105,7 @@ defmodule PostHog.Sender do
# sender is currently busy and if there is another sender available it
# should be used instead.
Registry.update_value(state.registry, registry_key(state.index), fn _ -> :busy end)
PostHog.API.batch(state.api_client, state.events)
send_with_retries(state.api_client, state.events, state.max_retries)
Registry.update_value(state.registry, registry_key(state.index), fn _ -> :available end)
{:noreply, %{state | events: [], num_events: 0}}
end
Expand All @@ -111,5 +117,52 @@ defmodule PostHog.Sender do

def terminate(_reason, _state), do: :ok

defp send_with_retries(api_client, events, max_retries, attempt \\ 0)

defp send_with_retries(_api_client, _events, max_retries, attempt)
when attempt > max_retries,
do: :ok

defp send_with_retries(api_client, events, max_retries, attempt) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Req itself comes with retries plugin and I think it's even enabled by default, although maybe not for POST requests. We should be able to just configure Req and change tracked client to be nice and not cut it off.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lovely, I'll do more research, thank you 😄

case PostHog.API.batch(api_client, events) do
{:ok, %{status: status}} when status in 200..299 ->
:ok

{:ok, %{status: status} = response} when status in @retryable_statuses ->
delay = retry_delay(attempt, response)
Process.sleep(delay)
send_with_retries(api_client, events, max_retries, attempt + 1)

{:error, _reason} ->
delay = retry_delay(attempt, nil)
Process.sleep(delay)
send_with_retries(api_client, events, max_retries, attempt + 1)

_other ->
:ok
end
end

defp retry_delay(attempt, %Req.Response{} = response) do
case Req.Response.get_retry_after(response) do
nil -> exponential_backoff(attempt)
delay_ms -> delay_ms
end
end

defp retry_delay(attempt, %{headers: %{"retry-after" => [value | _]}}) do
case Integer.parse(value) do
{seconds, ""} -> :timer.seconds(seconds)
_ -> exponential_backoff(attempt)
end
end

defp retry_delay(attempt, _response), do: exponential_backoff(attempt)

defp exponential_backoff(attempt) do
import Bitwise
min(@initial_retry_delay_ms * (1 <<< attempt), @max_retry_delay_ms)
end

defp registry_key(index), do: {__MODULE__, index}
end
1 change: 1 addition & 0 deletions lib/posthog/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ defmodule PostHog.Supervisor do
supervisor_name: config.supervisor_name,
max_batch_time_ms: Map.get(config, :max_batch_time_ms, :timer.seconds(10)),
max_batch_events: Map.get(config, :max_batch_events, 100),
max_retries: Map.get(config, :max_retries, 3),
test_mode: config.test_mode,
index: index
]},
Expand Down
71 changes: 55 additions & 16 deletions sdk_compliance_adapter/lib/sdk_compliance_adapter/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ defmodule SdkComplianceAdapter.Router do
config = SdkComplianceAdapter.State.get_config()
interval_ms = config[:max_batch_time_ms] || 100

# Wait for events to be sent (interval + buffer)
wait_time = interval_ms + 500
Process.sleep(wait_time)
# Wait for batch timer to trigger
Process.sleep(interval_ms + 100)

# Poll until sender finishes (including retries)
wait_for_senders_available(60_000)

state = SdkComplianceAdapter.State.get()

Expand Down Expand Up @@ -147,28 +149,34 @@ defmodule SdkComplianceAdapter.Router do
end

defp build_config(params) do
api_key = params["api_key"]
host = params["host"]

# Default values optimized for testing
flush_at = params["flush_at"] || 1
flush_interval_ms = params["flush_interval_ms"] || 100

%{
api_key: api_key,
api_host: host,
config = %{
api_key: params["api_key"],
api_host: params["host"],
api_client_module: SdkComplianceAdapter.TrackedClient,
supervisor_name: SdkComplianceAdapter.PostHog,
max_batch_events: flush_at,
max_batch_time_ms: flush_interval_ms,
max_batch_events: params["flush_at"] || 1,
max_batch_time_ms: params["flush_interval_ms"] || 100,
# Use a single sender for predictable testing
sender_pool_size: 1
}

config =
if params["max_retries"],
do: Map.put(config, :max_retries, params["max_retries"]),
else: config

config =
if is_boolean(params["gzip"]),
do: Map.put(config, :gzip, params["gzip"]),
else: config

config
end

defp start_posthog(config) do
# Extract extra config that's not part of the validation schema
extra_config = Map.take(config, [:max_batch_events, :max_batch_time_ms, :sender_pool_size])
extra_config =
Map.take(config, [:max_batch_events, :max_batch_time_ms, :sender_pool_size, :max_retries])

# Build the base config for validation
base_config = [
Expand All @@ -178,6 +186,11 @@ defmodule SdkComplianceAdapter.Router do
supervisor_name: config.supervisor_name
]

base_config =
if Map.has_key?(config, :gzip),
do: Keyword.put(base_config, :gzip, config.gzip),
else: base_config

# Validate and start PostHog supervisor
case PostHog.Config.validate(base_config) do
{:ok, validated_config} ->
Expand All @@ -191,6 +204,32 @@ defmodule SdkComplianceAdapter.Router do
end
end

defp wait_for_senders_available(timeout_ms) do
deadline = System.monotonic_time(:millisecond) + timeout_ms
do_wait_for_senders(deadline)
end

defp do_wait_for_senders(deadline) do
if System.monotonic_time(:millisecond) > deadline do
:timeout
else
registry = PostHog.Registry.registry_name(SdkComplianceAdapter.PostHog)

senders =
Registry.select(registry, [
{{{PostHog.Sender, :_}, :"$1", :"$2"}, [], [{{:"$2", :"$1"}}]}
])

if Enum.all?(senders, fn {status, _pid} -> status == :available end) do
Process.sleep(50)
:ok
else
Process.sleep(50)
do_wait_for_senders(deadline)
end
end
end

defp stop_posthog do
case Process.whereis(SdkComplianceAdapter.PostHog) do
nil ->
Expand Down
30 changes: 20 additions & 10 deletions sdk_compliance_adapter/lib/sdk_compliance_adapter/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,28 @@ defmodule SdkComplianceAdapter.State do
end

def record_request(status_code, event_count, uuid_list) do
request_info = %{
timestamp_ms: System.system_time(:millisecond),
status_code: status_code,
retry_attempt: 0,
event_count: event_count,
uuid_list: uuid_list
}

Agent.update(__MODULE__, fn state ->
new_state = %{state | requests_made: state.requests_made ++ [request_info]}
retry_attempt =
state.requests_made
|> Enum.count(fn req -> req.uuid_list != [] and req.uuid_list == uuid_list end)

is_retry = retry_attempt > 0

request_info = %{
timestamp_ms: System.system_time(:millisecond),
status_code: status_code,
retry_attempt: retry_attempt,
event_count: event_count,
uuid_list: uuid_list
}

new_state = %{
state
| requests_made: state.requests_made ++ [request_info],
total_retries: if(is_retry, do: state.total_retries + 1, else: state.total_retries)
}

if status_code == 200 do
if status_code in 200..299 do
%{
new_state
| total_events_sent: state.total_events_sent + event_count,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ defmodule SdkComplianceAdapter.TrackedClient do
@behaviour PostHog.API.Client

@impl true
def client(api_key, api_host) do
# Create the underlying Req client
def client(api_key, api_host), do: client(api_key, api_host, [])

@impl true
def client(api_key, api_host, opts) do
gzip = Keyword.get(opts, :gzip, false)

client =
Req.new(base_url: api_host)
Req.new(base_url: api_host, compress_body: gzip)
|> Req.Request.put_private(:api_key, api_key)

# Return the standard PostHog.API.Client struct with our module
%PostHog.API.Client{client: client, module: __MODULE__}
end

Expand Down
21 changes: 21 additions & 0 deletions sdk_compliance_adapter/mix.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
%{
"cowboy": {:hex, :cowboy, "2.14.2", "4008be1df6ade45e4f2a4e9e2d22b36d0b5aba4e20b0a0d7049e28d124e34847", [:make, :rebar3], [{:cowlib, ">= 2.16.0 and < 3.0.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, ">= 1.8.0 and < 3.0.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "569081da046e7b41b5df36aa359be71a0c8874e5b9cff6f747073fc57baf1ab9"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
"cowlib": {:hex, :cowlib, "2.16.0", "54592074ebbbb92ee4746c8a8846e5605052f29309d3a873468d76cdf932076f", [:make, :rebar3], [], "hexpm", "7f478d80d66b747344f0ea7708c187645cfcc08b11aa424632f78e25bf05db51"},
"finch": {:hex, :finch, "0.21.0", "b1c3b2d48af02d0c66d2a9ebfb5622be5c5ecd62937cf79a88a7f98d48a8290c", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "87dc6e169794cb2570f75841a19da99cfde834249568f2a5b121b809588a4377"},
"hpax": {:hex, :hpax, "1.0.3", "ed67ef51ad4df91e75cc6a1494f851850c0bd98ebc0be6e81b026e765ee535aa", [:mix], [], "hexpm", "8eab6e1cfa8d5918c2ce4ba43588e894af35dbd8e91e6e55c817bca5847df34a"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"logger_json": {:hex, :logger_json, "7.0.4", "e315f2b9a755504658a745f3eab90d88d2cd7ac2ecfd08c8da94d8893965ab5c", [:mix], [{:decimal, ">= 0.0.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:ecto, "~> 3.11", [hex: :ecto, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: true]}, {:plug, "~> 1.15", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "d1369f8094e372db45d50672c3b91e8888bcd695fdc444a37a0734e96717c45c"},
"mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"},
"mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_ownership": {:hex, :nimble_ownership, "1.0.2", "fa8a6f2d8c592ad4d79b2ca617473c6aefd5869abfa02563a77682038bf916cf", [:mix], [], "hexpm", "098af64e1f6f8609c6672127cfe9e9590a5d3fcdd82bc17a377b8692fd81a879"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
"plug": {:hex, :plug, "1.19.1", "09bac17ae7a001a68ae393658aa23c7e38782be5c5c00c80be82901262c394c0", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "560a0017a8f6d5d30146916862aaf9300b7280063651dd7e532b8be168511e62"},
"plug_cowboy": {:hex, :plug_cowboy, "2.7.5", "261f21b67aea8162239b2d6d3b4c31efde4daa22a20d80b19c2c0f21b34b270e", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "20884bf58a90ff5a5663420f5d2c368e9e15ed1ad5e911daf0916ea3c57f77ac"},
"plug_crypto": {:hex, :plug_crypto, "2.1.1", "19bda8184399cb24afa10be734f84a16ea0a2bc65054e23a62bb10f06bc89491", [:mix], [], "hexpm", "6470bce6ffe41c8bd497612ffde1a7e4af67f36a15eea5f921af71cf3e11247c"},
"ranch": {:hex, :ranch, "2.2.0", "25528f82bc8d7c6152c57666ca99ec716510fe0925cb188172f41ce93117b1b0", [:make, :rebar3], [], "hexpm", "fa0b99a1780c80218a4197a59ea8d3bdae32fbff7e88527d7d8a4787eff4f8e7"},
"req": {:hex, :req, "0.5.17", "0096ddd5b0ed6f576a03dde4b158a0c727215b15d2795e59e0916c6971066ede", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "0b8bc6ffdfebbc07968e59d3ff96d52f2202d0536f10fef4dc11dc02a2a43e39"},
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
"uuid_v7": {:hex, :uuid_v7, "0.6.0", "1d65727ade8ca619ed40fdef90c4186b50c84657d2b412f7cb79777ab2d47559", [:mix], [{:ecto, "~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm", "1dc401134e61da847a7b2a3b28d2593893f457b9f2704893b1ba3ff7946ce91f"},
}
Loading
Loading