Skip to content

Commit 5bf4782

Browse files
authored
feat(writer): allow CRON trigger block to be run - WF-881 (#1250)
* fix(writer): allow CRON trigger block to be run - WF-881 * feat(writer): add support for CRON blueprints and triggers in BlueprintRunner - AB-881 * refactor(writer): remove trigger_type parameter from run_blueprint_via_api and streamline trigger selection logic - AB-881
1 parent 2852741 commit 5bf4782

File tree

3 files changed

+94
-54
lines changed

3 files changed

+94
-54
lines changed

src/writer/blueprints.py

Lines changed: 74 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ def __init__(self, session: writer.core.WriterSession):
6969
def api_blueprints(self):
7070
return self._gather_api_blueprints()
7171

72+
@property
73+
def cron_blueprints(self):
74+
return self._gather_cron_blueprints()
75+
7276
@contextmanager
7377
def _get_executor(self) -> Generator[ThreadPoolExecutor, None, None]:
7478
"""Return the application's thread pool executor.
@@ -137,6 +141,17 @@ def is_blueprint_api_available(
137141
"""
138142
return blueprint_id in self.api_blueprints
139143

144+
def is_blueprint_cron_available(
145+
self, blueprint_id: str
146+
):
147+
"""
148+
Checks if a blueprint with the given key is available for Cron execution.
149+
150+
:param blueprint_id: The blueprint identifier.
151+
:return: True if the blueprint is available for Cron execution, False otherwise.
152+
"""
153+
return blueprint_id in self.cron_blueprints
154+
140155
def get_blueprint_api_trigger(
141156
self, blueprint_id: str
142157
):
@@ -152,48 +167,79 @@ def get_blueprint_api_trigger(
152167
)
153168
return self.api_blueprints[blueprint_id]
154169

155-
def _gather_api_blueprints(self):
170+
def get_blueprint_cron_trigger(
171+
self, blueprint_id: str
172+
):
156173
"""
157-
Gathers all blueprints that have an API trigger.
174+
Retrieves the Cron trigger for a given blueprint key.
175+
176+
:param blueprint_key: The blueprint identifier.
177+
:return: The Cron trigger component.
178+
"""
179+
if not self.is_blueprint_cron_available(blueprint_id):
180+
raise ValueError(
181+
f'Cron trigger not found for blueprint "{blueprint_id}".'
182+
)
183+
return self.cron_blueprints[blueprint_id]
158184

159-
:return: A set of blueprint keys that have an API trigger.
185+
def _gather_blueprints_by_trigger(self, trigger_type: str):
186+
"""
187+
Gathers all blueprints that have a trigger of the specified type.
188+
189+
:param trigger_type: The trigger component type (e.g., "blueprints_apitrigger").
190+
:return: A dict mapping blueprint IDs to their trigger IDs.
160191
"""
161192
triggers = [
162193
c for c in self.session.session_component_tree.components.values()
163-
if c.type == "blueprints_apitrigger"
164-
]
165-
api_blueprints = {}
194+
if c.type == trigger_type
195+
]
196+
blueprints = {}
166197

167198
for trigger in triggers:
168199
parent_blueprint_id = \
169200
self.session.session_component_tree.get_parent(trigger.id)[0]
170201
parent_blueprint = \
171202
self.session.session_component_tree.get_component(
172203
parent_blueprint_id
173-
)
204+
)
174205

175206
if (
176207
parent_blueprint
177208
and
178209
parent_blueprint.type == "blueprints_blueprint"
179210
):
180211
# Store the blueprint key against its trigger ID
181-
api_blueprints[parent_blueprint_id] = \
182-
trigger.id
212+
blueprints[parent_blueprint_id] = trigger.id
183213

184-
return api_blueprints
214+
return blueprints
215+
216+
def _gather_api_blueprints(self):
217+
"""
218+
Gathers all blueprints that have an API trigger.
219+
220+
:return: A dict mapping blueprint IDs to their API trigger IDs.
221+
"""
222+
return self._gather_blueprints_by_trigger("blueprints_apitrigger")
223+
224+
def _gather_cron_blueprints(self):
225+
"""
226+
Gathers all blueprints that have a Cron trigger.
227+
228+
:return: A dict mapping blueprint IDs to their Cron trigger IDs.
229+
"""
230+
return self._gather_blueprints_by_trigger("blueprints_crontrigger")
185231

186232
def run_blueprint_via_api(
187233
self,
188234
blueprint_id: str,
189-
trigger_type: Literal["API", "Cron"],
190235
branch_id: Optional[str] = None,
191236
execution_environment: Optional[Dict[str, Any]] = None
192237
):
193238
"""
194239
Executes a blueprint by its key via the API.
195240
196241
:param blueprint_id: The blueprint identifier.
242+
:param branch_id: Optional branch ID to start execution from.
197243
:param execution_environment: The execution environment for
198244
the blueprint.
199245
:return: The result of the blueprint execution.
@@ -202,9 +248,24 @@ def run_blueprint_via_api(
202248
execution_environment = {}
203249

204250
trigger_id = branch_id
205-
if trigger_id is None:
251+
if trigger_id is not None:
252+
# Determine trigger type from the component
253+
component = self.session.session_component_tree.get_component(trigger_id)
254+
if component and component.type == "blueprints_apitrigger":
255+
trigger_type = "API"
256+
elif component and component.type == "blueprints_crontrigger":
257+
trigger_type = "Cron"
258+
else:
259+
trigger_type = "Branch"
260+
elif self.is_blueprint_api_available(blueprint_id):
261+
# Prioritize API trigger over Cron if both exist
206262
trigger_id = self.get_blueprint_api_trigger(blueprint_id)
207-
263+
trigger_type = "API"
264+
elif self.is_blueprint_cron_available(blueprint_id):
265+
trigger_id = self.get_blueprint_cron_trigger(blueprint_id)
266+
trigger_type = "Cron"
267+
else:
268+
raise ValueError(f'No trigger found for blueprint "{blueprint_id}".')
208269
return self.run_branch(
209270
trigger_id,
210271
None,

src/writer/core.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1283,15 +1283,11 @@ def run_blueprint_via_api(
12831283
blueprint_id = payload.pop("blueprint_id", None)
12841284
if not blueprint_id:
12851285
raise ValueError("Missing blueprint_id in payload")
1286-
trigger_type = payload.pop("trigger_type", None)
1287-
if not trigger_type:
1288-
raise ValueError("Missing trigger_type in payload")
12891286
execution_environment = EventHandler._get_blueprint_execution_environment(
12901287
payload, context, session, vault
12911288
)
12921289
return blueprint_runner.run_blueprint_via_api(
12931290
blueprint_id=blueprint_id,
1294-
trigger_type=trigger_type,
12951291
branch_id=payload.pop("branch_id", None),
12961292
execution_environment=execution_environment,
12971293
)

src/writer/serve.py

Lines changed: 20 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -385,12 +385,13 @@ async def _get_payload_as_json(request: Request):
385385
raise HTTPException(status_code=400, detail="Cannot parse the payload.")
386386
return payload
387387

388-
def has_api_trigger(app_runner: AppRunner, blueprint_id: str) -> bool:
389-
# Check if blueprint has at least one API trigger component
388+
def is_blueprint_triggerable(app_runner: AppRunner, blueprint_id: str) -> bool:
389+
"""Check if blueprint has at least one supported trigger (API or Cron)."""
390390
if not app_runner.bmc_components:
391391
return False
392+
supported_triggers = ("blueprints_apitrigger", "blueprints_crontrigger")
392393
return any(
393-
comp["type"] == "blueprints_apitrigger" and comp.get("parentId") == blueprint_id
394+
comp["type"] in supported_triggers and comp.get("parentId") == blueprint_id
394395
for comp in app_runner.bmc_components.values()
395396
)
396397

@@ -409,7 +410,7 @@ async def get_blueprints(request: Request):
409410
}
410411
for comp in app_runner.bmc_components.values()
411412
if comp["type"] == "blueprints_blueprint"
412-
and has_api_trigger(app_runner, comp["id"])
413+
and is_blueprint_triggerable(app_runner, comp["id"])
413414
]
414415

415416
return JSONResponse(content=blueprints)
@@ -520,9 +521,9 @@ async def event_logic(queue: asyncio.Queue):
520521
}))
521522
return
522523

523-
if not branch_id and not has_api_trigger(app_runner, blueprint_id):
524+
if not branch_id and not is_blueprint_triggerable(app_runner, blueprint_id):
524525
await queue.put(await format_event("error", {
525-
"msg": f"Blueprint '{blueprint_id}' lacks an API trigger.",
526+
"msg": f"Blueprint '{blueprint_id}' lacks a supported trigger (API or Cron).",
526527
"finished_at": int(time.time())
527528
}))
528529
return
@@ -538,39 +539,21 @@ async def event_logic(queue: asyncio.Queue):
538539

539540
await queue.put(await format_event("status", {"status": "executing", "msg": (f"Executing branch: {branch_id}..." if branch_id else f"Executing blueprint: {blueprint_id}...")}))
540541

541-
if branch_id:
542-
task = asyncio.create_task(
543-
app_runner.handle_event(
544-
session_id,
545-
WriterEvent(
546-
type="wf-run-blueprint-via-api",
547-
isSafe=True,
548-
handler="run_blueprint_via_api",
549-
payload={
550-
"blueprint_id": blueprint_id,
551-
"trigger_type": "Cron",
552-
"branch_id": branch_id,
553-
**(payload or {})
554-
},
555-
)
556-
)
557-
)
558-
else:
559-
task = asyncio.create_task(
560-
app_runner.handle_event(
561-
session_id,
562-
WriterEvent(
563-
type="wf-run-blueprint-via-api",
564-
isSafe=True,
565-
handler="run_blueprint_via_api",
566-
payload={
567-
"blueprint_id": blueprint_id,
568-
"trigger_type": "API",
569-
**(payload or {})
570-
},
571-
)
542+
task = asyncio.create_task(
543+
app_runner.handle_event(
544+
session_id,
545+
WriterEvent(
546+
type="wf-run-blueprint-via-api",
547+
isSafe=True,
548+
handler="run_blueprint_via_api",
549+
payload={
550+
"blueprint_id": blueprint_id,
551+
"branch_id": branch_id,
552+
**(payload or {})
553+
},
572554
)
573555
)
556+
)
574557

575558
await queue.put(await format_event("status", {"status": "running", "msg": ("Branch is running. Awaiting output..." if branch_id else "Blueprint is running. Awaiting output...")}))
576559

0 commit comments

Comments
 (0)