Skip to content

Commit 7bd7eda

Browse files
committed
ParallelBarrier, support for force_rebuild and convenient restarts
1 parent 96bd0cb commit 7bd7eda

File tree

9 files changed

+865
-154
lines changed

9 files changed

+865
-154
lines changed

src/redis_release/bht/backchain.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ def latch_chain_to_chain(
4040
) -> None:
4141
"""Latch two chains together. Both are expected to be formed using PPAs.
4242
43-
If precondition exists in the anchor point, it is replaced by the next chain.
43+
If both precondition in the anchor point and postcondition of the next
44+
chain exist, and they are the same type then the precondition in the
45+
anchor point is replaced by the next chain.
4446
Otherwise the next chain is added as a leftmost child to the anchor point.
4547
4648
If the next chain is a sequence, its children are merged into the anchor point.

src/redis_release/bht/behaviours.py

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,14 @@
2121
from py_trees.common import Status
2222
from py_trees.composites import Selector, Sequence
2323
from py_trees.decorators import Inverter, Repeat, Retry, Timeout
24-
from pydantic import BaseModel
24+
25+
from redis_release.bht.state import reset_model_to_defaults
2526

2627
from ..github_client_async import GitHubClientAsync
2728
from ..models import WorkflowConclusion, WorkflowRun, WorkflowStatus
2829
from .decorators import FlagGuard
2930
from .logging_wrapper import PyTreesLoggerWrapper
30-
from .state import PackageMeta, ReleaseMeta, Workflow
31+
from .state import Package, PackageMeta, ReleaseMeta, Workflow
3132

3233
logger = logging.getLogger(__name__)
3334

@@ -115,6 +116,7 @@ def initialise(self) -> None:
115116
return
116117
self.workflow.inputs["release_tag"] = self.release_meta.tag
117118
ref = self.package_meta.ref if self.package_meta.ref is not None else "main"
119+
self.workflow.ephemeral.trigger_attempted = True
118120
self.task = asyncio.create_task(
119121
self.github_client.trigger_workflow(
120122
self.package_meta.repo,
@@ -370,6 +372,46 @@ def update(self) -> Status:
370372
return self.log_exception_and_return_failure(e)
371373

372374

375+
class ResetPackageState(ReleaseAction):
376+
def __init__(
377+
self,
378+
name: str,
379+
package: Package,
380+
default_package: Package,
381+
log_prefix: str = "",
382+
) -> None:
383+
self.package = package
384+
self.default_package = default_package
385+
super().__init__(name=name, log_prefix=log_prefix)
386+
387+
def update(self) -> Status:
388+
reset_model_to_defaults(self.package, self.default_package)
389+
390+
self.feedback_message = "Package state reset to default values"
391+
self.logger.info(f"[green]{self.feedback_message}[/green]")
392+
return Status.SUCCESS
393+
394+
395+
class ResetWorkflowState(ReleaseAction):
396+
def __init__(
397+
self,
398+
name: str,
399+
workflow: Workflow,
400+
default_workflow: Workflow,
401+
log_prefix: str = "",
402+
) -> None:
403+
self.workflow = workflow
404+
self.default_workflow = default_workflow
405+
super().__init__(name=name, log_prefix=log_prefix)
406+
407+
def update(self) -> Status: # type: ignore
408+
reset_model_to_defaults(self.workflow, self.default_workflow)
409+
410+
self.feedback_message = "Workflow state reset to default values"
411+
self.logger.info(f"[green]{self.feedback_message}[/green]")
412+
return Status.SUCCESS
413+
414+
373415
### Conditions ###
374416

375417

@@ -509,3 +551,16 @@ def update(self) -> Status:
509551
self.build_workflow.result
510552
)
511553
return Status.SUCCESS
554+
555+
556+
class IsForceRebuild(LoggingAction):
557+
def __init__(
558+
self, name: str, package_meta: PackageMeta, log_prefix: str = ""
559+
) -> None:
560+
self.package_meta = package_meta
561+
super().__init__(name=name, log_prefix=log_prefix)
562+
563+
def update(self) -> Status:
564+
if self.package_meta.ephemeral.force_rebuild:
565+
return Status.SUCCESS
566+
return Status.FAILURE

src/redis_release/bht/composites.py

Lines changed: 191 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1-
from py_trees.composites import Selector, Sequence
2-
from py_trees.decorators import Inverter, Repeat, Retry, Timeout
1+
from typing import Iterator, List, Optional
2+
from typing import Sequence as TypingSequence
3+
4+
from py_trees.behaviour import Behaviour
5+
from py_trees.common import OneShotPolicy, Status
6+
from py_trees.composites import Composite, Selector, Sequence
7+
from py_trees.decorators import Repeat, Retry, SuccessIsRunning, Timeout
38

49
from ..github_client_async import GitHubClientAsync
510
from .behaviours import (
@@ -14,12 +19,89 @@
1419
IsWorkflowIdentified,
1520
IsWorkflowSuccessful,
1621
IsWorkflowTriggered,
22+
ResetPackageState,
23+
ResetWorkflowState,
1724
Sleep,
1825
)
1926
from .behaviours import TriggerWorkflow as TriggerWorkflow
2027
from .behaviours import UpdateWorkflowStatus
2128
from .decorators import FlagGuard
22-
from .state import PackageMeta, ReleaseMeta, Workflow
29+
from .state import Package, PackageMeta, ReleaseMeta, Workflow
30+
31+
32+
class ParallelBarrier(Composite):
33+
"""
34+
A simplified parallel composite that runs all children until convergence.
35+
36+
This parallel composite:
37+
- Ticks all children on each tick
38+
- Skips children that have already converged (SUCCESS or FAILURE) in synchronized mode
39+
- Returns FAILURE if any child returns FAILURE
40+
- Returns SUCCESS if all children return SUCCESS
41+
- Returns RUNNING if any child is still RUNNING
42+
43+
Unlike py_trees.Parallel, this composite:
44+
- Has no policy configuration (always waits for all children)
45+
- Always operates in synchronized mode (skips converged children)
46+
- Has simpler logic focused on the all-must-succeed pattern
47+
48+
Args:
49+
name: the composite behaviour name
50+
children: list of children to add
51+
"""
52+
53+
def __init__(
54+
self,
55+
name: str,
56+
children: Optional[TypingSequence[Behaviour]] = None,
57+
):
58+
super().__init__(name, children)
59+
60+
def tick(self) -> Iterator[Behaviour]:
61+
"""
62+
Tick all children until they converge, then determine status.
63+
"""
64+
# Initialise if first time
65+
if self.status != Status.RUNNING:
66+
# subclass (user) handling
67+
self.initialise()
68+
69+
# Handle empty children case
70+
if not self.children:
71+
self.current_child = None
72+
self.stop(Status.SUCCESS)
73+
yield self
74+
return
75+
76+
# Tick all children, skipping those that have already converged
77+
for child in self.children:
78+
# Skip children that have already converged (synchronized mode)
79+
if child.status in [Status.SUCCESS, Status.FAILURE]:
80+
continue
81+
# Tick the child
82+
for node in child.tick():
83+
yield node
84+
85+
# Determine new status based on children's statuses
86+
self.current_child = self.children[-1]
87+
88+
new_status = Status.INVALID
89+
has_running = any(child.status == Status.RUNNING for child in self.children)
90+
if has_running:
91+
new_status = Status.RUNNING
92+
else:
93+
has_failed = any(child.status == Status.FAILURE for child in self.children)
94+
if has_failed:
95+
new_status = Status.FAILURE
96+
else:
97+
new_status = Status.SUCCESS
98+
99+
# If we've reached a final status, stop and terminate running children
100+
if new_status != Status.RUNNING:
101+
self.stop(new_status)
102+
103+
self.status = new_status
104+
yield self
23105

24106

25107
class FindWorkflowByUUID(FlagGuard):
@@ -203,3 +285,109 @@ def __init__(
203285
"extract_result_failed",
204286
log_prefix=log_prefix,
205287
)
288+
289+
290+
class ResetPackageStateGuarded(FlagGuard):
291+
"""
292+
Reset package once if force_rebuild is True.
293+
Always returns SUCCESS.
294+
"""
295+
296+
def __init__(
297+
self,
298+
name: str,
299+
package: Package,
300+
default_package: Package,
301+
log_prefix: str = "",
302+
) -> None:
303+
super().__init__(
304+
None if name == "" else name,
305+
ResetPackageState(
306+
"Reset Package State",
307+
package,
308+
default_package,
309+
log_prefix=log_prefix,
310+
),
311+
package.meta.ephemeral,
312+
"force_rebuild",
313+
flag_value=False,
314+
raise_on=[Status.SUCCESS, Status.FAILURE],
315+
guard_status=Status.SUCCESS,
316+
log_prefix=log_prefix,
317+
)
318+
319+
320+
class RestartPackageGuarded(FlagGuard):
321+
"""
322+
Reset package if we didn't trigger the workflow in current run
323+
This is intended to be used for build workflow since if build has failed
324+
we have to reset not only build but also publish which effectively means
325+
we have to reset the entire package and restart from scratch.
326+
327+
When reset is made we return RUNNING to give the tree opportunity to run the workflow again.
328+
"""
329+
330+
def __init__(
331+
self,
332+
name: str,
333+
package: Package,
334+
workflow: Workflow,
335+
default_package: Package,
336+
log_prefix: str = "",
337+
) -> None:
338+
reset_package_state = ResetPackageState(
339+
"Reset Package State",
340+
package,
341+
default_package,
342+
log_prefix=log_prefix,
343+
)
344+
reset_package_state_wrapped = SuccessIsRunning(
345+
"Success is Running", reset_package_state
346+
)
347+
super().__init__(
348+
None if name == "" else name,
349+
reset_package_state_wrapped,
350+
workflow.ephemeral,
351+
"trigger_attempted",
352+
flag_value=True,
353+
raise_on=[],
354+
guard_status=Status.FAILURE,
355+
log_prefix=log_prefix,
356+
)
357+
358+
359+
class RestartWorkflowGuarded(FlagGuard):
360+
"""
361+
Reset workflow if we didn't trigger the workflow in current run
362+
363+
This will only reset the workflow state
364+
365+
When reset is made we return RUNNING to give the tree opportunity to run the workflow again.
366+
"""
367+
368+
def __init__(
369+
self,
370+
name: str,
371+
workflow: Workflow,
372+
default_workflow: Workflow,
373+
log_prefix: str = "",
374+
) -> None:
375+
reset_workflow_state = ResetWorkflowState(
376+
"Reset Workflow State",
377+
workflow,
378+
default_workflow,
379+
log_prefix=log_prefix,
380+
)
381+
reset_workflow_state_wrapped = SuccessIsRunning(
382+
"Success is Running", reset_workflow_state
383+
)
384+
super().__init__(
385+
None if name == "" else name,
386+
reset_workflow_state_wrapped,
387+
workflow.ephemeral,
388+
"trigger_attempted",
389+
flag_value=True,
390+
raise_on=[],
391+
guard_status=Status.FAILURE,
392+
log_prefix=log_prefix,
393+
)

0 commit comments

Comments
 (0)