Skip to content

Commit 29e3ce4

Browse files
committed
Handle Clock Skew
1 parent bad643f commit 29e3ce4

File tree

2 files changed

+31
-77
lines changed

2 files changed

+31
-77
lines changed

lib/quantum/clock_broadcaster.ex

Lines changed: 29 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -35,29 +35,41 @@ defmodule Quantum.ClockBroadcaster do
3535
:unknown -> start_time
3636
date -> date
3737
end
38+
|> NaiveDateTime.truncate(:second)
39+
# Roll back one second since handle_tick will start at `now + 1`.
40+
|> NaiveDateTime.add(-1, :second)
41+
42+
:timer.send_interval(1000, :tick)
3843

3944
{:producer,
4045
%State{
41-
time: %{start_time | microsecond: {0, 0}},
46+
time: start_time,
4247
debug_logging: debug_logging,
43-
remaining_demand: 0,
44-
timer: nil
48+
remaining_demand: 0
4549
}}
4650
end
4751

4852
@impl GenStage
49-
def handle_demand(
50-
demand,
51-
%State{remaining_demand: remaining_demand, time: time, timer: nil} = state
52-
)
53-
when demand > 0 do
54-
expected_event_count = demand + remaining_demand
53+
def handle_demand(demand, %State{remaining_demand: remaining_demand} = state) do
54+
handle_tick(%State{state | remaining_demand: remaining_demand + demand})
55+
end
56+
57+
@impl GenStage
58+
def handle_info(:tick, state) do
59+
handle_tick(state)
60+
end
61+
62+
defp handle_tick(%State{remaining_demand: 0} = state) do
63+
{:noreply, [], state}
64+
end
5565

56-
now = NaiveDateTime.utc_now()
66+
defp handle_tick(%State{remaining_demand: remaining_demand, time: time} = state)
67+
when remaining_demand > 0 do
68+
now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
5769

5870
{events, new_time} =
5971
Enum.reduce_while(
60-
1..expected_event_count,
72+
1..remaining_demand,
6173
{[], time},
6274
fn _, {list, time} = acc ->
6375
new_time = NaiveDateTime.add(time, 1, :second)
@@ -66,74 +78,24 @@ defmodule Quantum.ClockBroadcaster do
6678
:lt ->
6779
{:cont, {[%Event{time: new_time, catch_up: true} | list], new_time}}
6880

69-
_ ->
81+
:eq ->
82+
{:cont, {[%Event{time: new_time, catch_up: false} | list], new_time}}
83+
84+
:gt ->
7085
{:halt, acc}
7186
end
7287
end
7388
)
7489

7590
events = Enum.reverse(events)
7691

77-
new_remaining_demand = expected_event_count - Enum.count(events)
92+
new_remaining_demand = remaining_demand - Enum.count(events)
7893

7994
if remaining_demand > 0 and new_remaining_demand == 0 do
8095
log_catched_up(state)
8196
end
8297

83-
new_timer =
84-
if new_remaining_demand > 0 do
85-
schedule_next_event_timer(new_time, now)
86-
end
87-
88-
{:noreply, events,
89-
%{state | time: new_time, remaining_demand: new_remaining_demand, timer: new_timer}}
90-
end
91-
92-
def handle_demand(demand, %State{timer: timer} = state) do
93-
Process.cancel_timer(timer)
94-
handle_demand(demand, %{state | timer: nil})
95-
end
96-
97-
@impl GenStage
98-
def handle_info(:ping, %State{remaining_demand: 0} = state) do
99-
{:noreply, [], state}
100-
end
101-
102-
def handle_info(:ping, %State{time: time, remaining_demand: remaining_demand} = state)
103-
when remaining_demand > 0 do
104-
now = NaiveDateTime.utc_now()
105-
new_time = NaiveDateTime.add(time, 1, :second)
106-
107-
case NaiveDateTime.compare(new_time, now) do
108-
:lt ->
109-
timer = schedule_next_event_timer(new_time, now)
110-
111-
{:noreply, [%Event{time: new_time, catch_up: false}],
112-
%{state | time: new_time, timer: timer}}
113-
114-
_ ->
115-
warn_event_too_early()
116-
117-
timer = schedule_next_event_timer(time, now)
118-
119-
{:noreply, [], %{state | timer: timer}}
120-
end
121-
end
122-
123-
defp schedule_next_event_timer(time, now) do
124-
next_event_diff =
125-
%{time | microsecond: {0, 0}}
126-
|> NaiveDateTime.add(1, :second)
127-
|> NaiveDateTime.diff(now, :millisecond)
128-
129-
next_event_diff =
130-
if next_event_diff < 0 do
131-
0
132-
else
133-
next_event_diff
134-
end
135-
136-
Process.send_after(self(), :ping, next_event_diff)
98+
{:noreply, events, %State{state | time: new_time, remaining_demand: new_remaining_demand}}
13799
end
138100

139101
defp log_catched_up(%State{debug_logging: false}), do: :ok
@@ -143,10 +105,4 @@ defmodule Quantum.ClockBroadcaster do
143105
Logger.debug(fn ->
144106
"[#{inspect(Node.self())}][#{__MODULE__}] Clock Producer catched up with past times and is now running in normal time"
145107
end)
146-
147-
defp warn_event_too_early,
148-
do:
149-
Logger.warn(fn ->
150-
"[#{inspect(Node.self())}][#{__MODULE__}] Clock Producer received a too early ping event, rescheduling"
151-
end)
152108
end

lib/quantum/clock_broadcaster/state.ex

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,10 @@ defmodule Quantum.ClockBroadcaster.State do
66
@type t :: %__MODULE__{
77
debug_logging: boolean(),
88
time: NaiveDateTime.t(),
9-
# catch_up: boolean(),
10-
remaining_demand: non_neg_integer,
11-
timer: reference | nil
9+
remaining_demand: non_neg_integer
1210
}
1311

14-
@enforce_keys [:debug_logging, :time, :remaining_demand, :timer]
12+
@enforce_keys [:debug_logging, :time, :remaining_demand]
1513

1614
defstruct @enforce_keys
1715
end

0 commit comments

Comments
 (0)