1- import json
21from typing import (
32 Any ,
43 Coroutine ,
4+ Dict ,
55 Generic ,
66 List ,
77 Literal ,
@@ -29,10 +29,13 @@ class DumpedStep(pydantic.BaseModel):
2929 """Dumped state model."""
3030
3131 step_type : str
32- step_data : str
32+ step_data : Dict [ str , Any ]
3333 task_id : str
3434
3535
36+ DumpedSteps = pydantic .RootModel [List [DumpedStep ]]
37+
38+
3639class Pipeline (Generic [_FuncParams , _ReturnType ]):
3740 """
3841 Pipeline constructor.
@@ -116,7 +119,7 @@ def call_next(
116119 task = task ,
117120 param_name = param_name ,
118121 ** additional_kwargs ,
119- ).dumps (),
122+ ).model_dump (),
120123 task_id = "" ,
121124 ),
122125 )
@@ -172,7 +175,7 @@ def call_after(
172175 task = task ,
173176 param_name = EMPTY_PARAM_NAME ,
174177 ** additional_kwargs ,
175- ).dumps (),
178+ ).model_dump (),
176179 task_id = "" ,
177180 ),
178181 )
@@ -243,7 +246,7 @@ def map(
243246 skip_errors = skip_errors ,
244247 check_interval = check_interval ,
245248 ** additional_kwargs ,
246- ).dumps (),
249+ ).model_dump (),
247250 task_id = "" ,
248251 ),
249252 )
@@ -315,24 +318,24 @@ def filter(
315318 skip_errors = skip_errors ,
316319 check_interval = check_interval ,
317320 ** additional_kwargs ,
318- ).dumps (),
321+ ).model_dump (),
319322 task_id = "" ,
320323 ),
321324 )
322325 return self
323326
324- def dumps (self ) -> str :
327+ def dumpb (self ) -> bytes :
325328 """
326329 Dumps current pipeline as string.
327330
328331 :returns: serialized pipeline.
329332 """
330- return json . dumps (
331- [ step . model_dump () for step in self .steps ] ,
333+ return self . broker . serializer . dumpb (
334+ DumpedSteps . model_validate ( self .steps ). model_dump () ,
332335 )
333336
334337 @classmethod
335- def loads (cls , broker : AsyncBroker , pipe_data : str ) -> "Pipeline[Any, Any]" :
338+ def loadb (cls , broker : AsyncBroker , pipe_data : bytes ) -> "Pipeline[Any, Any]" :
336339 """
337340 Parses serialized pipeline.
338341
@@ -344,7 +347,8 @@ def loads(cls, broker: AsyncBroker, pipe_data: str) -> "Pipeline[Any, Any]":
344347 :return: new
345348 """
346349 pipe : "Pipeline[Any, Any]" = Pipeline (broker )
347- pipe .steps = pydantic .TypeAdapter (List [DumpedStep ]).validate_json (pipe_data )
350+ data = broker .serializer .loadb (pipe_data )
351+ pipe .steps = DumpedSteps .model_validate (data ) # type: ignore[assignment]
348352 return pipe
349353
350354 async def kiq (
@@ -383,7 +387,7 @@ async def kiq(
383387 )
384388 .with_task_id (step .task_id )
385389 .with_labels (
386- ** {CURRENT_STEP : 0 , PIPELINE_DATA : self .dumps ()}, # type: ignore
390+ ** {CURRENT_STEP : 0 , PIPELINE_DATA : self .dumpb ()}, # type: ignore
387391 )
388392 )
389393 taskiq_task = await kicker .kiq (* args , ** kwargs )
0 commit comments