diff --git a/CHANGELOG.md b/CHANGELOG.md index cf75a6eb47f..c47c0846bfa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,9 @@ and this project adheres to ### Changed +- Support a batch of logs submitted via `run:batch_logs` in run channel + [#4123](https://github.com/OpenFn/lightning/issues/4123) + ### Fixed ## [2.15.7] - 2026-01-07 diff --git a/lib/lightning/runs.ex b/lib/lightning/runs.ex index 84153b4ecec..9155db0607c 100644 --- a/lib/lightning/runs.ex +++ b/lib/lightning/runs.ex @@ -287,6 +287,83 @@ defmodule Lightning.Runs do end end + @doc """ + Appends multiple log lines to a run in a batch operation. + + Returns `{:ok, [%LogLine{}, ...]}` if all logs are inserted successfully, or `{:error, changeset}` + for the first validation error encountered. + """ + def append_run_logs_batch(run, log_entries, scrubber \\ nil) + when is_list(log_entries) do + valid_step_ids = fetch_valid_step_ids(run, log_entries) + + changesets = + Enum.map(log_entries, fn log_entry -> + LogLine.new(run, log_entry, scrubber) + |> Ecto.Changeset.validate_change(:step_id, fn _field, step_id -> + validate_step_id(step_id, valid_step_ids) + end) + end) + + with {:ok, log_lines} <- insert_all_logs(changesets) do + Enum.each(log_lines, &Events.log_appended/1) + + {:ok, log_lines} + end + end + + defp fetch_valid_step_ids(run, log_entries) do + step_ids = + log_entries + |> Enum.map(&(Map.get(&1, "step_id") || Map.get(&1, :step_id))) + |> Enum.reject(&is_nil/1) + |> Enum.uniq() + + if Enum.empty?(step_ids) do + [] + else + from(s in Lightning.RunStep, + where: s.step_id in ^step_ids and s.run_id == ^run.id, + select: s.step_id + ) + |> Repo.all() + end + end + + defp validate_step_id(nil, _valid_step_ids), do: [] + + defp validate_step_id(step_id, valid_step_ids) do + if step_id in valid_step_ids do + [] + else + [{:step_id, "must be associated with the run"}] + end + end + + defp insert_all_logs(changesets) do + case Enum.find(changesets, fn cs -> not cs.valid? end) do + nil -> + entries = + Enum.map(changesets, fn changeset -> + changeset + |> Ecto.Changeset.apply_changes() + |> Map.take(LogLine.__schema__(:fields)) + end) + + {_count, log_lines} = + Repo.insert_all( + Lightning.Invocation.LogLine, + entries, + returning: true + ) + + {:ok, log_lines} + + invalid_changeset -> + {:error, invalid_changeset} + end + end + @doc """ Creates a Step for a given run and job. diff --git a/lib/lightning_web/channels/run_channel.ex b/lib/lightning_web/channels/run_channel.ex index a8a6d4fe5dc..a080b38db7f 100644 --- a/lib/lightning_web/channels/run_channel.ex +++ b/lib/lightning_web/channels/run_channel.ex @@ -224,6 +224,18 @@ defmodule LightningWeb.RunChannel do end end + def handle_in("run:batch_logs", %{"logs" => payload}, socket) do + %{run: run, scrubber: scrubber} = socket.assigns + + case Runs.append_run_logs_batch(run, payload, scrubber) do + {:error, changeset} -> + reply_with(socket, {:error, changeset}) + + {:ok, _} -> + reply_with(socket, :ok) + end + end + # Browser client handlers def handle_in("fetch:run", _payload, socket) do run_id = socket.assigns.run_id diff --git a/test/lightning_web/channels/run_channel_test.exs b/test/lightning_web/channels/run_channel_test.exs index f25c3170324..24c23c6ae30 100644 --- a/test/lightning_web/channels/run_channel_test.exs +++ b/test/lightning_web/channels/run_channel_test.exs @@ -1140,6 +1140,190 @@ defmodule LightningWeb.RunChannelTest do persisted_log_line = Lightning.Repo.one(Lightning.Invocation.LogLine) assert persisted_log_line.timestamp == ~U[2023-11-08 11:57:33.874083Z] end + + test "run:batch_logs inserts multiple log lines in a single operation", %{ + socket: socket, + run: run + } do + ref = + push(socket, "run:batch_logs", %{ + "logs" => [ + %{ + "level" => "info", + "message" => ["First log line"], + "source" => "R/T", + "timestamp" => "1699444653874083" + }, + %{ + "level" => "debug", + "message" => ["Second log line"], + "source" => "R/T", + "timestamp" => "1699444653874084" + }, + %{ + "level" => "error", + "message" => ["Third log line"], + "source" => "R/T", + "timestamp" => "1699444653874085" + } + ] + }) + + assert_reply ref, :ok, _ + + log_lines = + Lightning.Invocation.LogLine + |> where(run_id: ^run.id) + |> order_by(:timestamp) + |> Repo.all() + + assert length(log_lines) == 3 + assert Enum.at(log_lines, 0).message == "First log line" + assert Enum.at(log_lines, 1).message == "Second log line" + assert Enum.at(log_lines, 2).message == "Third log line" + end + + test "run:batch_logs validates all messages", %{ + socket: socket + } do + ref = + push(socket, "run:batch_logs", %{ + "logs" => [ + %{ + "level" => "info", + "message" => ["Valid log"], + "source" => "R/T", + "timestamp" => "1699444653874083" + }, + %{ + "level" => "info", + "timestamp" => "1699444653874084" + # missing message + } + ] + }) + + assert_reply ref, :error, errors + assert errors == %{message: ["This field can't be blank."]} + end + + test "run:batch_logs validates step_id association", %{ + socket: socket, + run: run, + workflow: workflow + } do + [job] = workflow.jobs + step_id_1 = Ecto.UUID.generate() + step_id_2 = Ecto.UUID.generate() + invalid_step_id = Ecto.UUID.generate() + + # Start two steps + ref = + push(socket, "step:start", %{ + "step_id" => step_id_1, + "job_id" => job.id, + "input_dataclip_id" => run.dataclip_id + }) + + assert_reply ref, :ok, _ + + ref = + push(socket, "step:start", %{ + "step_id" => step_id_2, + "job_id" => job.id, + "input_dataclip_id" => run.dataclip_id + }) + + assert_reply ref, :ok, _ + + # Send batch with valid step_ids + ref = + push(socket, "run:batch_logs", %{ + "logs" => [ + %{ + "message" => ["Log for step 1"], + "timestamp" => "1699444653874083", + "step_id" => step_id_1 + }, + %{ + "message" => ["Log for step 2"], + "timestamp" => "1699444653874084", + "step_id" => step_id_2 + }, + %{ + "message" => ["Log without step"], + "timestamp" => "1699444653874085" + } + ] + }) + + assert_reply ref, :ok, _ + + log_lines = + Lightning.Invocation.LogLine + |> where(run_id: ^run.id) + |> order_by(:timestamp) + |> Repo.all() + + assert length(log_lines) == 3 + assert Enum.at(log_lines, 0).step_id == step_id_1 + assert Enum.at(log_lines, 1).step_id == step_id_2 + assert Enum.at(log_lines, 2).step_id == nil + + # Send batch with invalid step_id + ref = + push(socket, "run:batch_logs", %{ + "logs" => [ + %{ + "message" => ["Log with invalid step"], + "timestamp" => "1699444653874086", + "step_id" => invalid_step_id + } + ] + }) + + assert_reply ref, :error, errors + assert errors == %{step_id: ["must be associated with the run"]} + end + + test "run:batch_logs handles empty logs array", %{ + socket: socket + } do + ref = push(socket, "run:batch_logs", %{"logs" => []}) + + assert_reply ref, :ok, _ + end + + test "run:batch_logs broadcasts events for each inserted log line", %{ + socket: socket, + run: run + } do + # Subscribe to run events + Lightning.Runs.Events.subscribe(run) + + ref = + push(socket, "run:batch_logs", %{ + "logs" => [ + %{ + "message" => ["Log 1"], + "timestamp" => "1699444653874083" + }, + %{ + "message" => ["Log 2"], + "timestamp" => "1699444653874084" + } + ] + }) + + assert_reply ref, :ok, _ + + # Should receive 2 log_appended events + assert_receive %Lightning.Runs.Events.LogAppended{log_line: log_line_1} + assert log_line_1.message == "Log 1" + + assert_receive %Lightning.Runs.Events.LogAppended{log_line: log_line_2} + assert log_line_2.message == "Log 2" + end end describe "run:start" do