Skip to content

Commit 9bb88bb

Browse files
committed
✨ SlotProcessorServer with ProcessMetrics
1 parent 459a8f6 commit 9bb88bb

File tree

3 files changed

+290
-448
lines changed

3 files changed

+290
-448
lines changed

lib/sequin/postgres/replication_connection.ex

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ defmodule Sequin.Postgres.ReplicationConnection do
140140

141141
@behaviour :gen_statem
142142

143+
use Sequin.ProcessMetrics.Decorator
144+
143145
import Bitwise
144146

145147
alias Postgrex.Protocol
@@ -492,19 +494,17 @@ defmodule Sequin.Postgres.ReplicationConnection do
492494
end
493495

494496
def handle_event(:info, msg, @state, %{protocol: protocol, streaming: streaming} = s) do
495-
execute_timed(:handle_data, fn ->
496-
case Protocol.handle_copy_recv(msg, streaming, protocol) do
497-
{:ok, copies, protocol} ->
498-
handle_data(copies, %{s | protocol: protocol})
497+
case Protocol.handle_copy_recv(msg, streaming, protocol) do
498+
{:ok, copies, protocol} ->
499+
handle_data(copies, %{s | protocol: protocol})
499500

500-
:unknown ->
501-
%{state: {mod, mod_state}} = s
502-
maybe_handle(mod, :handle_info, [msg, mod_state], s)
501+
:unknown ->
502+
%{state: {mod, mod_state}} = s
503+
maybe_handle(mod, :handle_info, [msg, mod_state], s)
503504

504-
{error, reason, protocol} ->
505-
reconnect_or_stop(error, reason, protocol, s)
506-
end
507-
end)
505+
{error, reason, protocol} ->
506+
reconnect_or_stop(error, reason, protocol, s)
507+
end
508508
end
509509

510510
def handle_event({:timeout, :reconnect}, nil, @state, s) do
@@ -626,16 +626,4 @@ defmodule Sequin.Postgres.ReplicationConnection do
626626

627627
defp opts, do: Process.get(__MODULE__)
628628
defp put_opts(opts), do: Process.put(__MODULE__, opts)
629-
630-
defp incr_counter(name, amount \\ 1) do
631-
current = Process.get(name, 0)
632-
Process.put(name, current + amount)
633-
end
634-
635-
defp execute_timed(name, fun) do
636-
{time, result} = :timer.tc(fun, :millisecond)
637-
incr_counter(:"#{name}_total_ms", time)
638-
incr_counter(:"#{name}_count")
639-
result
640-
end
641629
end

lib/sequin/process_metrics.ex

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -131,19 +131,23 @@ defmodule Sequin.ProcessMetrics do
131131

132132
# Add the handle_info callback for process_logging
133133
def handle_info(:process_logging, state) do
134+
handle_process_logging()
135+
136+
Process.send_after(self(), :process_logging, process_metrics_interval())
137+
Sequin.ProcessMetrics.no_reply(state)
138+
end
139+
140+
defp handle_process_logging do
134141
# Get dynamic tags from process dictionary
135142
dynamic_tags = Sequin.ProcessMetrics.get_metadata()
136143
# Merge static and dynamic tags
137144
tags = Map.merge(process_metrics_tags(), dynamic_tags)
138145

139146
Sequin.ProcessMetrics.handle_process_logging(
140-
interval: process_metrics_interval(),
141147
metric_prefix: process_metrics_metric_prefix(),
142148
logger_prefix: process_metrics_logger_prefix(),
143149
tags: tags
144150
)
145-
146-
Sequin.ProcessMetrics.no_reply(state)
147151
end
148152
end
149153
end
@@ -277,7 +281,7 @@ defmodule Sequin.ProcessMetrics do
277281
* `metric_prefix` - The prefix to use for StatsD metrics
278282
* `tags` - Additional tags to include in StatsD metrics
279283
"""
280-
def handle_process_logging(interval: interval, metric_prefix: metric_prefix, logger_prefix: logger_prefix, tags: tags) do
284+
def handle_process_logging(metric_prefix: metric_prefix, logger_prefix: logger_prefix, tags: tags) do
281285
now = System.monotonic_time(:millisecond)
282286
last_logged_at = Process.get(@metrics_last_logged_at_key)
283287
interval_ms = if last_logged_at, do: now - last_logged_at
@@ -435,15 +439,8 @@ defmodule Sequin.ProcessMetrics do
435439
put_metrics(@default_state)
436440

437441
# Schedule next logging and update last logged time
438-
schedule_process_logging(interval)
439442
Process.put(@metrics_last_logged_at_key, now)
440443
end
441-
442-
# Private functions
443-
444-
defp schedule_process_logging(interval) do
445-
Process.send_after(self(), :process_logging, interval)
446-
end
447444
end
448445

449446
defmodule Sequin.ProcessMetrics.Decorator do

0 commit comments

Comments
 (0)