2424from chatsky .slots .slots import GroupSlot
2525from chatsky .core .service .group import ServiceGroup , ServiceGroupInitTypes
2626from chatsky .core .service .extra import ComponentExtraHandlerInitTypes , BeforeHandler , AfterHandler
27- from chatsky .core .service .types import (
28- GlobalExtraHandlerType ,
29- ExtraHandlerFunction ,
30- )
3127from .service import Service
32- from .utils import finalize_service_group
28+ from .utils import finalize_service_group , initialize_service_states
3329from chatsky .core .service .actor import Actor
3430from chatsky .core .node_label import AbsoluteNodeLabel , AbsoluteNodeLabelInitTypes
3531from chatsky .core .script_parsing import JSONImporter , Path
@@ -104,15 +100,6 @@ class Pipeline(BaseModel, extra="forbid", arbitrary_types_allowed=True):
104100 timeout : Optional [float ] = None
105101 """
106102 Timeout to add to pipeline root service group.
107- """
108- optimization_warnings : bool = False
109- """
110- Asynchronous pipeline optimization check request flag;
111- warnings will be sent to logs. Additionally, it has some calculated fields:
112-
113- - `services_pipeline` is a pipeline root :py:class:`~.ServiceGroup` object,
114- - `actor` is a pipeline actor, found among services.
115-
116103 """
117104 parallelize_processing : bool = False
118105 """
@@ -136,7 +123,6 @@ def __init__(
136123 before_handler : ComponentExtraHandlerInitTypes = None ,
137124 after_handler : ComponentExtraHandlerInitTypes = None ,
138125 timeout : float = None ,
139- optimization_warnings : bool = None ,
140126 parallelize_processing : bool = None ,
141127 ):
142128 if fallback_label is None :
@@ -154,7 +140,6 @@ def __init__(
154140 "before_handler" : before_handler ,
155141 "after_handler" : after_handler ,
156142 "timeout" : timeout ,
157- "optimization_warnings" : optimization_warnings ,
158143 "parallelize_processing" : parallelize_processing ,
159144 }
160145 empty_fields = set ()
@@ -216,14 +201,11 @@ def services_pipeline(self) -> PipelineServiceGroup:
216201 after_handler = self .after_handler ,
217202 timeout = self .timeout ,
218203 )
219- services_pipeline .name = "pipeline "
220- services_pipeline .path = ".pipeline "
204+ services_pipeline .name = ""
205+ services_pipeline .path = ""
221206
222207 finalize_service_group (services_pipeline , path = services_pipeline .path )
223208
224- if self .optimization_warnings :
225- services_pipeline .log_optimization_warnings ()
226-
227209 return services_pipeline
228210
229211 @model_validator (mode = "after" )
@@ -240,60 +222,6 @@ def validate_fallback_label(self):
240222 raise ValueError (f"Unknown fallback_label={ self .fallback_label } " )
241223 return self
242224
243- def add_global_handler (
244- self ,
245- global_handler_type : GlobalExtraHandlerType ,
246- extra_handler : ExtraHandlerFunction ,
247- whitelist : Optional [List [str ]] = None ,
248- blacklist : Optional [List [str ]] = None ,
249- ):
250- """
251- Method for adding global wrappers to pipeline.
252- Different types of global wrappers are called before/after pipeline execution
253- or before/after each pipeline component.
254- They can be used for pipeline statistics collection or other functionality extensions.
255- NB! Global wrappers are still wrappers,
256- they shouldn't be used for much time-consuming tasks (see :py:mod:`chatsky.core.service.extra`).
257-
258- :param global_handler_type: (required) indication where the wrapper
259- function should be executed.
260- :param extra_handler: (required) wrapper function itself.
261- :type extra_handler: ExtraHandlerFunction
262- :param whitelist: a list of services to only add this wrapper to.
263- :param blacklist: a list of services to not add this wrapper to.
264- :return: `None`
265- """
266-
267- def condition (name : str ) -> bool :
268- return (whitelist is None or name in whitelist ) and (blacklist is None or name not in blacklist )
269-
270- if (
271- global_handler_type is GlobalExtraHandlerType .BEFORE_ALL
272- or global_handler_type is GlobalExtraHandlerType .AFTER_ALL
273- ):
274- whitelist = ["pipeline" ]
275- global_handler_type = (
276- GlobalExtraHandlerType .BEFORE
277- if global_handler_type is GlobalExtraHandlerType .BEFORE_ALL
278- else GlobalExtraHandlerType .AFTER
279- )
280-
281- self .services_pipeline .add_extra_handler (global_handler_type , extra_handler , condition )
282-
283- @property
284- def info_dict (self ) -> dict :
285- """
286- Property for retrieving info dictionary about this pipeline.
287- Returns info dict, containing most important component public fields as well as its type.
288- All complex or unserializable fields here are replaced with 'Instance of [type]'.
289- """
290- return {
291- "type" : type (self ).__name__ ,
292- "messenger_interface" : f"Instance of { type (self .messenger_interface ).__name__ } " ,
293- "context_storage" : f"Instance of { type (self .context_storage ).__name__ } " ,
294- "services" : [self .services_pipeline .info_dict ],
295- }
296-
297225 async def _run_pipeline (
298226 self , request : Message , ctx_id : Optional [Hashable ] = None , update_ctx_misc : Optional [dict ] = None
299227 ) -> Context :
@@ -329,12 +257,10 @@ async def _run_pipeline(
329257 ctx .framework_data .slot_manager .set_root_slot (self .slots )
330258
331259 ctx .framework_data .pipeline = self
260+ initialize_service_states (ctx , self .services_pipeline )
332261
333262 ctx .add_request (request )
334- result = await self .services_pipeline (ctx , self )
335-
336- if asyncio .iscoroutine (result ):
337- await result
263+ await self .services_pipeline (ctx )
338264
339265 ctx .framework_data .service_states .clear ()
340266 ctx .framework_data .pipeline = None
0 commit comments