|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from banal import ensure_dict |
| 4 | + |
| 5 | + |
| 6 | +def extract_job_summary(task_kwargs: dict) -> dict: |
| 7 | + """Extract useful structured fields from job task_kwargs.""" |
| 8 | + summary: dict = {} |
| 9 | + dataset = task_kwargs.get("dataset") |
| 10 | + if dataset: |
| 11 | + summary["dataset"] = dataset |
| 12 | + batch = task_kwargs.get("batch") |
| 13 | + if batch: |
| 14 | + summary["batch"] = batch |
| 15 | + payload = task_kwargs.get("payload") |
| 16 | + if not isinstance(payload, dict): |
| 17 | + return summary |
| 18 | + entities = payload.get("entities") |
| 19 | + if not isinstance(entities, list) or not entities: |
| 20 | + return summary |
| 21 | + summary["entities_count"] = len(entities) |
| 22 | + entity_ids = [e["id"] for e in entities if isinstance(e, dict) and e.get("id")] |
| 23 | + if entity_ids: |
| 24 | + summary["entity_id_min"] = min(entity_ids) |
| 25 | + summary["entity_id_max"] = max(entity_ids) |
| 26 | + if len(entity_ids) < 11: |
| 27 | + summary["entity_ids"] = entity_ids |
| 28 | + content_hashes = [ |
| 29 | + h |
| 30 | + for e in entities |
| 31 | + if isinstance(e, dict) |
| 32 | + for h in ensure_dict(e.get("properties")).get("contentHash", []) |
| 33 | + ] |
| 34 | + if content_hashes: |
| 35 | + summary["content_hash_min"] = min(content_hashes) |
| 36 | + summary["content_hash_max"] = max(content_hashes) |
| 37 | + return summary |
| 38 | + |
| 39 | + |
| 40 | +def patch_procrastinate_logging(): |
| 41 | + """Patch procrastinate to produce concise, structured job logs. |
| 42 | +
|
| 43 | + Upstream procrastinate logs full repr() of all task kwargs in |
| 44 | + Job.call_string, creating extremely noisy log lines when payloads |
| 45 | + contain entity data. This patch: |
| 46 | + 1. Shortens call_string to just task_name[id] |
| 47 | + 2. Strips the full payload from log_context() |
| 48 | + 3. Promotes dataset/entity/hash summary to top-level structlog kwargs |
| 49 | + """ |
| 50 | + from procrastinate.jobs import Job |
| 51 | + from procrastinate.worker import Worker |
| 52 | + |
| 53 | + @property |
| 54 | + def call_string(self): |
| 55 | + return f"{self.task_name}[{self.id}]" |
| 56 | + |
| 57 | + Job.call_string = call_string |
| 58 | + |
| 59 | + _original_log_context = Job.log_context |
| 60 | + |
| 61 | + def log_context(self): |
| 62 | + ctx = _original_log_context(self) |
| 63 | + ctx.pop("task_kwargs", None) |
| 64 | + ctx.update(extract_job_summary(self.task_kwargs)) |
| 65 | + return ctx |
| 66 | + |
| 67 | + Job.log_context = log_context |
| 68 | + |
| 69 | + _original_log_extra = Worker._log_extra |
| 70 | + |
| 71 | + def _log_extra(self, action, context, job_result, **kwargs): |
| 72 | + extra = _original_log_extra( |
| 73 | + self, action=action, context=context, job_result=job_result, **kwargs |
| 74 | + ) |
| 75 | + if context: |
| 76 | + extra["task_name"] = context.job.task_name |
| 77 | + extra["queue_name"] = context.job.queue |
| 78 | + extra.update(extract_job_summary(context.job.task_kwargs)) |
| 79 | + return extra |
| 80 | + |
| 81 | + Worker._log_extra = _log_extra |
0 commit comments