Merged
Changes from 3 commits
config/config.exs (5 additions, 0 deletions)

```diff
@@ -173,6 +173,11 @@ config :logflare, Logflare.Scheduler,
       run_strategy: Quantum.RunStrategy.Local,
       schedule: "*/15 * * * *",
       task: {Logflare.Sources, :recent_events_touch, []}
+    ],
+    alerts_scheduler_sync: [
+      run_strategy: Quantum.RunStrategy.Local,
+      schedule: "0 * * * *",
```
@ruslandoga (Contributor, Author), Jan 7, 2026:

I wonder if it's possible for an alert's schedule to be rarer than the alerts_scheduler_sync schedule, i.e. less frequent than once every 60 minutes. That would probably mean those jobs never run: in the current do_sync_alert_jobs they would keep getting re-added, and re-adding in Quantum doesn't seem (?) to execute the job right away but rather reschedules it, effectively postponing it indefinitely.

Contributor:

Yes, there are jobs that run on a minutely basis, or every hour.

Contributor:

Hmm, that behaviour would not be good. Perhaps use separate sync-schedule jobs: sync once a day for hourly (and rarer) schedules, and once a minute for jobs that run more often than hourly.
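The suggestion above could hypothetically be expressed as two Quantum sync jobs in config/config.exs. This is a sketch only: the job names and the :slow/:fast argument passed to sync_alert_jobs are illustrative assumptions, not code from this PR.

```elixir
# Hypothetical sketch of the "separate sync schedules" suggestion.
# Job names and the :slow/:fast argument are assumptions, not part of this PR.
config :logflare, Logflare.Scheduler,
  jobs: [
    # once a day, re-sync alert jobs whose own schedule is hourly or rarer
    alerts_scheduler_sync_slow: [
      run_strategy: Quantum.RunStrategy.Local,
      schedule: "0 0 * * *",
      task: {Logflare.Alerting, :sync_alert_jobs, [:slow]}
    ],
    # once a minute, re-sync alert jobs that run more often than hourly
    alerts_scheduler_sync_fast: [
      run_strategy: Quantum.RunStrategy.Local,
      schedule: "* * * * *",
      task: {Logflare.Alerting, :sync_alert_jobs, [:fast]}
    ]
  ]
```

This avoids the pathological case where a job rarer than the sync interval keeps getting re-added (and thus postponed) before it ever fires.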

@ruslandoga (Contributor, Author), Jan 11, 2026:

Actually, I was wrong and misunderstood how Quantum works. The original "wipe and re-add" approach is safe for all jobs except those whose schedule coincides with the alerts_scheduler_sync schedule ("0 * * * *"), so I think it still makes sense to make do_sync_alert_jobs a bit smarter. Right now I'm going with something like this:

```elixir
defp do_sync_alert_jobs do
  wanted_jobs = init_alert_jobs()
  wanted_jobs_set = MapSet.new(wanted_jobs, & &1.name)
  current_jobs = AlertsScheduler.jobs()

  # Delete jobs that are no longer wanted
  Enum.each(current_jobs, fn {name, _job} ->
    if not MapSet.member?(wanted_jobs_set, name) do
      AlertsScheduler.delete_job(name)
    end
  end)

  # Upsert all wanted jobs
  Enum.each(wanted_jobs, &AlertsScheduler.add_job/1)
end
```

```diff
+      task: {Logflare.Alerting, :sync_alert_jobs, []}
+    ]
   ]
```
lib/logflare/alerting.ex (11 additions, 2 deletions)

```diff
@@ -210,7 +210,7 @@ defmodule Logflare.Alerting do
   @spec create_alert_job_struct(AlertQuery.t()) :: Quantum.Job.t()
   def create_alert_job_struct(%AlertQuery{} = alert_query) do
     AlertsScheduler.new_job(run_strategy: Quantum.RunStrategy.Local)
-    |> Quantum.Job.set_task({__MODULE__, :run_alert, [alert_query, :scheduled]})
+    |> Quantum.Job.set_task({__MODULE__, :run_alert, [alert_query.id, :scheduled]})
     |> Quantum.Job.set_schedule(Crontab.CronExpression.Parser.parse!(alert_query.cron))
     |> Quantum.Job.set_name(to_job_name(alert_query))
   end
```
```diff
@@ -268,8 +268,17 @@ defmodule Logflare.Alerting do

   Send notifications if necessary configurations are set. If no results are returned from the query execution, no alert is sent.
   """
-  @spec run_alert(AlertQuery.t(), :scheduled) ::
+  @spec run_alert(AlertQuery.t() | integer(), :scheduled) ::
           :ok | {:error, :not_enabled} | {:error, :below_min_cluster_size}
+  def run_alert(alert_id, :scheduled) when is_integer(alert_id) do
+    # sync the alert job for the next run
+    sync_alert_job(alert_id)
```
Contributor:

It would be slightly better to return the alert job (if present) as an :ok tuple from the sync job; that would allow us to avoid a second query to re-fetch the alert job.

@ruslandoga (Contributor, Author), Jan 5, 2026:

Which second query do you mean? run_alert/2 below seems to operate on AlertQuery, not Quantum.Job, and the job only knows about alert_id now (since we don't really need to put %AlertQuery{} in the job anymore if we refetch the alert definition each time).

Contributor:

sync_alert_job would perform one db query on the scheduler_node, and get_alert_query_by at line 285 would then perform another, resulting in 2 db queries.

An alternative is to fetch the alert_query first and then pass it to sync_alert_job, reversing the order and reducing the db queries to 1.
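A minimal sketch of that reversed order might look like the following; a sync_alert_job clause that accepts an already-fetched %AlertQuery{} is an assumption here, not the PR's actual signature.

```elixir
# Sketch of the reversed order: one db query instead of two.
# A sync_alert_job/1 clause accepting %AlertQuery{} is an assumption.
def run_alert(alert_id, :scheduled) when is_integer(alert_id) do
  if alert_query = get_alert_query_by(id: alert_id) do
    # re-sync the job using the struct we already fetched
    sync_alert_job(alert_query)
    run_alert(alert_query, :scheduled)
  end
end
```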

@ruslandoga (Contributor, Author), Jan 7, 2026:

Do we actually need to call sync_alert_job (which "re-adds" the job) from run_alert, or is it enough to unschedule the job for alerts that no longer exist, i.e. 4519e14? Or it could just be a no-op, with sync done periodically (every 60 minutes), but that has another problem: #3059 (comment).

Re-adding the job from run_alert/1 feels off for cron jobs somehow, but in light of #3059 (comment) it might be the only way to reliably re-sync active jobs. One possible problem with that approach: when a rare alert is modified to become less rare, the update won't take effect until it runs at least once, which might be surprising.

@ruslandoga (Contributor, Author), Jan 7, 2026:

I think I'd like to go with no syncing in run_alert/1, since it already uses the up-to-date alert query thanks to fetching it by id on each run (and it's a no-op if the alert no longer exists), and try to update sync_alert_jobs to be a bit smarter (instead of a "full" delete followed by a "full" insert) to avoid the problem in #3059 (comment).

UPDATE: done.


```diff
+    if alert_query = get_alert_query_by(id: alert_id) do
+      run_alert(alert_query, :scheduled)
+    end
+  end
+
   def run_alert(%AlertQuery{} = alert_query, :scheduled) do
     # perform pre-run checks
     cfg = Application.get_env(:logflare, Logflare.Alerting)
```
test/logflare/alerting_test.exs (5 additions, 5 deletions)

```diff
@@ -334,11 +334,11 @@ defmodule Logflare.AlertingTest do
       assert {:ok,
               %Quantum.Job{
                 run_strategy: %Quantum.RunStrategy.Local{},
-                task: {Logflare.Alerting, :run_alert, [%AlertQuery{id: ^alert_id}, :scheduled]}
+                task: {Logflare.Alerting, :run_alert, [^alert_id, :scheduled]}
               }} = Alerting.upsert_alert_job(alert)

       assert %Quantum.Job{
-               task: {Logflare.Alerting, :run_alert, [%AlertQuery{id: ^alert_id}, :scheduled]}
+               task: {Logflare.Alerting, :run_alert, [^alert_id, :scheduled]}
             } = Alerting.get_alert_job(alert_id)

       assert {:ok, _} = Alerting.delete_alert_job(alert)
@@ -361,7 +361,7 @@
       Alerting.upsert_alert_job(alert)
       original_job = Alerting.get_alert_job(alert.id)
       assert Alerting.get_alert_job(alert.id)
-      new_alert = %{alert | query: "select 1"}
+      new_alert = %{alert | cron: "0 0 2 * *"}
       Alerting.upsert_alert_job(new_alert)
       assert original_job != Alerting.get_alert_job(alert.id)
     end
@@ -371,7 +371,7 @@

       assert [
                %Quantum.Job{
-                 task: {Logflare.Alerting, :run_alert, [%AlertQuery{id: ^alert_id}, :scheduled]}
+                 task: {Logflare.Alerting, :run_alert, [^alert_id, :scheduled]}
               }
             ] = Alerting.init_alert_jobs()
@@ -408,7 +408,7 @@

       assert %Quantum.Job{
                name: ^job_name,
-               task: {Logflare.Alerting, :run_alert, [%AlertQuery{id: ^alert_id}, :scheduled]}
+               task: {Logflare.Alerting, :run_alert, [^alert_id, :scheduled]}
             } = Alerting.get_alert_job(alert_id)
     end)
   end
```