Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ and this project adheres to

### Changed

- Support a batch of logs submitted via `run:batch_logs` in run channel
[#4123](https://github.com/OpenFn/lightning/issues/4123)

### Fixed

## [2.15.7] - 2026-01-07
Expand Down
77 changes: 77 additions & 0 deletions lib/lightning/runs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,83 @@ defmodule Lightning.Runs do
end
end

@doc """
Appends multiple log lines to a run in a batch operation.

Returns `{:ok, [%LogLine{}, ...]}` if all logs are inserted successfully, or `{:error, changeset}`
for the first validation error encountered.
"""
def append_run_logs_batch(run, log_entries, scrubber \\ nil)
when is_list(log_entries) do
valid_step_ids = fetch_valid_step_ids(run, log_entries)

changesets =
Enum.map(log_entries, fn log_entry ->
LogLine.new(run, log_entry, scrubber)
|> Ecto.Changeset.validate_change(:step_id, fn _field, step_id ->
validate_step_id(step_id, valid_step_ids)
end)
end)

with {:ok, log_lines} <- insert_all_logs(changesets) do
Enum.each(log_lines, &Events.log_appended/1)

{:ok, log_lines}
end
end

defp fetch_valid_step_ids(run, log_entries) do
step_ids =
log_entries
|> Enum.map(&(Map.get(&1, "step_id") || Map.get(&1, :step_id)))
|> Enum.reject(&is_nil/1)
|> Enum.uniq()

if Enum.empty?(step_ids) do
[]
else
from(s in Lightning.RunStep,
where: s.step_id in ^step_ids and s.run_id == ^run.id,
select: s.step_id
)
|> Repo.all()
end
end

defp validate_step_id(nil, _valid_step_ids), do: []

defp validate_step_id(step_id, valid_step_ids) do
if step_id in valid_step_ids do
[]
else
[{:step_id, "must be associated with the run"}]
end
end

defp insert_all_logs(changesets) do
case Enum.find(changesets, fn cs -> not cs.valid? end) do
nil ->
entries =
Enum.map(changesets, fn changeset ->
changeset
|> Ecto.Changeset.apply_changes()
|> Map.take(LogLine.__schema__(:fields))
end)

{_count, log_lines} =
Repo.insert_all(
Lightning.Invocation.LogLine,
entries,
returning: true
)

{:ok, log_lines}

invalid_changeset ->
{:error, invalid_changeset}
end
end

@doc """
Creates a Step for a given run and job.

Expand Down
12 changes: 12 additions & 0 deletions lib/lightning_web/channels/run_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,18 @@ defmodule LightningWeb.RunChannel do
end
end

def handle_in("run:batch_logs", %{"logs" => payload}, socket) do
%{run: run, scrubber: scrubber} = socket.assigns

case Runs.append_run_logs_batch(run, payload, scrubber) do
{:error, changeset} ->
reply_with(socket, {:error, changeset})

{:ok, _} ->
reply_with(socket, :ok)
end
end

# Browser client handlers
def handle_in("fetch:run", _payload, socket) do
run_id = socket.assigns.run_id
Expand Down
184 changes: 184 additions & 0 deletions test/lightning_web/channels/run_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,190 @@ defmodule LightningWeb.RunChannelTest do
persisted_log_line = Lightning.Repo.one(Lightning.Invocation.LogLine)
assert persisted_log_line.timestamp == ~U[2023-11-08 11:57:33.874083Z]
end

test "run:batch_logs inserts multiple log lines in a single operation", %{
socket: socket,
run: run
} do
ref =
push(socket, "run:batch_logs", %{
"logs" => [
%{
"level" => "info",
"message" => ["First log line"],
"source" => "R/T",
"timestamp" => "1699444653874083"
},
%{
"level" => "debug",
"message" => ["Second log line"],
"source" => "R/T",
"timestamp" => "1699444653874084"
},
%{
"level" => "error",
"message" => ["Third log line"],
"source" => "R/T",
"timestamp" => "1699444653874085"
}
]
})

assert_reply ref, :ok, _

log_lines =
Lightning.Invocation.LogLine
|> where(run_id: ^run.id)
|> order_by(:timestamp)
|> Repo.all()

assert length(log_lines) == 3
assert Enum.at(log_lines, 0).message == "First log line"
assert Enum.at(log_lines, 1).message == "Second log line"
assert Enum.at(log_lines, 2).message == "Third log line"
end

test "run:batch_logs validates all messages", %{
socket: socket
} do
ref =
push(socket, "run:batch_logs", %{
"logs" => [
%{
"level" => "info",
"message" => ["Valid log"],
"source" => "R/T",
"timestamp" => "1699444653874083"
},
%{
"level" => "info",
"timestamp" => "1699444653874084"
# missing message
}
]
})

assert_reply ref, :error, errors
assert errors == %{message: ["This field can't be blank."]}
end

test "run:batch_logs validates step_id association", %{
socket: socket,
run: run,
workflow: workflow
} do
[job] = workflow.jobs
step_id_1 = Ecto.UUID.generate()
step_id_2 = Ecto.UUID.generate()
invalid_step_id = Ecto.UUID.generate()

# Start two steps
ref =
push(socket, "step:start", %{
"step_id" => step_id_1,
"job_id" => job.id,
"input_dataclip_id" => run.dataclip_id
})

assert_reply ref, :ok, _

ref =
push(socket, "step:start", %{
"step_id" => step_id_2,
"job_id" => job.id,
"input_dataclip_id" => run.dataclip_id
})

assert_reply ref, :ok, _

# Send batch with valid step_ids
ref =
push(socket, "run:batch_logs", %{
"logs" => [
%{
"message" => ["Log for step 1"],
"timestamp" => "1699444653874083",
"step_id" => step_id_1
},
%{
"message" => ["Log for step 2"],
"timestamp" => "1699444653874084",
"step_id" => step_id_2
},
%{
"message" => ["Log without step"],
"timestamp" => "1699444653874085"
}
]
})

assert_reply ref, :ok, _

log_lines =
Lightning.Invocation.LogLine
|> where(run_id: ^run.id)
|> order_by(:timestamp)
|> Repo.all()

assert length(log_lines) == 3
assert Enum.at(log_lines, 0).step_id == step_id_1
assert Enum.at(log_lines, 1).step_id == step_id_2
assert Enum.at(log_lines, 2).step_id == nil

# Send batch with invalid step_id
ref =
push(socket, "run:batch_logs", %{
"logs" => [
%{
"message" => ["Log with invalid step"],
"timestamp" => "1699444653874086",
"step_id" => invalid_step_id
}
]
})

assert_reply ref, :error, errors
assert errors == %{step_id: ["must be associated with the run"]}
end

test "run:batch_logs handles empty logs array", %{
socket: socket
} do
ref = push(socket, "run:batch_logs", %{"logs" => []})

assert_reply ref, :ok, _
end

test "run:batch_logs broadcasts events for each inserted log line", %{
socket: socket,
run: run
} do
# Subscribe to run events
Lightning.Runs.Events.subscribe(run)

ref =
push(socket, "run:batch_logs", %{
"logs" => [
%{
"message" => ["Log 1"],
"timestamp" => "1699444653874083"
},
%{
"message" => ["Log 2"],
"timestamp" => "1699444653874084"
}
]
})

assert_reply ref, :ok, _

# Should receive 2 log_appended events
assert_receive %Lightning.Runs.Events.LogAppended{log_line: log_line_1}
assert log_line_1.message == "Log 1"

assert_receive %Lightning.Runs.Events.LogAppended{log_line: log_line_2}
assert log_line_2.message == "Log 2"
end
end

describe "run:start" do
Expand Down