Skip to content

Commit 3a1b43c

Browse files
jrcastro2 and kpsherva
authored and committed
logs: fix logs ordering, group by subtask
* closes CERNDocumentServer/cds-rdm#615
1 parent a402542 commit 3a1b43c

File tree

9 files changed

+200
-24
lines changed

9 files changed

+200
-24
lines changed

invenio_jobs/assets/semantic-ui/js/invenio_jobs/administration/RunsLogs.js

Lines changed: 92 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,63 @@ import { http, withCancel } from "react-invenio-forms";
2222
import { DateTime } from "luxon";
2323
import { i18next } from "@translations/invenio_jobs/i18next";
2424

25+
/**
26+
* TaskGroup component - displays logs for a single task group
27+
* @param {Object} taskGroup - Task group containing taskId, taskName, parentTaskId, and logs
28+
* @param {Object} levelClass - Mapping of log levels to CSS classes
29+
*/
30+
const TaskGroup = ({ taskGroup, levelClass }) => (
31+
<div className={taskGroup.parentTaskId ? "subtask-container" : ""}>
32+
{taskGroup.logs.map((log) => (
33+
<div
34+
key={`${log.timestamp}-${log.level}-${log.message}`}
35+
className={`log-line ${log.level.toLowerCase()}`}
36+
>
37+
<span className="log-timestamp">[{log.formatted_timestamp}]</span>{" "}
38+
<span className={levelClass[log.level] || ""}>{log.level}</span>{" "}
39+
<span className="log-message">{log.message}</span>
40+
</div>
41+
))}
42+
</div>
43+
);
44+
45+
TaskGroup.propTypes = {
46+
taskGroup: PropTypes.shape({
47+
taskId: PropTypes.string.isRequired,
48+
parentTaskId: PropTypes.string,
49+
logs: PropTypes.array.isRequired,
50+
}).isRequired,
51+
levelClass: PropTypes.object.isRequired,
52+
};
53+
2554
export class RunsLogs extends Component {
2655
constructor(props) {
2756
super(props);
2857

2958
const { logs, run, sort, warnings } = props;
3059

60+
const formattedLogs = logs.map((log) => ({
61+
...log,
62+
formatted_timestamp: DateTime.fromISO(log.timestamp).toFormat(
63+
"yyyy-MM-dd HH:mm"
64+
),
65+
}));
66+
3167
this.state = {
3268
error: null,
33-
logs: logs.map((log) => ({
34-
...log,
35-
formatted_timestamp: DateTime.fromISO(log.timestamp).toFormat(
36-
"yyyy-MM-dd HH:mm"
37-
),
38-
})),
69+
logs: formattedLogs,
3970
run,
4071
sort,
4172
warnings: warnings || [],
4273
runDuration: this.getDurationInMinutes(run.started_at, run.finished_at),
4374
formatted_started_at: this.formatDatetime(run.started_at),
4475
};
76+
77+
// Cache for memoized log tree
78+
this.logTreeCache = {
79+
logs: null,
80+
tree: null,
81+
};
4582
}
4683

4784
componentDidMount() {
@@ -60,6 +97,18 @@ export class RunsLogs extends Component {
6097
this.statusFetchCancel?.cancel();
6198
}
6299

100+
getLogTree() {
101+
const { logs } = this.state;
102+
// Return cached tree if logs haven't changed
103+
if (this.logTreeCache.logs === logs) {
104+
return this.logTreeCache.tree;
105+
}
106+
// Rebuild tree and update cache
107+
const tree = this.buildLogTree(logs);
108+
this.logTreeCache = { logs, tree };
109+
return tree;
110+
}
111+
63112
getDurationInMinutes(startedAt, finishedAt) {
64113
if (!startedAt) return 0;
65114
const start = DateTime.fromISO(startedAt);
@@ -71,6 +120,32 @@ export class RunsLogs extends Component {
71120
return ts ? DateTime.fromISO(ts).toFormat("yyyy-MM-dd HH:mm") : null;
72121
}
73122

123+
buildLogTree = (logs) => {
  /**
   * Group a flat log list by originating task.
   *
   * Returns an array of task groups ({ taskId, parentTaskId, logs }),
   * one per distinct task id, in order of first appearance.
   * Root tasks have parentTaskId == null; subtasks have it set.
   */
  const groupsById = {};

  for (const log of logs) {
    const ctx = log.context || {};
    const taskId = ctx.task_id || "unknown";

    let group = groupsById[taskId];
    if (!group) {
      group = {
        taskId,
        parentTaskId: ctx.parent_task_id || null,
        logs: [],
      };
      groupsById[taskId] = group;
    }
    group.logs.push(log);
  }

  return Object.values(groupsById);
};
148+
74149
fetchLogs = async (runId, sort) => {
75150
try {
76151
const searchAfterParams = (sort || [])
@@ -153,6 +228,7 @@ export class RunsLogs extends Component {
153228
formatted_started_at: formattedStartedAt,
154229
warnings,
155230
} = this.state;
231+
const logTree = this.getLogTree();
156232
const levelClass = {
157233
DEBUG: "",
158234
INFO: "primary",
@@ -254,7 +330,7 @@ export class RunsLogs extends Component {
254330
</List.Item>
255331
</List>
256332
</Grid.Column>
257-
<Grid.Column className="log-table" width={13}>
333+
<Grid.Column className="job-log-table" width={13}>
258334
{/* Display error message for failed jobs */}
259335
{(run.status === "FAILED" ||
260336
run.status === "PARTIAL_SUCCESS") && (
@@ -278,19 +354,15 @@ export class RunsLogs extends Component {
278354
</Message>
279355
)}
280356
<Segment>
281-
{logs.map((log) => (
282-
<div
283-
key={`${log.timestamp}-${log.level}-${log.message}`}
284-
className={`log-line ${log.level.toLowerCase()}`}
285-
>
286-
<span className="log-timestamp">
287-
[{log.formatted_timestamp}]
288-
</span>{" "}
289-
<span className={levelClass[log.level] || ""}>
290-
{log.level}
291-
</span>{" "}
292-
<span className="log-message">{log.message}</span>
293-
</div>
357+
{logTree.map((taskGroup) => (
358+
<React.Fragment key={taskGroup.taskId}>
359+
{!taskGroup.parentTaskId &&
360+
logTree.indexOf(taskGroup) > 0 && <Divider />}
361+
<TaskGroup
362+
taskGroup={taskGroup}
363+
levelClass={levelClass}
364+
/>
365+
</React.Fragment>
294366
))}
295367
</Segment>
296368
</Grid.Column>

invenio_jobs/logging/celery_signals.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def capture_context(sender=None, headers=None, body=None, **kwargs):
2323
if (
2424
"context" not in headers and job_context.get() is not EMPTY_JOB_CTX
2525
): # Ensure context is only added if missing
26-
headers["context"] = job_context.get()
26+
headers["context"] = dict(job_context.get())
2727

2828

2929
# Restore context when a task starts executing
@@ -35,6 +35,12 @@ def restore_context(task=None, **kwargs):
3535
if job_context.get() is EMPTY_JOB_CTX:
3636
task_context = getattr(task.request, "context", None)
3737
if task_context:
38+
# Update context with current task's metadata from Celery's own request
39+
task_context = dict(task_context)
40+
task_context["task_id"] = str(task.request.id)
41+
task_context["parent_task_id"] = (
42+
str(task.request.parent_id) if task.request.parent_id else None
43+
)
3844
token = job_context.set(task_context)
3945
# Store token in task.request
4046
task.request._job_context_token = token

invenio_jobs/logging/index_templates/os-v1/job-logs-v1.0.0.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
"properties": {
1616
"job_id": { "type": "keyword" },
1717
"run_id": { "type": "keyword" },
18-
"identity_id": { "type": "keyword" }
18+
"identity_id": { "type": "keyword" },
19+
"task_id": { "type": "keyword" },
20+
"parent_task_id": { "type": "keyword" }
1921
},
2022
"dynamic": true
2123
}

invenio_jobs/logging/index_templates/os-v2/job-logs-v1.0.0.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
"properties": {
1616
"job_id": { "type": "keyword" },
1717
"run_id": { "type": "keyword" },
18-
"identity_id": { "type": "keyword" }
18+
"identity_id": { "type": "keyword" },
19+
"task_id": { "type": "keyword" },
20+
"parent_task_id": { "type": "keyword" }
1921
},
2022
"dynamic": true
2123
}

invenio_jobs/logging/jobs.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,16 @@ def emit(self, record):
4444

4545
def enrich_log(self, record):
    """Enrich log record with contextvars' global context.

    :param record: the ``logging.LogRecord`` being emitted.
    :returns: the record data serialized through ``JobLogEntrySchema``.
    """
    # Snapshot the shared context so later mutations of the contextvar
    # cannot leak into this log entry.
    context = dict(job_context.get())

    return JobLogEntrySchema().load(
        {
            "timestamp": datetime.now().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
            "context": context,
        }
    )

invenio_jobs/services/schema.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,8 @@ class LogContextSchema(Schema):
389389
job_id = fields.Str(required=True)
390390
run_id = fields.Str(required=True)
391391
identity_id = fields.Str(required=True)
392+
task_id = fields.Str(allow_none=True)
393+
parent_task_id = fields.Str(allow_none=True)
392394

393395

394396
class JobLogEntrySchema(Schema):

invenio_jobs/services/services.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,18 @@ def search(self, identity, params):
556556
# Track if we're truncating results
557557
truncated = total > max_docs
558558

559+
# Sort chronologically by timestamp and _id
560+
# Note: Hierarchical grouping by task hierarchy is handled client-side
561+
# in the UI via buildLogTree() function
562+
# This is not done in the backend by updating the sort parameters due
563+
# to 2 main reasons:
564+
# 1 - For dynamically loading the logs while the job is being executed
565+
# we use the search_after parameter that will return only new values,
566+
# meaning that they can't be properly grouped in the backend without the
567+
# risk of being split by this.
568+
# 2 - By changing the sort order from timestamp to prioritize the
569+
# task_id, the logs will lose the chronological order. There is no easy
570+
# way to group by task_id and prioritize by timestamp.
559571
search = search.sort("@timestamp", "_id").extra(size=batch_size)
560572
if search_after:
561573
search = search.extra(search_after=search_after)

invenio_jobs/tasks.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ def execute_run(self, run_id, identity_id, kwargs=None):
6868
"run_id": str(run_id),
6969
"job_id": str(run.job.id),
7070
"identity_id": str(identity_id),
71+
"task_id": str(self.request.id),
72+
"parent_task_id": (
73+
str(self.request.parent_id) if self.request.parent_id else None
74+
),
7175
}
7276
):
7377
update_run(

tests/test_services_job_logs.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@
66
# Invenio-Jobs is free software; you can redistribute it and/or modify it
77
# under the terms of the MIT License; see LICENSE file for more details.
88

9+
from datetime import datetime, timezone
10+
911
import pytest
1012

13+
from invenio_jobs.api import AttrDict
1114
from invenio_jobs.proxies import current_jobs_logs_service
1215

1316

@@ -103,3 +106,74 @@ def fake_search(self, *args, **kwargs):
103106
assert "warnings" not in payload
104107

105108
assert payload["hits"]["sort"] == hits[2].meta.sort # Last hit (log-1)
109+
110+
111+
def _make_hit_with_lineage(idx, task_id, parent_task_id=None):
    """Create a fake search hit with task lineage metadata.

    :param idx: per-task sequence number; offsets the timestamp and
        labels the message/document id.
    :param task_id: id of the task that emitted the log line.
    :param parent_task_id: id of the parent task, or ``None`` for roots.
    :returns: an ``AttrDict`` shaped like a search hit, with a matching
        ``meta.sort`` value.
    """
    base_ts = datetime(2025, 1, 1, tzinfo=timezone.utc).timestamp() + idx
    sort_key = [task_id, idx, base_ts, f"id-{idx}"]
    context = {
        "job_id": "job-123",
        "run_id": "run-456",
        "identity_id": "user-789",
        "task_id": task_id,
        "parent_task_id": parent_task_id,
    }
    hit = AttrDict(
        {
            "@timestamp": datetime.fromtimestamp(base_ts, timezone.utc).isoformat(),
            "level": "INFO",
            "message": f"log-{idx}",
            "module": "tests",
            "function": "fn",
            "line": idx,
            "context": context,
            "sort": sort_key,
        }
    )
    hit.meta = AttrDict({"sort": sort_key})
    return hit
140+
141+
142+
@pytest.mark.usefixtures("app")
def test_job_logs_search_with_task_lineage(monkeypatch, anon_identity, app, FakeSearch):
    """Service returns logs with task lineage metadata."""
    service = current_jobs_logs_service

    # Temporarily cap result/batch sizes; restored in the finally block.
    original_max = app.config.get("JOBS_LOGS_MAX_RESULTS")
    original_batch = app.config.get("JOBS_LOGS_BATCH_SIZE")
    app.config["JOBS_LOGS_MAX_RESULTS"] = 10
    app.config["JOBS_LOGS_BATCH_SIZE"] = 10

    # Two root-task logs (task-A) followed by two subtask logs (task-B,
    # child of task-A).
    hits = [
        _make_hit_with_lineage(1, "task-A"),
        _make_hit_with_lineage(2, "task-A"),
        _make_hit_with_lineage(1, "task-B", "task-A"),
        _make_hit_with_lineage(2, "task-B", "task-A"),
    ]

    def fake_search(self, *args, **kwargs):
        return FakeSearch(hits)

    monkeypatch.setattr(service.__class__, "_search", fake_search)

    payload = None
    try:
        with app.app_context():
            payload = service.search(anon_identity, {"q": "test"}).to_dict()
    finally:
        app.config["JOBS_LOGS_MAX_RESULTS"] = original_max
        app.config["JOBS_LOGS_BATCH_SIZE"] = original_batch

    assert payload["hits"]["total"] == len(hits)
    assert len(payload["hits"]["hits"]) == len(hits)
    # Verify task lineage fields are present
    first_hit = payload["hits"]["hits"][0]
    assert "task_id" in first_hit["context"]
    assert "parent_task_id" in first_hit["context"]

0 commit comments

Comments
 (0)