1- from collections import defaultdict
2- from typing import TYPE_CHECKING , Any , Optional
1+ from typing import Any , Optional
32
4- from prefect .client .orchestration import get_client
5- from prefect .client .schemas .filters import (
6- FlowRunFilter ,
7- FlowRunFilterTags ,
8- LogFilter ,
9- LogFilterFlowRunId ,
10- )
11- from prefect .client .schemas .sorting import (
12- FlowRunSort ,
13- )
143from pydantic import ConfigDict , Field
154
165from infrahub .core .constants import TaskConclusion
2110from infrahub .core .timestamp import current_timestamp
2211from infrahub .database import InfrahubDatabase
2312from infrahub .utils import get_nested_dict
24- from infrahub .workflows .constants import TAG_NAMESPACE , WorkflowTag
2513
2614from .task_log import TaskLog
2715
28- if TYPE_CHECKING :
29- from prefect .client .schemas .objects import Log as PrefectLog
30-
31- LOG_LEVEL_MAPPING = {10 : "debug" , 20 : "info" , 30 : "warning" , 40 : "error" , 50 : "critical" }
32-
3316
3417class Task (StandardNode ):
3518 model_config = ConfigDict (arbitrary_types_allowed = True )
@@ -55,11 +38,13 @@ async def query(
5538 cls ,
5639 db : InfrahubDatabase ,
5740 fields : dict [str , Any ],
58- limit : int ,
59- offset : int ,
60- ids : list [str ],
61- related_nodes : list [str ],
41+ limit : int | None = None ,
42+ offset : int | None = None ,
43+ ids : list [str ] | None = None ,
44+ related_nodes : list [str ] | None = None ,
6245 ) -> dict [str , Any ]:
46+ ids = ids or []
47+ related_nodes = related_nodes or []
6348 log_fields = get_nested_dict (nested_dict = fields , keys = ["edges" , "node" , "logs" , "edges" , "node" ])
6449 count = None
6550 if "count" in fields :
@@ -108,83 +93,3 @@ async def query(
10893 )
10994
11095 return {"count" : count , "edges" : nodes }
111-
112-
113- class NewTask :
114- @classmethod
115- async def query (
116- cls ,
117- fields : dict [str , Any ],
118- related_nodes : list [str ],
119- branch : str | None = None ,
120- limit : int | None = None ,
121- offset : int = 0 ,
122- ) -> dict [str , Any ]:
123- nodes : list [dict ] = []
124- count = None
125-
126- log_fields = get_nested_dict (nested_dict = fields , keys = ["edges" , "node" , "logs" , "edges" , "node" ])
127- logs_flow : dict [str , list [PrefectLog ]] = defaultdict (list )
128-
129- async with get_client (sync_client = False ) as client :
130- tags = [TAG_NAMESPACE ]
131-
132- if branch :
133- tags .append (WorkflowTag .BRANCH .render (identifier = branch ))
134-
135- # We only support one related node for now, need to investigate HOW (and IF) we can support more
136- if related_nodes :
137- tags .append (WorkflowTag .RELATED_NODE .render (identifier = related_nodes [0 ]))
138-
139- flow_run_filters = FlowRunFilter (
140- tags = FlowRunFilterTags (all_ = tags ),
141- )
142-
143- flows = await client .read_flow_runs (
144- flow_run_filter = flow_run_filters ,
145- limit = limit ,
146- offset = offset ,
147- sort = FlowRunSort .START_TIME_DESC ,
148- )
149-
150- # For now count will just return the number of objects in the response
151- # it won't work well with pagination but it doesn't look like Prefect provide a good option to count all flows
152- if "count" in fields :
153- count = len (flows )
154-
155- if log_fields :
156- flow_ids = [flow .id for flow in flows ]
157- all_logs = await client .read_logs (log_filter = LogFilter (flow_run_id = LogFilterFlowRunId (any_ = flow_ids )))
158- for log in all_logs :
159- logs_flow [log .flow_run_id ].append (log )
160-
161- for flow in flows :
162- logs = []
163- if log_fields :
164- logs = [
165- {
166- "node" : {
167- "message" : log .message ,
168- "severity" : LOG_LEVEL_MAPPING .get (log .level , "error" ),
169- "timestamp" : log .timestamp .to_iso8601_string (),
170- }
171- }
172- for log in logs_flow [flow .id ]
173- ]
174-
175- nodes .append (
176- {
177- "node" : {
178- "title" : flow .name ,
179- "conclusion" : flow .state_name ,
180- "related_node" : "" ,
181- "related_node_kind" : "" ,
182- "created_at" : flow .created .to_iso8601_string (),
183- "updated_at" : flow .updated .to_iso8601_string (),
184- "id" : flow .id ,
185- "logs" : {"edges" : logs },
186- }
187- }
188- )
189-
190- return {"count" : count or 0 , "edges" : nodes }
0 commit comments