Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion tests/etl/taskcluster_pulse/test_handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,48 @@
import pytest

from treeherder.etl.taskcluster_pulse.handler import handle_message, handle_task_defined
from treeherder.etl.taskcluster_pulse.handler import (
create_log_reference,
handle_message,
handle_task_defined,
)

ROOT_URL = "https://firefox-ci-tc.services.mozilla.com"
TASK_ID = "AJBb7wqZT6K9kz4niYAatg"


def test_create_log_reference_emits_live_backing_log_by_default():
logs = create_log_reference(ROOT_URL, TASK_ID, 0)
assert len(logs) == 1
assert logs[0]["name"] == "live_backing_log"
assert logs[0]["url"].endswith(f"task/{TASK_ID}/runs/0/artifacts/public/logs/live_backing.log")


def test_create_log_reference_only_live_backing_log_when_no_raw_log():
artifacts = [
{"name": "public/logs/live_backing.log"},
{"name": "public/test_info/something.txt"},
]
logs = create_log_reference(ROOT_URL, TASK_ID, 0, artifacts=artifacts)
assert len(logs) == 1
assert logs[0]["url"].endswith("artifacts/public/logs/live_backing.log")


def test_create_log_reference_appends_raw_log_when_present():
artifacts = [
{"name": "public/logs/live_backing.log"},
{"name": "public/test_info/xpcshell_raw.log"},
{"name": "public/test_info/mochitest_raw.log"},
]
logs = create_log_reference(ROOT_URL, TASK_ID, 0, artifacts=artifacts)
assert [log["name"] for log in logs] == [
"live_backing_log",
"structured_log",
"structured_log",
]
urls = [log["url"] for log in logs]
assert urls[0].endswith("artifacts/public/logs/live_backing.log")
assert urls[1].endswith("artifacts/public/test_info/xpcshell_raw.log")
assert urls[2].endswith("artifacts/public/test_info/mochitest_raw.log")


@pytest.mark.asyncio
Expand Down
2 changes: 1 addition & 1 deletion treeherder/etl/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ def _schedule_log_parsing(job, job_logs, result, repository):
# importing here to avoid an import loop
from treeherder.log_parser.tasks import parse_logs

task_types = {"errorsummary_json", "live_backing_log"}
task_types = {"errorsummary_json", "live_backing_log", "structured_log"}
sheriffed_repos = {
"autoland",
"mozilla-central",
Expand Down
69 changes: 48 additions & 21 deletions treeherder/etl/taskcluster_pulse/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,31 @@ def result_from_run(job_run):
return "unknown"


# Creates a log entry for Treeherder to retrieve and parse. This log is
# displayed on the Treeherder Log Viewer once parsed.
def create_log_reference(root_url, task_id, run_id):
log_url = taskcluster_urls.api(
root_url, "queue", "v1", "task/{taskId}/runs/{runId}/artifacts/public/logs/live_backing.log"
).format(taskId=task_id, runId=run_id)
return {
"name": "live_backing_log",
"url": log_url,
}
# Creates the log entries for Treeherder to retrieve and parse. These logs
# are displayed on the Treeherder Log Viewer once parsed.
# `public/logs/live_backing.log` is always included. Any artifact whose name
# ends with `_raw.log` is appended as an additional reference. All entries
# share the `live_backing_log` JobLog name so the existing parser dispatch
# (jobs.py:_schedule_log_parsing, log_parser/tasks.py:parser_tasks) handles
# them; JobLog's `(job, name, url)` unique constraint allows the duplicates.
def create_log_reference(root_url, task_id, run_id, artifacts=None):
def _ref(name, artifact_path):
return {
"name": name,
"url": taskcluster_urls.api(
root_url,
"queue",
"v1",
f"task/{{taskId}}/runs/{{runId}}/artifacts/{artifact_path}",
).format(taskId=task_id, runId=run_id),
}

logs = [_ref("live_backing_log", "public/logs/live_backing.log")]
for artifact in artifacts or []:
name = artifact.get("name", "")
if name.endswith("_raw.log"):
logs.append(_ref("structured_log", name))
return logs


# Filters the task routes for the treeherder specific route. Once found,
Expand Down Expand Up @@ -371,11 +386,23 @@ async def handle_task_completed(push_info, task, message, session):

job["timeStarted"] = job_run["started"]
job["timeCompleted"] = job_run["resolved"]
job["logs"] = [
create_log_reference(message["root_url"], payload["status"]["taskId"], job_run["runId"]),
]

task_id = payload["status"]["taskId"]
run_id = job_run["runId"]
try:
artifacts = await fetch_artifacts(message["root_url"], task_id, run_id, session)
except Exception:
logger.debug("Artifacts could not be found for task: %s run: %s", task_id, run_id)
artifacts = []

job["logs"] = create_log_reference(message["root_url"], task_id, run_id, artifacts=artifacts)
job = await add_artifact_uploaded_links(
message["root_url"], payload["status"]["taskId"], payload["runId"], job, session
message["root_url"],
task_id,
payload["runId"],
job,
session,
artifacts=artifacts,
)
return job

Expand Down Expand Up @@ -434,13 +461,13 @@ async def fetch_artifacts(root_url, task_id, run_id, session):
# fetch them in order to determine if there is an error_summary log;
# TODO refactor this when there is a way to only retrieve the error_summary
# artifact: https://bugzilla.mozilla.org/show_bug.cgi?id=1629716
async def add_artifact_uploaded_links(root_url, task_id, run_id, job, session):
artifacts = []
try:
artifacts = await fetch_artifacts(root_url, task_id, run_id, session)
except Exception:
logger.debug("Artifacts could not be found for task: %s run: %s", task_id, run_id)
return job
async def add_artifact_uploaded_links(root_url, task_id, run_id, job, session, artifacts=None):
if artifacts is None:
try:
artifacts = await fetch_artifacts(root_url, task_id, run_id, session)
except Exception:
logger.debug("Artifacts could not be found for task: %s run: %s", task_id, run_id)
return job

seen = {}
links = []
Expand Down
78 changes: 77 additions & 1 deletion treeherder/log_parser/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
from requests.exceptions import HTTPError

from treeherder.etl.artifact import serialize_artifact_json_blobs, store_job_artifacts
from treeherder.etl.text import astral_filter
from treeherder.log_parser.artifactbuildercollection import (
ArtifactBuilderCollection,
LogSizeError,
)
from treeherder.model.models import Job, JobLog
from treeherder.model.models import Job, JobLog, StructuredLogError
from treeherder.utils.http import fetch_text
from treeherder.workers.task import retryable_task

from . import failureline, intermittents
Expand All @@ -33,6 +35,7 @@ def parse_logs(job_id, job_log_ids, priority):
parser_tasks = {
"errorsummary_json": store_failure_lines,
"live_backing_log": post_log_artifacts,
"structured_log": post_structured_log_artifacts,
}

# We don't want to stop parsing logs for most Exceptions however we still
Expand Down Expand Up @@ -122,6 +125,79 @@ def post_log_artifacts(job_log):
raise


def post_structured_log_artifacts(job_log):
"""Download a structured (mozlog JSON-lines) log and store error entries."""
logger.info("Downloading/parsing structured log for log %s", job_log.id)

try:
log_text = fetch_text(job_log.url)
except HTTPError as e:
job_log.update_status(JobLog.FAILED)
if e.response is not None and e.response.status_code in (403, 404):
logger.warning("Unable to retrieve structured log for %s: %s", job_log.id, e)
return
logger.error("Failed to download structured log for %s: %s", job_log.id, e)
raise
except Exception as e:
job_log.update_status(JobLog.FAILED)
logger.error("Failed to download structured log for %s: %s", job_log.id, e)
raise

if not log_text:
job_log.update_status(JobLog.PARSED)
return

error_entries = []
for raw_line in log_text.splitlines():
try:
entry = json.loads(raw_line)
except (ValueError, TypeError):
continue
if not isinstance(entry, dict):
continue
level = (entry.get("level") or "").upper()
if level not in ("ERROR", "CRITICAL"):
continue

time_value = entry.get("time")
if isinstance(time_value, float):
time_value = int(time_value)
elif not isinstance(time_value, int):
time_value = None

pid_value = entry.get("pid")
if not isinstance(pid_value, int) or pid_value < 0:
pid_value = None

error_entries.append(
StructuredLogError(
job_log=job_log,
action=str(entry.get("action") or "")[:32],
time=time_value,
thread=astral_filter(str(entry.get("thread") or ""))[:255],
pid=pid_value,
source=astral_filter(str(entry.get("source") or ""))[:255],
message=astral_filter(str(entry.get("message") or "")),
level=level[:16],
)
)

try:
StructuredLogError.objects.filter(job_log=job_log).delete()
if error_entries:
StructuredLogError.objects.bulk_create(error_entries)
job_log.update_status(JobLog.PARSED)
logger.info(
"Stored structured log errors for %s %s %s",
job_log.job.repository.name,
job_log.job.id,
job_log.id,
)
except Exception as e:
logger.error("Failed to store structured log errors for %s: %s", job_log.id, e)
raise


def extract_text_log_artifacts(job_log):
"""Generate a set of artifacts by parsing from the raw text log."""

Expand Down
37 changes: 37 additions & 0 deletions treeherder/model/migrations/0049_structuredlogerror.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("model", "0048_alter_failureline_action"),
]

operations = [
migrations.CreateModel(
name="StructuredLogError",
fields=[
("id", models.BigAutoField(primary_key=True, serialize=False)),
("action", models.CharField(blank=True, max_length=32)),
("time", models.BigIntegerField(blank=True, null=True)),
("thread", models.CharField(blank=True, max_length=255)),
("pid", models.PositiveIntegerField(blank=True, null=True)),
("source", models.CharField(blank=True, max_length=255)),
("message", models.TextField(blank=True)),
("level", models.CharField(blank=True, max_length=16)),
(
"job_log",
models.ForeignKey(
on_delete=models.deletion.CASCADE,
related_name="structured_log_error",
to="model.joblog",
),
),
],
options={
"db_table": "structured_log_error",
"indexes": [
models.Index(fields=["job_log"], name="structured__job_log_idx"),
],
},
),
]
23 changes: 23 additions & 0 deletions treeherder/model/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,29 @@ def get_failure_line(self):
return None


class StructuredLogError(models.Model):
"""A detected error entry from a structured (mozlog JSON-lines) log."""

id = models.BigAutoField(primary_key=True)
job_log = models.ForeignKey(
JobLog, on_delete=models.CASCADE, related_name="structured_log_error"
)
action = models.CharField(max_length=32, blank=True)
time = models.BigIntegerField(null=True, blank=True)
thread = models.CharField(max_length=255, blank=True)
pid = models.PositiveIntegerField(null=True, blank=True)
source = models.CharField(max_length=255, blank=True)
message = models.TextField(blank=True)
level = models.CharField(max_length=16, blank=True)

class Meta:
db_table = "structured_log_error"
indexes = [models.Index(fields=["job_log"])]

def __str__(self):
return f"{self.id} {self.job_log_id}"


class TextLogErrorMetadata(models.Model):
"""
Link matching TextLogError and FailureLine instances.
Expand Down