Skip to content

Commit bcfee9f

Browse files
committed
Generic workflow executor
1 parent 2b8bab9 commit bcfee9f

File tree

1 file changed

+343
-0
lines changed

1 file changed

+343
-0
lines changed
Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
"""Workflow execution classes for Redis release automation."""
2+
import json
3+
from abc import ABC, abstractmethod
4+
from typing import Any, Dict, Optional
5+
6+
from rich.console import Console
7+
8+
from .github_client import GitHubClient
9+
from .models import PackageState, PackageType, ReleaseState, WorkflowConclusion, WorkflowRun
10+
11+
console = Console()
12+
13+
14+
class Phase(ABC):
15+
"""Abstract base class for workflow phases."""
16+
17+
def __init__(
18+
self,
19+
state: ReleaseState,
20+
repo: str,
21+
orchestrator_config: Dict[str, Any],
22+
timeout_minutes: int = 45
23+
):
24+
self.state = state
25+
self.repo = repo
26+
self.orchestrator_config = orchestrator_config
27+
self.timeout_minutes = timeout_minutes
28+
29+
@property
30+
@abstractmethod
31+
def phase_name(self) -> str:
32+
"""Human-readable phase name for logging."""
33+
pass
34+
35+
@property
36+
@abstractmethod
37+
def package_state(self) -> PackageState:
38+
"""Get the package state for this phase."""
39+
pass
40+
41+
@property
42+
@abstractmethod
43+
def branch(self) -> str:
44+
"""Get the branch to run the workflow on."""
45+
pass
46+
47+
@property
48+
@abstractmethod
49+
def workflow_file(self) -> str:
50+
"""Get the workflow file name."""
51+
pass
52+
53+
@property
54+
@abstractmethod
55+
def workflow_inputs(self) -> Dict[str, Any]:
56+
"""Get the inputs to pass to the workflow."""
57+
pass
58+
59+
@abstractmethod
60+
def is_completed(self) -> bool:
61+
"""Check if this phase is already completed."""
62+
pass
63+
64+
@abstractmethod
65+
def get_workflow(self) -> Optional[WorkflowRun]:
66+
"""Get the current workflow for this phase."""
67+
pass
68+
69+
@abstractmethod
70+
def set_workflow(self, workflow: WorkflowRun) -> None:
71+
"""Set the workflow for this phase."""
72+
pass
73+
74+
@abstractmethod
75+
def set_completed(self, completed: bool) -> None:
76+
"""Mark this phase as completed."""
77+
pass
78+
79+
@abstractmethod
80+
def set_artifacts(self, artifacts: Dict[str, Dict[str, Any]]) -> None:
81+
"""Set artifacts for this phase."""
82+
pass
83+
84+
@abstractmethod
85+
def set_result(self, result_data: Dict[str, Any]) -> None:
86+
"""Set phase-specific result data."""
87+
pass
88+
89+
@abstractmethod
90+
def extract_result(self, github_client: GitHubClient, artifacts: Dict[str, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
91+
"""Extract phase-specific result data from artifacts."""
92+
pass
93+
94+
def _get_release_branch(self) -> str:
95+
"""Get the release branch based on the release tag.
96+
97+
Extracts major.minor from tag (e.g., "8.2.1" -> "release/8.2").
98+
99+
Returns:
100+
Release branch name
101+
102+
Raises:
103+
ValueError: If tag format is invalid
104+
"""
105+
tag_parts = self.state.tag.split(".")
106+
if len(tag_parts) < 2:
107+
raise ValueError(f"Invalid tag format '{self.state.tag}': expected at least major.minor version")
108+
109+
try:
110+
# Validate that major and minor are numeric
111+
int(tag_parts[0])
112+
int(tag_parts[1])
113+
except ValueError:
114+
raise ValueError(f"Invalid tag format '{self.state.tag}': major and minor versions must be numeric")
115+
116+
major_minor = f"{tag_parts[0]}.{tag_parts[1]}"
117+
return f"release/{major_minor}"
118+
119+
120+
class BuildPhase(Phase):
121+
"""Build phase implementation."""
122+
123+
@property
124+
def phase_name(self) -> str:
125+
return "Docker build"
126+
127+
@property
128+
def package_state(self) -> PackageState:
129+
return self.state.packages[PackageType.DOCKER]
130+
131+
@property
132+
def branch(self) -> str:
133+
"""Get the Docker branch based on the release tag."""
134+
return self._get_release_branch()
135+
136+
@property
137+
def workflow_file(self) -> str:
138+
"""Get the build workflow file from orchestrator config."""
139+
return self.orchestrator_config.get("workflow", "release_build_and_test.yml")
140+
141+
@property
142+
def workflow_inputs(self) -> Dict[str, Any]:
143+
"""Get the build workflow inputs."""
144+
return {
145+
"release_tag": self.state.tag,
146+
}
147+
148+
def is_completed(self) -> bool:
149+
return self.package_state.build_completed
150+
151+
def get_workflow(self) -> Optional[WorkflowRun]:
152+
return self.package_state.build_workflow
153+
154+
def set_workflow(self, workflow: WorkflowRun) -> None:
155+
self.package_state.build_workflow = workflow
156+
157+
def set_completed(self, completed: bool) -> None:
158+
self.package_state.build_completed = completed
159+
160+
def set_artifacts(self, artifacts: Dict[str, Dict[str, Any]]) -> None:
161+
self.package_state.build_artifacts = artifacts
162+
163+
def set_result(self, result_data: Dict[str, Any]) -> None:
164+
self.package_state.release_handle = result_data
165+
166+
def extract_result(self, github_client: GitHubClient, artifacts: Dict[str, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
167+
"""Extract release_handle from artifacts."""
168+
result = github_client.extract_result(self.repo, artifacts, "release_handle", "release_handle.json")
169+
if result is None:
170+
console.print("[red]Failed to extract release_handle from artifacts[/red]")
171+
return result
172+
173+
174+
class PublishPhase(Phase):
175+
"""Publish phase implementation."""
176+
177+
@property
178+
def phase_name(self) -> str:
179+
return "Docker publish"
180+
181+
@property
182+
def package_state(self) -> PackageState:
183+
return self.state.packages[PackageType.DOCKER]
184+
185+
@property
186+
def branch(self) -> str:
187+
"""Get the Docker branch based on the release tag."""
188+
return self._get_release_branch()
189+
190+
@property
191+
def workflow_file(self) -> str:
192+
"""Get the publish workflow file from orchestrator config."""
193+
return self.orchestrator_config.get("publish_workflow", "release_publish.yml")
194+
195+
@property
196+
def workflow_inputs(self) -> Dict[str, Any]:
197+
"""Get the publish workflow inputs.
198+
199+
Raises:
200+
RuntimeError: If release_handle is not available in package state
201+
"""
202+
if not self.package_state.release_handle:
203+
raise RuntimeError("release_handle is required for publish phase but not found in package state")
204+
205+
return {
206+
"release_handle": json.dumps(self.package_state.release_handle),
207+
}
208+
209+
def is_completed(self) -> bool:
210+
return self.package_state.publish_completed
211+
212+
def get_workflow(self) -> Optional[WorkflowRun]:
213+
return self.package_state.publish_workflow
214+
215+
def set_workflow(self, workflow: WorkflowRun) -> None:
216+
self.package_state.publish_workflow = workflow
217+
218+
def set_completed(self, completed: bool) -> None:
219+
self.package_state.publish_completed = completed
220+
221+
def set_artifacts(self, artifacts: Dict[str, Dict[str, Any]]) -> None:
222+
self.package_state.publish_artifacts = artifacts
223+
224+
def set_result(self, result_data: Dict[str, Any]) -> None:
225+
self.package_state.publish_info = result_data
226+
227+
def extract_result(self, github_client: GitHubClient, artifacts: Dict[str, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
228+
"""Extract release_info from artifacts."""
229+
result = github_client.extract_result(self.repo, artifacts, "release_info", "release_info.json")
230+
if result is None:
231+
console.print("[red]Failed to extract release_info from artifacts[/red]")
232+
return result
233+
234+
235+
class PhaseExecutor:
236+
"""Executes workflow phases."""
237+
238+
def execute_phase(self, phase: Phase, github_client: GitHubClient) -> bool:
239+
"""Execute a workflow phase.
240+
241+
Args:
242+
phase: The phase to execute
243+
github_client: GitHub client for API operations
244+
245+
Returns:
246+
True if phase succeeded, False otherwise
247+
"""
248+
if not self._trigger_workflow(phase, github_client):
249+
return False
250+
251+
# Wait for workflow completion if needed
252+
workflow = phase.get_workflow()
253+
console.print("[dim]Waiting for workflow completion...[/dim]")
254+
return self._wait_for_completion(phase, github_client, workflow)
255+
256+
def _trigger_workflow(self, phase: Phase, github_client: GitHubClient) -> bool:
257+
"""Trigger the workflow for a phase."""
258+
console.print(f"[dim]Using branch: {phase.branch}[/dim]")
259+
260+
if not github_client.check_workflow_exists(phase.repo, phase.workflow_file):
261+
console.print(
262+
f"[red]Workflow '{phase.workflow_file}' not found in {phase.repo}[/red]"
263+
)
264+
console.print(
265+
f"[yellow]Make sure the workflow file exists in branch '{phase.branch}'[/yellow]"
266+
)
267+
return False
268+
269+
try:
270+
workflow_run = github_client.trigger_workflow(
271+
phase.repo, phase.workflow_file, phase.workflow_inputs, ref=phase.branch
272+
)
273+
phase.set_workflow(workflow_run)
274+
return True
275+
276+
except Exception as e:
277+
console.print(f"[red]Failed to trigger {phase.phase_name}: {e}[/red]")
278+
return False
279+
280+
def _wait_for_completion(self, phase: Phase, github_client: GitHubClient, workflow: WorkflowRun) -> bool:
281+
"""Wait for workflow completion and handle results."""
282+
try:
283+
console.print(f"[blue]Waiting for {phase.phase_name} to complete...[/blue]")
284+
completed_run = github_client.wait_for_workflow_completion(
285+
workflow.repo,
286+
workflow.run_id,
287+
timeout_minutes=phase.timeout_minutes,
288+
)
289+
290+
phase.set_workflow(completed_run)
291+
292+
if completed_run.conclusion == WorkflowConclusion.SUCCESS:
293+
return self._handle_success(phase, github_client, completed_run)
294+
elif completed_run.conclusion == WorkflowConclusion.FAILURE:
295+
phase.set_completed(True) # completed, but failed
296+
console.print(f"[red]{phase.phase_name} failed[/red]")
297+
return False
298+
else:
299+
return self._handle_other_conclusion(phase, completed_run)
300+
301+
except Exception as e:
302+
console.print(f"[red]{phase.phase_name} failed: {e}[/red]")
303+
return False
304+
305+
def _handle_success(self, phase: Phase, github_client: GitHubClient, completed_run: WorkflowRun) -> bool:
306+
"""Handle successful workflow completion."""
307+
phase.set_completed(True)
308+
309+
# Get artifacts
310+
artifacts = github_client.get_workflow_artifacts(
311+
completed_run.repo, completed_run.run_id
312+
)
313+
phase.set_artifacts(artifacts)
314+
315+
# Extract phase-specific result data
316+
result_data = phase.extract_result(github_client, artifacts)
317+
if result_data is None:
318+
return False
319+
320+
phase.set_result(result_data)
321+
console.print(f"[green]{phase.phase_name} completed successfully[/green]")
322+
return True
323+
324+
def _handle_other_conclusion(self, phase: Phase, completed_run: WorkflowRun) -> bool:
325+
"""Handle non-success, non-failure conclusions."""
326+
phase.set_completed(True) # completed, but not successful
327+
conclusion_text = (
328+
completed_run.conclusion.value
329+
if completed_run.conclusion
330+
else "cancelled/skipped"
331+
)
332+
333+
if conclusion_text in ["cancelled", "cancelled/skipped"]:
334+
status_color = "yellow"
335+
elif conclusion_text in ["skipped"]:
336+
status_color = "blue"
337+
else:
338+
status_color = "red"
339+
340+
console.print(
341+
f"[dim]{phase.phase_name} completed with status:[/dim] [{status_color}]{conclusion_text}[/{status_color}]"
342+
)
343+
return False

0 commit comments

Comments
 (0)