Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 52 additions & 26 deletions .github/workflows/part_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,33 @@ env:
BUILD_EMBEDDED: true

jobs:
detectToolVersions:
name: "Detect Tool Versions"

runs-on: ubuntu-latest

outputs:
otpVersion: "${{ steps.toolVersions.outputs.OTP_VERSION }}"
elixirVersion: "${{ steps.toolVersions.outputs.ELIXIR_VERSION }}"

steps:
- name: Harden Runner
uses: step-security/harden-runner@ec9f2d5744a09debf3a187a3f4f675c53b671911 # v2.13.0
with:
egress-policy: audit

- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: "Read .tool-versions"
id: toolVersions
run: |
OTP_VERSION="$(cat .tool-versions | grep erlang | cut -d' ' -f2-)"
echo OTP: $OTP_VERSION
echo "OTP_VERSION=${OTP_VERSION}" >> $GITHUB_OUTPUT

ELIXIR_VERSION="$(cat .tool-versions | grep elixir | cut -d' ' -f2-)"
echo Rebar: $ELIXIR_VERSION
echo "ELIXIR_VERSION=${ELIXIR_VERSION}" >> $GITHUB_OUTPUT

format:
name: Check Formatting

Expand Down Expand Up @@ -39,57 +66,56 @@ jobs:
- run: mix format --check-formatted

test:
name: Run Tests & Submit Coverage
name: Run Tests & Submit Coverage (${{ matrix.name }})

runs-on: ubuntu-latest
needs: ["detectToolVersions"]

runs-on: ${{ matrix.runs-on }}

strategy:
fail-fast: false
matrix:
include:
- otp: '24.3'
elixir: '1.12'
- otp: '25.2'
elixir: '1.13'
- otp: '25.2'
elixir: '1.14'
- otp: '26.1'
elixir: '1.15'
- otp: '26.1'
elixir: 'main'
- currentMainVersion: true
enable_coverage_export: 'true'
# Lowest Supported
- otp: "24.2"
elixir: "1.15"
runs-on: ubuntu-22.04
name: "lowest"
# Latest Supported
- otp: "${{ needs.detectToolVersions.outputs.otpVersion }}"
elixir: "${{ needs.detectToolVersions.outputs.elixirVersion }}"
runs-on: ubuntu-24.04
name: "latest"
enable_coverage_export: "true"
# Test Main
- otp: "${{ needs.detectToolVersions.outputs.otpVersion }}"
elixir: "main"
runs-on: ubuntu-24.04
name: "main"

env:
MIX_ENV: test

steps:
- uses: actions/checkout@v4
- uses: erlef/setup-elixir@v1
if: ${{ !matrix.currentMainVersion }}
id: setupBEAMDynamic
id: setupBEAM
with:
otp-version: ${{ matrix.otp }}
elixir-version: ${{ matrix.elixir }}
- uses: erlef/setup-elixir@v1
if: ${{ matrix.currentMainVersion }}
id: setupBEAMCurrent
with:
version-file: '.tool-versions'
version-type: strict
- uses: actions/cache@v4
with:
path: deps
key: deps-${{ runner.os }}-${{ steps.setupBEAMCurrent.outputs.otp-version }}${{ steps.setupBEAMDynamic.outputs.otp-version }}-${{ steps.setupBEAMCurrent.outputs.elixir-version }}${{ steps.setupBEAMDynamic.outputs.elixir-version }}-${{ hashFiles('mix.lock') }}
key: deps-${{ runner.os }}-${{ steps.setupBEAM.outputs.otp-version }}-${{ steps.setupBEAM.outputs.elixir-version }}-${{ hashFiles('mix.lock') }}
restore-keys: |
deps-${{ runner.os }}-${{ steps.setupBEAMCurrent.outputs.otp-version }}${{ steps.setupBEAMDynamic.outputs.otp-version }}-${{ steps.setupBEAMCurrent.outputs.elixir-version }}${{ steps.setupBEAMDynamic.outputs.elixir-version }}-
deps-${{ runner.os }}-${{ steps.setupBEAM.outputs.otp-version }}-${{ steps.setupBEAM.outputs.elixir-version }}-
- run: mix deps.get
- uses: actions/cache@v4
with:
path: _build/test
key: compile-${{ env.MIX_ENV }}-${{ runner.os }}-${{ steps.setupBEAMCurrent.outputs.otp-version }}${{ steps.setupBEAMDynamic.outputs.otp-version }}-${{ steps.setupBEAMCurrent.outputs.elixir-version }}${{ steps.setupBEAMDynamic.outputs.elixir-version }}-${{ hashFiles('mix.lock') }}
key: compile-${{ env.MIX_ENV }}-${{ runner.os }}-${{ steps.setupBEAM.outputs.otp-version }}-${{ steps.setupBEAM.outputs.elixir-version }}-${{ hashFiles('mix.lock') }}
restore-keys: |
compile-${{ env.MIX_ENV }}-${{ runner.os }}-${{ steps.setupBEAMCurrent.outputs.otp-version }}${{ steps.setupBEAMDynamic.outputs.otp-version }}-${{ steps.setupBEAMCurrent.outputs.elixir-version }}${{ steps.setupBEAMDynamic.outputs.elixir-version }}-
compile-${{ env.MIX_ENV }}-${{ runner.os }}-${{ steps.setupBEAM.outputs.otp-version }}-${{ steps.setupBEAM.outputs.elixir-version }}-
- run: mix deps.compile
- run: mix compile --warning-as-errors
- run: mix coveralls.github
Expand Down
12 changes: 6 additions & 6 deletions lib/quantum/clock_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ defmodule Quantum.ClockBroadcaster do
|> storage.last_execution_date()
|> case do
:unknown -> start_time
date -> date
date -> DateTime.from_naive!(date, "Etc/UTC")
end
|> NaiveDateTime.truncate(:second)
|> DateTime.truncate(:second)
# Roll back one second since handle_tick will start at `now + 1`.
|> NaiveDateTime.add(-1, :second)
|> DateTime.add(-1, :second)

:timer.send_interval(1000, :tick)

Expand Down Expand Up @@ -80,16 +80,16 @@ defmodule Quantum.ClockBroadcaster do

defp handle_tick(%State{remaining_demand: remaining_demand, time: time} = state)
when remaining_demand > 0 do
now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
now = DateTime.truncate(DateTime.utc_now(), :second)

{events, new_time} =
Enum.reduce_while(
1..remaining_demand,
{[], time},
fn _, {list, time} = acc ->
new_time = NaiveDateTime.add(time, 1, :second)
new_time = DateTime.add(time, 1, :second)

case NaiveDateTime.compare(new_time, now) do
case DateTime.compare(new_time, now) do
:lt ->
{:cont, {[%Event{time: new_time, catch_up: true} | list], new_time}}

Expand Down
2 changes: 1 addition & 1 deletion lib/quantum/clock_broadcaster/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Quantum.ClockBroadcaster.Event do
# Clock Event

@type t :: %__MODULE__{
time: NaiveDateTime.t(),
time: DateTime.t(),
catch_up: boolean()
}

Expand Down
2 changes: 1 addition & 1 deletion lib/quantum/clock_broadcaster/init_opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Quantum.ClockBroadcaster.InitOpts do
alias Quantum.{Scheduler, Storage}

@type t :: %__MODULE__{
start_time: NaiveDateTime.t(),
start_time: DateTime.t(),
storage: Storage,
scheduler: Scheduler,
debug_logging: boolean()
Expand Down
2 changes: 1 addition & 1 deletion lib/quantum/clock_broadcaster/start_opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Quantum.ClockBroadcaster.StartOpts do

@type t :: %__MODULE__{
name: GenServer.server(),
start_time: NaiveDateTime.t(),
start_time: DateTime.t(),
storage: Storage,
scheduler: Scheduler,
debug_logging: boolean()
Expand Down
2 changes: 1 addition & 1 deletion lib/quantum/clock_broadcaster/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Quantum.ClockBroadcaster.State do

@type t :: %__MODULE__{
debug_logging: boolean(),
time: NaiveDateTime.t(),
time: DateTime.t(),
remaining_demand: non_neg_integer
}

Expand Down
80 changes: 0 additions & 80 deletions lib/quantum/date_library.ex

This file was deleted.

74 changes: 26 additions & 48 deletions lib/quantum/execution_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@ defmodule Quantum.ExecutionBroadcaster do

alias Quantum.ClockBroadcaster.Event, as: ClockEvent

alias Quantum.{
DateLibrary,
DateLibrary.InvalidDateTimeForTimezoneError,
DateLibrary.InvalidTimezoneError
}

alias Quantum.ExecutionBroadcaster.Event, as: ExecuteEvent
alias Quantum.ExecutionBroadcaster.InitOpts
alias Quantum.ExecutionBroadcaster.State
Expand Down Expand Up @@ -202,7 +196,7 @@ defmodule Quantum.ExecutionBroadcaster do
} = state,
time
) do
case NaiveDateTime.compare(time, time_to_execute) do
case DateTime.compare(time, time_to_execute) do
:gt ->
raise "Jobs were skipped"

Expand All @@ -226,7 +220,7 @@ defmodule Quantum.ExecutionBroadcaster do
jobs
|> Enum.reduce(
%{state | execution_timeline: tail},
&add_job_to_state(&1, &2, NaiveDateTime.add(time, 1, :second))
&add_job_to_state(&1, &2, DateTime.add(time, 1, :second))
)
|> sort_state
|> execute_events_to_fire(time)
Expand All @@ -240,11 +234,17 @@ defmodule Quantum.ExecutionBroadcaster do
state,
time
) do
job
|> get_next_execution_time(time)
|> case do
{:ok, date} ->
add_to_state(state, time, date, job)
with {:ok, execution_date} <- get_next_execution_time(job, time) do
add_to_state(state, time, execution_date, job)
else
{:error, :time_zone_not_found} ->
Logger.error(
"Invalid Timezone #{inspect(timezone)} provided for job #{inspect(name)}.",
job: job,
error: :time_zone_not_found
)

state

{:error, _} ->
Logger.warning(fn ->
Expand All @@ -256,54 +256,32 @@ defmodule Quantum.ExecutionBroadcaster do

state
end
rescue
e in InvalidTimezoneError ->
Logger.error(
"Invalid Timezone #{inspect(timezone)} provided for job #{inspect(name)}.",
job: job,
error: e
)

state
end

defp get_next_execution_time(
%Job{schedule: schedule, timezone: timezone, name: name} = job,
%Job{schedule: schedule, timezone: :utc},
time
) do
schedule
|> CrontabScheduler.get_next_run_date(DateLibrary.to_tz!(time, timezone))
|> case do
{:ok, date} ->
{:ok, DateLibrary.to_utc!(date, timezone)}
CrontabScheduler.get_next_run_date(schedule, time)
end

{:error, _} = error ->
error
defp get_next_execution_time(
%Job{schedule: schedule, timezone: timezone},
time
) do
with {:ok, localized_time} <- DateTime.shift_zone(time, timezone),
{:ok, localized_execution_time} <-
CrontabScheduler.get_next_run_date(schedule, localized_time) do
DateTime.shift_zone(localized_execution_time, "Etc/UTC")
end
rescue
_ in InvalidDateTimeForTimezoneError ->
next_time = NaiveDateTime.add(time, 60, :second)

Logger.warning(fn ->
"""
Next execution time for job #{inspect(name)} is not a valid time.
Retrying with #{inspect(next_time)}
"""
end)

get_next_execution_time(job, next_time)
end

defp sort_state(%State{execution_timeline: execution_timeline} = state) do
%{
state
| execution_timeline:
Enum.sort_by(execution_timeline, fn {date, _} -> NaiveDateTime.to_erl(date) end)
}
%{state | execution_timeline: Enum.sort_by(execution_timeline, &elem(&1, 0), DateTime)}
end

defp add_to_state(%State{execution_timeline: execution_timeline} = state, time, date, job) do
unless NaiveDateTime.compare(time, date) in [:lt, :eq] do
unless DateTime.compare(time, date) in [:lt, :eq] do
raise Quantum.ExecutionBroadcaster.JobInPastError
end

Expand Down
2 changes: 1 addition & 1 deletion lib/quantum/execution_broadcaster/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Quantum.ExecutionBroadcaster.State do

@type t :: %__MODULE__{
uninitialized_jobs: [Job.t()],
execution_timeline: [{NaiveDateTime.t(), [Job.t()]}],
execution_timeline: [{DateTime.t(), [Job.t()]}],
storage: StorageAdapter,
storage_pid: StorageAdapter.storage_pid(),
scheduler: Quantum,
Expand Down
Loading
Loading