|
| 1 | +import datetime |
| 2 | +import json |
| 3 | + |
| 4 | +import humanize |
| 5 | +from dateutil.parser import isoparse |
| 6 | + |
| 7 | + |
| 8 | +class TaskRun: |
| 9 | + """Representation of a Tekton Kubernetes TaskRun object""" |
| 10 | + |
| 11 | + # Possible status values |
| 12 | + SUCCEEDED = "succeeded" |
| 13 | + FAILED = "failed" |
| 14 | + UNKNOWN = "unknown" |
| 15 | + |
| 16 | + def __init__(self, obj: dict) -> None: |
| 17 | + self.obj = obj |
| 18 | + |
| 19 | + @property |
| 20 | + def pipelinetask(self) -> str: |
| 21 | + return self.obj["metadata"]["labels"]["tekton.dev/pipelineTask"] |
| 22 | + |
| 23 | + @property |
| 24 | + def start_time(self) -> datetime.datetime: |
| 25 | + return isoparse(self.obj["status"]["startTime"]) |
| 26 | + |
| 27 | + @property |
| 28 | + def completion_time(self) -> datetime.datetime: |
| 29 | + return isoparse(self.obj["status"]["completionTime"]) |
| 30 | + |
| 31 | + @property |
| 32 | + def duration(self) -> str: |
| 33 | + return humanize.naturaldelta(self.completion_time - self.start_time) |
| 34 | + |
| 35 | + @property |
| 36 | + def status(self) -> str: |
| 37 | + """ |
| 38 | + Compute a status for the TaskRun. |
| 39 | +
|
| 40 | + Returns: |
| 41 | + A simplified overall status |
| 42 | + """ |
| 43 | + conditions = self.obj["status"]["conditions"] |
| 44 | + conditions = [x for x in conditions if x["type"].lower() == self.SUCCEEDED] |
| 45 | + |
| 46 | + # Figure out the status from the first succeeded condition, if it exists. |
| 47 | + if not conditions: |
| 48 | + return self.UNKNOWN |
| 49 | + |
| 50 | + condition_reason = conditions[0]["reason"].lower() |
| 51 | + |
| 52 | + if condition_reason.lower() == self.SUCCEEDED: |
| 53 | + return self.SUCCEEDED |
| 54 | + elif condition_reason.lower() == self.FAILED: |
| 55 | + return self.FAILED |
| 56 | + else: |
| 57 | + return self.UNKNOWN |
| 58 | + |
| 59 | + |
| 60 | +class PipelineRun: |
| 61 | + """Representation of a Tekton Kubernetes PipelineRun object""" |
| 62 | + |
| 63 | + # TaskRun status mapped to markdown icons |
| 64 | + TASKRUN_STATUS_ICONS = { |
| 65 | + TaskRun.UNKNOWN: ":grey_question:", |
| 66 | + TaskRun.SUCCEEDED: ":heavy_check_mark:", |
| 67 | + TaskRun.FAILED: ":x:", |
| 68 | + } |
| 69 | + |
| 70 | + # Markdown summary template |
| 71 | + SUMMARY_TEMPLATE = """ |
| 72 | +# Pipeline Summary |
| 73 | +
|
| 74 | +Pipeline: *{pipeline}* |
| 75 | +PipelineRun: *{pipelinerun}* |
| 76 | +Start Time: *{start_time}* |
| 77 | +
|
| 78 | +## Tasks |
| 79 | +
|
| 80 | +| Status | Task | Start Time | Duration | |
| 81 | +| ------ | ---- | ---------- | -------- | |
| 82 | +{taskruns} |
| 83 | +""" |
| 84 | + |
| 85 | + # Markdown TaskRun template |
| 86 | + TASKRUN_TEMPLATE = "| {icon} | {name} | {start_time} | {duration} |" |
| 87 | + |
| 88 | + def __init__(self, obj: dict, taskruns: list[TaskRun]) -> None: |
| 89 | + self.obj = obj |
| 90 | + self.taskruns = taskruns |
| 91 | + |
| 92 | + @classmethod |
| 93 | + def from_files(cls, obj_path: str, taskruns_path: str) -> "PipelineRun": |
| 94 | + """ |
| 95 | + Construct a PipelineRun representation from Kubernetes objects. |
| 96 | +
|
| 97 | + Args: |
| 98 | + obj_path: Path to a JSON formatted file with a PipelineRun definition |
| 99 | + taskruns_path: Path to a JSON formatted file with list of TaskRun definitions |
| 100 | +
|
| 101 | + Returns: |
| 102 | + A PipelineRun object |
| 103 | + """ |
| 104 | + with open(taskruns_path) as fh: |
| 105 | + taskruns = [TaskRun(tr) for tr in json.load(fh)] |
| 106 | + |
| 107 | + with open(obj_path) as fh: |
| 108 | + obj = json.load(fh) |
| 109 | + |
| 110 | + return cls(obj, taskruns) |
| 111 | + |
| 112 | + @property |
| 113 | + def pipeline(self) -> str: |
| 114 | + return self.obj["metadata"]["labels"]["tekton.dev/pipeline"] |
| 115 | + |
| 116 | + @property |
| 117 | + def name(self) -> str: |
| 118 | + return self.obj["metadata"]["name"] |
| 119 | + |
| 120 | + @property |
| 121 | + def start_time(self) -> datetime: |
| 122 | + return isoparse(self.obj["status"]["startTime"]) |
| 123 | + |
| 124 | + @property |
| 125 | + def finally_taskruns(self) -> list[TaskRun]: |
| 126 | + """ |
| 127 | + Returns all taskruns in the finally spec. |
| 128 | +
|
| 129 | + Returns: |
| 130 | + A list of TaskRuns |
| 131 | + """ |
| 132 | + pipeline_spec = self.obj["status"]["pipelineSpec"] |
| 133 | + finally_task_names = [task["name"] for task in pipeline_spec.get("finally", [])] |
| 134 | + return [tr for tr in self.taskruns if tr.pipelinetask in finally_task_names] |
| 135 | + |
| 136 | + def markdown_summary(self, include_final_tasks: bool = False) -> str: |
| 137 | + """ |
| 138 | + Construct a markdown summary of the PipelineRun |
| 139 | +
|
| 140 | + Args: |
| 141 | + include_final_tasks (bool): Set to true to summarize finally TaskRuns |
| 142 | +
|
| 143 | + Returns: |
| 144 | + A summary in markdown format |
| 145 | + """ |
| 146 | + # Sort TaskRuns by startTime |
| 147 | + taskruns = sorted(self.taskruns, key=lambda tr: tr.start_time) |
| 148 | + |
| 149 | + taskrun_parts = [] |
| 150 | + |
| 151 | + for taskrun in taskruns: |
| 152 | + |
| 153 | + # Ignore final tasks if not desired |
| 154 | + if (not include_final_tasks) and taskrun in self.finally_taskruns: |
| 155 | + continue |
| 156 | + |
| 157 | + icon = self.TASKRUN_STATUS_ICONS[taskrun.status] |
| 158 | + |
| 159 | + tr = self.TASKRUN_TEMPLATE.format( |
| 160 | + icon=icon, |
| 161 | + name=taskrun.pipelinetask, |
| 162 | + start_time=taskrun.start_time, |
| 163 | + duration=taskrun.duration, |
| 164 | + ) |
| 165 | + taskrun_parts.append(tr) |
| 166 | + |
| 167 | + taskruns_md = "\n".join(taskrun_parts) |
| 168 | + |
| 169 | + return self.SUMMARY_TEMPLATE.format( |
| 170 | + pipelinerun=self.name, |
| 171 | + pipeline=self.pipeline, |
| 172 | + start_time=self.start_time, |
| 173 | + taskruns=taskruns_md, |
| 174 | + ) |
0 commit comments