Skip to content

Commit 36a89a2

Browse files
committed
Use crontab DateTime (TZ) Support
1 parent 4a9a535 commit 36a89a2

File tree

16 files changed

+67
-210
lines changed

16 files changed

+67
-210
lines changed

lib/quantum/clock_broadcaster.ex

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ defmodule Quantum.ClockBroadcaster do
4444
|> storage.last_execution_date()
4545
|> case do
4646
:unknown -> start_time
47-
date -> date
47+
date -> DateTime.from_naive!(date, "Etc/UTC")
4848
end
49-
|> NaiveDateTime.truncate(:second)
49+
|> DateTime.truncate(:second)
5050
# Roll back one second since handle_tick will start at `now + 1`.
51-
|> NaiveDateTime.add(-1, :second)
51+
|> DateTime.add(-1, :second)
5252

5353
:timer.send_interval(1000, :tick)
5454

@@ -80,16 +80,16 @@ defmodule Quantum.ClockBroadcaster do
8080

8181
defp handle_tick(%State{remaining_demand: remaining_demand, time: time} = state)
8282
when remaining_demand > 0 do
83-
now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
83+
now = DateTime.truncate(DateTime.utc_now(), :second)
8484

8585
{events, new_time} =
8686
Enum.reduce_while(
8787
1..remaining_demand,
8888
{[], time},
8989
fn _, {list, time} = acc ->
90-
new_time = NaiveDateTime.add(time, 1, :second)
90+
new_time = DateTime.add(time, 1, :second)
9191

92-
case NaiveDateTime.compare(new_time, now) do
92+
case DateTime.compare(new_time, now) do
9393
:lt ->
9494
{:cont, {[%Event{time: new_time, catch_up: true} | list], new_time}}
9595

lib/quantum/clock_broadcaster/event.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Quantum.ClockBroadcaster.Event do
44
# Clock Event
55

66
@type t :: %__MODULE__{
7-
time: NaiveDateTime.t(),
7+
time: DateTime.t(),
88
catch_up: boolean()
99
}
1010

lib/quantum/clock_broadcaster/init_opts.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ defmodule Quantum.ClockBroadcaster.InitOpts do
66
alias Quantum.{Scheduler, Storage}
77

88
@type t :: %__MODULE__{
9-
start_time: NaiveDateTime.t(),
9+
start_time: DateTime.t(),
1010
storage: Storage,
1111
scheduler: Scheduler,
1212
debug_logging: boolean()

lib/quantum/clock_broadcaster/start_opts.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ defmodule Quantum.ClockBroadcaster.StartOpts do
77

88
@type t :: %__MODULE__{
99
name: GenServer.server(),
10-
start_time: NaiveDateTime.t(),
10+
start_time: DateTime.t(),
1111
storage: Storage,
1212
scheduler: Scheduler,
1313
debug_logging: boolean()

lib/quantum/clock_broadcaster/state.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ defmodule Quantum.ClockBroadcaster.State do
55

66
@type t :: %__MODULE__{
77
debug_logging: boolean(),
8-
time: NaiveDateTime.t(),
8+
time: DateTime.t(),
99
remaining_demand: non_neg_integer
1010
}
1111

lib/quantum/date_library.ex

Lines changed: 0 additions & 80 deletions
This file was deleted.

lib/quantum/execution_broadcaster.ex

Lines changed: 26 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,6 @@ defmodule Quantum.ExecutionBroadcaster do
1212

1313
alias Quantum.ClockBroadcaster.Event, as: ClockEvent
1414

15-
alias Quantum.{
16-
DateLibrary,
17-
DateLibrary.InvalidDateTimeForTimezoneError,
18-
DateLibrary.InvalidTimezoneError
19-
}
20-
2115
alias Quantum.ExecutionBroadcaster.Event, as: ExecuteEvent
2216
alias Quantum.ExecutionBroadcaster.InitOpts
2317
alias Quantum.ExecutionBroadcaster.State
@@ -202,7 +196,7 @@ defmodule Quantum.ExecutionBroadcaster do
202196
} = state,
203197
time
204198
) do
205-
case NaiveDateTime.compare(time, time_to_execute) do
199+
case DateTime.compare(time, time_to_execute) do
206200
:gt ->
207201
raise "Jobs were skipped"
208202

@@ -226,7 +220,7 @@ defmodule Quantum.ExecutionBroadcaster do
226220
jobs
227221
|> Enum.reduce(
228222
%{state | execution_timeline: tail},
229-
&add_job_to_state(&1, &2, NaiveDateTime.add(time, 1, :second))
223+
&add_job_to_state(&1, &2, DateTime.add(time, 1, :second))
230224
)
231225
|> sort_state
232226
|> execute_events_to_fire(time)
@@ -240,11 +234,17 @@ defmodule Quantum.ExecutionBroadcaster do
240234
state,
241235
time
242236
) do
243-
job
244-
|> get_next_execution_time(time)
245-
|> case do
246-
{:ok, date} ->
247-
add_to_state(state, time, date, job)
237+
with {:ok, execution_date} <- get_next_execution_time(job, time) do
238+
add_to_state(state, time, execution_date, job)
239+
else
240+
{:error, :time_zone_not_found} ->
241+
Logger.error(
242+
"Invalid Timezone #{inspect(timezone)} provided for job #{inspect(name)}.",
243+
job: job,
244+
error: :time_zone_not_found
245+
)
246+
247+
state
248248

249249
{:error, _} ->
250250
Logger.warning(fn ->
@@ -256,54 +256,32 @@ defmodule Quantum.ExecutionBroadcaster do
256256

257257
state
258258
end
259-
rescue
260-
e in InvalidTimezoneError ->
261-
Logger.error(
262-
"Invalid Timezone #{inspect(timezone)} provided for job #{inspect(name)}.",
263-
job: job,
264-
error: e
265-
)
266-
267-
state
268259
end
269260

270261
defp get_next_execution_time(
271-
%Job{schedule: schedule, timezone: timezone, name: name} = job,
262+
%Job{schedule: schedule, timezone: :utc},
272263
time
273264
) do
274-
schedule
275-
|> CrontabScheduler.get_next_run_date(DateLibrary.to_tz!(time, timezone))
276-
|> case do
277-
{:ok, date} ->
278-
{:ok, DateLibrary.to_utc!(date, timezone)}
265+
CrontabScheduler.get_next_run_date(schedule, time)
266+
end
279267

280-
{:error, _} = error ->
281-
error
268+
defp get_next_execution_time(
269+
%Job{schedule: schedule, timezone: timezone},
270+
time
271+
) do
272+
with {:ok, localized_time} <- DateTime.shift_zone(time, timezone),
273+
{:ok, localized_execution_time} <-
274+
CrontabScheduler.get_next_run_date(schedule, localized_time) do
275+
DateTime.shift_zone(localized_execution_time, "Etc/UTC")
282276
end
283-
rescue
284-
_ in InvalidDateTimeForTimezoneError ->
285-
next_time = NaiveDateTime.add(time, 60, :second)
286-
287-
Logger.warning(fn ->
288-
"""
289-
Next execution time for job #{inspect(name)} is not a valid time.
290-
Retrying with #{inspect(next_time)}
291-
"""
292-
end)
293-
294-
get_next_execution_time(job, next_time)
295277
end
296278

297279
defp sort_state(%State{execution_timeline: execution_timeline} = state) do
298-
%{
299-
state
300-
| execution_timeline:
301-
Enum.sort_by(execution_timeline, fn {date, _} -> NaiveDateTime.to_erl(date) end)
302-
}
280+
%{state | execution_timeline: Enum.sort_by(execution_timeline, &elem(&1, 0), DateTime)}
303281
end
304282

305283
defp add_to_state(%State{execution_timeline: execution_timeline} = state, time, date, job) do
306-
unless NaiveDateTime.compare(time, date) in [:lt, :eq] do
284+
unless DateTime.compare(time, date) in [:lt, :eq] do
307285
raise Quantum.ExecutionBroadcaster.JobInPastError
308286
end
309287

lib/quantum/execution_broadcaster/state.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ defmodule Quantum.ExecutionBroadcaster.State do
88

99
@type t :: %__MODULE__{
1010
uninitialized_jobs: [Job.t()],
11-
execution_timeline: [{NaiveDateTime.t(), [Job.t()]}],
11+
execution_timeline: [{DateTime.t(), [Job.t()]}],
1212
storage: StorageAdapter,
1313
storage_pid: StorageAdapter.storage_pid(),
1414
scheduler: Quantum,

lib/quantum/storage.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,14 @@ defmodule Quantum.Storage do
6262
Returns `:unknown` if the storage does not know the last execution time.
6363
In this case all jobs will be run at the next applicable date.
6464
"""
65-
@callback last_execution_date(storage_pid :: storage_pid) :: :unknown | NaiveDateTime.t()
65+
@callback last_execution_date(storage_pid :: storage_pid) :: :unknown | DateTime.t()
6666

6767
@doc """
6868
Update last execution time to given date.
6969
"""
7070
@callback update_last_execution_date(
7171
storage_pid :: storage_pid,
72-
last_execution_date :: NaiveDateTime.t()
72+
last_execution_date :: DateTime.t()
7373
) :: :ok
7474

7575
@doc """

lib/quantum/supervisor.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ defmodule Quantum.Supervisor do
4646
opts
4747
|> Map.take([:debug_logging, :storage, :scheduler])
4848
|> Map.put(:name, clock_broadcaster_name)
49-
|> Map.put(:start_time, NaiveDateTime.utc_now())
49+
|> Map.put(:start_time, DateTime.utc_now())
5050
)
5151

5252
job_broadcaster_opts =

0 commit comments

Comments
 (0)