diff --git a/tests/etl/taskcluster_pulse/test_handler.py b/tests/etl/taskcluster_pulse/test_handler.py index ecfe4e0ce17..018dc75ea2d 100644 --- a/tests/etl/taskcluster_pulse/test_handler.py +++ b/tests/etl/taskcluster_pulse/test_handler.py @@ -1,6 +1,48 @@ import pytest -from treeherder.etl.taskcluster_pulse.handler import handle_message, handle_task_defined +from treeherder.etl.taskcluster_pulse.handler import ( + create_log_reference, + handle_message, + handle_task_defined, +) + +ROOT_URL = "https://firefox-ci-tc.services.mozilla.com" +TASK_ID = "AJBb7wqZT6K9kz4niYAatg" + + +def test_create_log_reference_emits_live_backing_log_by_default(): + logs = create_log_reference(ROOT_URL, TASK_ID, 0) + assert len(logs) == 1 + assert logs[0]["name"] == "live_backing_log" + assert logs[0]["url"].endswith(f"task/{TASK_ID}/runs/0/artifacts/public/logs/live_backing.log") + + +def test_create_log_reference_only_live_backing_log_when_no_raw_log(): + artifacts = [ + {"name": "public/logs/live_backing.log"}, + {"name": "public/test_info/something.txt"}, + ] + logs = create_log_reference(ROOT_URL, TASK_ID, 0, artifacts=artifacts) + assert len(logs) == 1 + assert logs[0]["url"].endswith("artifacts/public/logs/live_backing.log") + + +def test_create_log_reference_appends_raw_log_when_present(): + artifacts = [ + {"name": "public/logs/live_backing.log"}, + {"name": "public/test_info/xpcshell_raw.log"}, + {"name": "public/test_info/mochitest_raw.log"}, + ] + logs = create_log_reference(ROOT_URL, TASK_ID, 0, artifacts=artifacts) + assert [log["name"] for log in logs] == [ + "live_backing_log", + "structured_log", + "structured_log", + ] + urls = [log["url"] for log in logs] + assert urls[0].endswith("artifacts/public/logs/live_backing.log") + assert urls[1].endswith("artifacts/public/test_info/xpcshell_raw.log") + assert urls[2].endswith("artifacts/public/test_info/mochitest_raw.log") @pytest.mark.asyncio diff --git a/treeherder/etl/jobs.py b/treeherder/etl/jobs.py index 785d57cd856..caacdedcb03 100644 --- a/treeherder/etl/jobs.py +++ b/treeherder/etl/jobs.py @@ -344,7 +344,7 @@ def _schedule_log_parsing(job, job_logs, result, repository): # importing here to avoid an import loop from treeherder.log_parser.tasks import parse_logs - task_types = {"errorsummary_json", "live_backing_log"} + task_types = {"errorsummary_json", "live_backing_log", "structured_log"} sheriffed_repos = { "autoland", "mozilla-central", diff --git a/treeherder/etl/taskcluster_pulse/handler.py b/treeherder/etl/taskcluster_pulse/handler.py index 1bbc13da36c..f01321c41e3 100644 --- a/treeherder/etl/taskcluster_pulse/handler.py +++ b/treeherder/etl/taskcluster_pulse/handler.py @@ -56,16 +56,31 @@ def result_from_run(job_run): return "unknown" -# Creates a log entry for Treeherder to retrieve and parse. This log is -# displayed on the Treeherder Log Viewer once parsed. -def create_log_reference(root_url, task_id, run_id): - log_url = taskcluster_urls.api( - root_url, "queue", "v1", "task/{taskId}/runs/{runId}/artifacts/public/logs/live_backing.log" - ).format(taskId=task_id, runId=run_id) - return { - "name": "live_backing_log", - "url": log_url, - } +# Creates the log entries for Treeherder to retrieve and parse. These logs +# are displayed on the Treeherder Log Viewer once parsed. +# `public/logs/live_backing.log` is always included. Any artifact whose name +# ends with `_raw.log` is appended as an additional reference. All entries +# share the `live_backing_log` JobLog name so the existing parser dispatch +# (jobs.py:_schedule_log_parsing, log_parser/tasks.py:parser_tasks) handles +# them; JobLog's `(job, name, url)` unique constraint allows the duplicates. +def create_log_reference(root_url, task_id, run_id, artifacts=None): + def _ref(name, artifact_path): + return { + "name": name, + "url": taskcluster_urls.api( + root_url, + "queue", + "v1", + f"task/{{taskId}}/runs/{{runId}}/artifacts/{artifact_path}", + ).format(taskId=task_id, runId=run_id), + } + + logs = [_ref("live_backing_log", "public/logs/live_backing.log")] + for artifact in artifacts or []: + name = artifact.get("name", "") + if name.endswith("_raw.log"): + logs.append(_ref("structured_log", name)) + return logs # Filters the task routes for the treeherder specific route. Once found, @@ -371,11 +386,23 @@ async def handle_task_completed(push_info, task, message, session): job["timeStarted"] = job_run["started"] job["timeCompleted"] = job_run["resolved"] - job["logs"] = [ - create_log_reference(message["root_url"], payload["status"]["taskId"], job_run["runId"]), - ] + + task_id = payload["status"]["taskId"] + run_id = job_run["runId"] + try: + artifacts = await fetch_artifacts(message["root_url"], task_id, run_id, session) + except Exception: + logger.debug("Artifacts could not be found for task: %s run: %s", task_id, run_id) + artifacts = [] + + job["logs"] = create_log_reference(message["root_url"], task_id, run_id, artifacts=artifacts) job = await add_artifact_uploaded_links( - message["root_url"], payload["status"]["taskId"], payload["runId"], job, session + message["root_url"], + task_id, + payload["runId"], + job, + session, + artifacts=artifacts, ) return job @@ -434,13 +461,13 @@ async def fetch_artifacts(root_url, task_id, run_id, session): # fetch them in order to determine if there is an error_summary log; # TODO refactor this when there is a way to only retrieve the error_summary # artifact: https://bugzilla.mozilla.org/show_bug.cgi?id=1629716 -async def add_artifact_uploaded_links(root_url, task_id, run_id, job, session): - artifacts = [] - try: - artifacts = await fetch_artifacts(root_url, task_id, run_id, session) - except Exception: - logger.debug("Artifacts could not be found for task: %s run: %s", task_id, run_id) - return job +async def add_artifact_uploaded_links(root_url, task_id, run_id, job, session, artifacts=None): + if artifacts is None: + try: + artifacts = await fetch_artifacts(root_url, task_id, run_id, session) + except Exception: + logger.debug("Artifacts could not be found for task: %s run: %s", task_id, run_id) + return job seen = {} links = [] diff --git a/treeherder/log_parser/tasks.py b/treeherder/log_parser/tasks.py index 827a2bb2e50..bfeaf0c8408 100644 --- a/treeherder/log_parser/tasks.py +++ b/treeherder/log_parser/tasks.py @@ -6,11 +6,13 @@ from requests.exceptions import HTTPError from treeherder.etl.artifact import serialize_artifact_json_blobs, store_job_artifacts +from treeherder.etl.text import astral_filter from treeherder.log_parser.artifactbuildercollection import ( ArtifactBuilderCollection, LogSizeError, ) -from treeherder.model.models import Job, JobLog +from treeherder.model.models import Job, JobLog, StructuredLogError +from treeherder.utils.http import fetch_text from treeherder.workers.task import retryable_task from . import failureline, intermittents @@ -33,6 +35,7 @@ def parse_logs(job_id, job_log_ids, priority): parser_tasks = { "errorsummary_json": store_failure_lines, "live_backing_log": post_log_artifacts, + "structured_log": post_structured_log_artifacts, } # We don't want to stop parsing logs for most Exceptions however we still @@ -122,6 +125,79 @@ def post_log_artifacts(job_log): raise +def post_structured_log_artifacts(job_log): + """Download a structured (mozlog JSON-lines) log and store error entries.""" + logger.info("Downloading/parsing structured log for log %s", job_log.id) + + try: + log_text = fetch_text(job_log.url) + except HTTPError as e: + job_log.update_status(JobLog.FAILED) + if e.response is not None and e.response.status_code in (403, 404): + logger.warning("Unable to retrieve structured log for %s: %s", job_log.id, e) + return + logger.error("Failed to download structured log for %s: %s", job_log.id, e) + raise + except Exception as e: + job_log.update_status(JobLog.FAILED) + logger.error("Failed to download structured log for %s: %s", job_log.id, e) + raise + + if not log_text: + job_log.update_status(JobLog.PARSED) + return + + error_entries = [] + for raw_line in log_text.splitlines(): + try: + entry = json.loads(raw_line) + except (ValueError, TypeError): + continue + if not isinstance(entry, dict): + continue + level = (entry.get("level") or "").upper() + if level not in ("ERROR", "CRITICAL"): + continue + + time_value = entry.get("time") + if isinstance(time_value, float): + time_value = int(time_value) + elif not isinstance(time_value, int): + time_value = None + + pid_value = entry.get("pid") + if not isinstance(pid_value, int) or pid_value < 0: + pid_value = None + + error_entries.append( + StructuredLogError( + job_log=job_log, + action=str(entry.get("action") or "")[:32], + time=time_value, + thread=astral_filter(str(entry.get("thread") or ""))[:255], + pid=pid_value, + source=astral_filter(str(entry.get("source") or ""))[:255], + message=astral_filter(str(entry.get("message") or "")), + level=level[:16], + ) + ) + + try: + StructuredLogError.objects.filter(job_log=job_log).delete() + if error_entries: + StructuredLogError.objects.bulk_create(error_entries) + job_log.update_status(JobLog.PARSED) + logger.info( + "Stored structured log errors for %s %s %s", + job_log.job.repository.name, + job_log.job.id, + job_log.id, + ) + except Exception as e: + logger.error("Failed to store structured log errors for %s: %s", job_log.id, e) + raise + + def extract_text_log_artifacts(job_log): """Generate a set of artifacts by parsing from the raw text log.""" diff --git a/treeherder/model/migrations/0049_structuredlogerror.py b/treeherder/model/migrations/0049_structuredlogerror.py new file mode 100644 index 00000000000..dcb7c626930 --- /dev/null +++ b/treeherder/model/migrations/0049_structuredlogerror.py @@ -0,0 +1,37 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("model", "0048_alter_failureline_action"), + ] + + operations = [ + migrations.CreateModel( + name="StructuredLogError", + fields=[ + ("id", models.BigAutoField(primary_key=True, serialize=False)), + ("action", models.CharField(blank=True, max_length=32)), + ("time", models.BigIntegerField(blank=True, null=True)), + ("thread", models.CharField(blank=True, max_length=255)), + ("pid", models.PositiveIntegerField(blank=True, null=True)), + ("source", models.CharField(blank=True, max_length=255)), + ("message", models.TextField(blank=True)), + ("level", models.CharField(blank=True, max_length=16)), + ( + "job_log", + models.ForeignKey( + on_delete=models.deletion.CASCADE, + related_name="structured_log_error", + to="model.joblog", + ), + ), + ], + options={ + "db_table": "structured_log_error", + "indexes": [ + models.Index(fields=["job_log"], name="structured__job_log_idx"), + ], + }, + ), + ] diff --git a/treeherder/model/models.py b/treeherder/model/models.py index 3b2d04fc134..9eaa36c90e3 100644 --- a/treeherder/model/models.py +++ b/treeherder/model/models.py @@ -1324,6 +1324,29 @@ def get_failure_line(self): return None +class StructuredLogError(models.Model): + """A detected error entry from a structured (mozlog JSON-lines) log.""" + + id = models.BigAutoField(primary_key=True) + job_log = models.ForeignKey( + JobLog, on_delete=models.CASCADE, related_name="structured_log_error" + ) + action = models.CharField(max_length=32, blank=True) + time = models.BigIntegerField(null=True, blank=True) + thread = models.CharField(max_length=255, blank=True) + pid = models.PositiveIntegerField(null=True, blank=True) + source = models.CharField(max_length=255, blank=True) + message = models.TextField(blank=True) + level = models.CharField(max_length=16, blank=True) + + class Meta: + db_table = "structured_log_error" + indexes = [models.Index(fields=["job_log"])] + + def __str__(self): + return f"{self.id} {self.job_log_id}" + + class TextLogErrorMetadata(models.Model): """ Link matching TextLogError and FailureLine instances.