Skip to content

Commit 5a0927e

Browse files
accoRTLS
authored andcommitted
add clock drift metrics
1 parent e2e5333 commit 5a0927e

File tree

2 files changed

+88
-2
lines changed

2 files changed

+88
-2
lines changed

lib/sequin/runtime/slot_processor_server.ex

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ defmodule Sequin.Runtime.SlotProcessorServer do
3939
alias Sequin.Runtime.PostgresRelationHashCache
4040
alias Sequin.Runtime.SlotMessageStore
4141
alias Sequin.Runtime.SlotProcessor.Message
42+
alias Sequin.Time
4243
alias Sequin.Workers.CreateReplicationSlotWorker
4344

4445
require Logger
@@ -456,7 +457,10 @@ defmodule Sequin.Runtime.SlotProcessorServer do
456457
# Int64 - Server's system clock (microseconds since 2000-01-01 midnight)
457458
# Byte1 - 1 if reply requested immediately to avoid timeout, 0 otherwise
458459
# The server is not asking for a reply
459-
def handle_data(<<?k, wal_end::64, _clock::64, 0>>, %State{} = state) do
460+
def handle_data(<<?k, wal_end::64, clock::64, 0>>, %State{} = state) do
461+
diff_ms = Time.microseconds_since_2000_to_ms_since_now(clock)
462+
Logger.info("Received keepalive message for slot", clock: clock, wal_end: wal_end, diff_ms: diff_ms)
463+
460464
execute_timed(:handle_data_keepalive, fn ->
461465
# Because these are <14 Postgres databases, they will not receive heartbeat messages
462466
# temporarily mark them as healthy if we receive a keepalive message
@@ -469,9 +473,11 @@ defmodule Sequin.Runtime.SlotProcessorServer do
469473

470474
# Check if we should send an ack even though not requested
471475
if should_send_ack?(state) do
476+
Logger.info("Sending ack")
472477
commit_lsn = get_commit_lsn(state, wal_end)
473478
reply = ack_message(commit_lsn)
474479
state = %{state | last_lsn_acked_at: Sequin.utc_now()}
480+
log_keepalive_ack(commit_lsn, clock)
475481
{:keep_state_and_ack, reply, state}
476482
else
477483
{:keep_state, state}
@@ -480,11 +486,20 @@ defmodule Sequin.Runtime.SlotProcessorServer do
480486
end
481487

482488
# The server is asking for a reply
483-
def handle_data(<<?k, wal_end::64, _clock::64, 1>>, %State{} = state) do
489+
def handle_data(<<?k, wal_end::64, clock::64, 1>>, %State{} = state) do
490+
diff_ms = Time.microseconds_since_2000_to_ms_since_now(clock)
491+
492+
Logger.info("Received keepalive message for slot (expecting reply)",
493+
clock: clock,
494+
wal_end: wal_end,
495+
diff_ms: diff_ms
496+
)
497+
484498
execute_timed(:handle_data_keepalive, fn ->
485499
commit_lsn = get_commit_lsn(state, wal_end)
486500
reply = ack_message(commit_lsn)
487501
state = %{state | last_lsn_acked_at: Sequin.utc_now()}
502+
log_keepalive_ack(commit_lsn, clock)
488503
{:keep_state_and_ack, reply, state}
489504
end)
490505
end
@@ -1783,4 +1798,16 @@ defmodule Sequin.Runtime.SlotProcessorServer do
17831798
latency_us = DateTime.diff(Sequin.utc_now(), ts, :microsecond)
17841799
Prometheus.observe_ingestion_latency(state.replication_slot.id, state.replication_slot.slot_name, latency_us)
17851800
end
1801+
1802+
defp log_keepalive_ack(commit_lsn, clock) do
1803+
diff_ms = Time.microseconds_since_2000_to_ms_since_now(clock)
1804+
message = "Responded to keepalive ack in #{diff_ms}ms"
1805+
tags = [commit_lsn: commit_lsn, diff_ms: diff_ms]
1806+
1807+
case diff_ms do
1808+
diff_ms when diff_ms < 100 -> Logger.info(message, tags)
1809+
diff_ms when diff_ms < 1000 -> Logger.warning(message, tags)
1810+
_ -> Logger.error(message, tags)
1811+
end
1812+
end
17861813
end

lib/sequin/time.ex

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,4 +154,63 @@ defmodule Sequin.Time do
154154
defp convert_to_ms(num, "ms"), do: num
155155
defp convert_to_ms(num, "s"), do: num * 1000
156156
defp convert_to_ms(num, "m"), do: num * 60 * 1000
157+
158+
@doc """
159+
Calculate the difference in milliseconds between two timestamps.
160+
161+
## Parameters
162+
- start: The starting timestamp
163+
- finish: The ending timestamp (defaults to now)
164+
165+
## Examples
166+
iex> start = ~N[2023-01-01 00:00:00]
167+
iex> finish = ~N[2023-01-01 00:00:01]
168+
iex> Sequin.Time.ms_since(start, finish)
169+
1000
170+
"""
171+
@spec ms_since(DateTime.t() | NaiveDateTime.t(), DateTime.t() | NaiveDateTime.t() | nil) :: integer()
172+
def ms_since(start, finish \\ nil)
173+
174+
def ms_since(%DateTime{} = start, %DateTime{} = finish) do
175+
DateTime.diff(finish, start, :millisecond)
176+
end
177+
178+
def ms_since(%DateTime{} = start, nil) do
179+
ms_since(start, DateTime.utc_now())
180+
end
181+
182+
def ms_since(%NaiveDateTime{} = start, %NaiveDateTime{} = finish) do
183+
NaiveDateTime.diff(finish, start, :millisecond)
184+
end
185+
186+
def ms_since(%NaiveDateTime{} = start, nil) do
187+
ms_since(start, NaiveDateTime.utc_now())
188+
end
189+
190+
@doc """
191+
Convert microseconds since 2000-01-01 to milliseconds difference from now.
192+
193+
This is primarily used for Postgres replication protocol timestamps.
194+
195+
## Parameters
196+
- microseconds_since_2000: Microseconds since 2000-01-01 00:00:00 UTC
197+
198+
## Returns
199+
Millisecond difference between the timestamp and now (positive if timestamp is in future)
200+
"""
201+
@spec microseconds_since_2000_to_ms_since_now(integer()) :: integer()
202+
def microseconds_since_2000_to_ms_since_now(microseconds_since_2000) do
203+
# Define the epoch (2000-01-01 midnight in UTC)
204+
{:ok, epoch_2000} = NaiveDateTime.new(2000, 1, 1, 0, 0, 0)
205+
epoch_2000_in_microseconds = NaiveDateTime.diff(epoch_2000, ~N[1970-01-01 00:00:00], :microsecond)
206+
207+
# Convert microseconds since 2000 to microseconds since Unix epoch
208+
unix_microseconds = epoch_2000_in_microseconds + microseconds_since_2000
209+
210+
# Convert to DateTime
211+
timestamp = DateTime.from_unix!(unix_microseconds, :microsecond)
212+
213+
# Calculate difference in milliseconds
214+
ms_since(timestamp, DateTime.utc_now())
215+
end
157216
end

0 commit comments

Comments
 (0)