@@ -114,47 +114,39 @@ async def run(
114114 but with ``Errored`` status.
115115 :param return_execution: If ``True`` the ``SagaExecution`` instance is returned. Otherwise, only the
116116 identifier (``UUID``) is returned.
117+ # :param timeout: TODO
117118 :param kwargs: Additional named arguments.
118119 :return: This method does not return anything.
119120 """
120121 if isinstance (definition , SagaDecoratorWrapper ):
121122 definition = definition .meta .definition
122123
123- if response is not None :
124- return await self ._load_and_run (
125- response = response ,
126- autocommit = autocommit ,
127- pause_on_disk = pause_on_disk ,
128- raise_on_error = raise_on_error ,
129- return_execution = return_execution ,
130- ** kwargs ,
131- )
132-
133- return await self ._run_new (
134- definition = definition ,
135- context = context ,
136- user = user ,
124+ if response is None :
125+ execution = await self ._create (definition , context , user )
126+ else :
127+ execution = await self ._load (response )
128+
129+ return await self ._run (
130+ execution ,
131+ response = response ,
137132 autocommit = autocommit ,
138133 pause_on_disk = pause_on_disk ,
139134 raise_on_error = raise_on_error ,
140135 return_execution = return_execution ,
141136 ** kwargs ,
142137 )
143138
144- async def _run_new (
145- self , definition : Saga , context : Optional [SagaContext ] = None , user : Optional [UUID ] = None , ** kwargs
146- ) -> Union [UUID , SagaExecution ]:
139+ @staticmethod
140+ async def _create (definition : Saga , context : Optional [SagaContext ], user : Optional [UUID ]) -> SagaExecution :
147141 if REQUEST_USER_CONTEXT_VAR .get () is not None :
148142 if user is not None :
149143 warnings .warn ("The `user` Argument will be ignored in favor of the `user` ContextVar" , RuntimeWarning )
150144 user = REQUEST_USER_CONTEXT_VAR .get ()
151145
152- execution = SagaExecution .from_definition (definition , context = context , user = user )
153- return await self ._run (execution , ** kwargs )
146+ return SagaExecution .from_definition (definition , context = context , user = user )
154147
155- async def _load_and_run (self , response : SagaResponse , ** kwargs ) -> Union [UUID , SagaExecution ]:
156- execution = await self .storage .load (response .uuid )
157- return await self ._run (execution , response = response , ** kwargs )
148+ async def _load (self , response : SagaResponse ) -> Union [UUID , SagaExecution ]:
149+ return await self .storage .load (response .uuid )
158150
159151 async def _run (
160152 self ,
@@ -197,9 +189,11 @@ def _update_request_headers(execution: SagaExecution) -> None:
197189 headers ["related_services" ] = "," .join (related_services )
198190
199191 @staticmethod
200- async def _run_with_pause_on_disk (execution : SagaExecution , autocommit : bool = True , ** kwargs ) -> None :
192+ async def _run_with_pause_on_disk (
193+ execution : SagaExecution , response : Optional [SagaResponse ] = None , autocommit : bool = True , ** kwargs
194+ ) -> None :
201195 try :
202- await execution .execute (autocommit = False , ** kwargs )
196+ await execution .execute (autocommit = False , response = response , ** kwargs )
203197 if autocommit :
204198 await execution .commit (** kwargs )
205199 except SagaPausedExecutionStepException :
0 commit comments