1+ """Saga definitions module."""
2+
13from __future__ import (
24 annotations ,
35)
46
57import warnings
8+ from collections .abc import (
9+ Iterable ,
10+ )
11+ from inspect import (
12+ getmembers ,
13+ )
14+ from operator import (
15+ attrgetter ,
16+ )
617from typing import (
718 Any ,
8- Iterable ,
919 Optional ,
10- Type ,
20+ Protocol ,
1121 TypeVar ,
1222 Union ,
23+ runtime_checkable ,
24+ )
25+
26+ from cached_property import (
27+ cached_property ,
1328)
1429
1530from ..exceptions import (
1631 AlreadyCommittedException ,
1732 AlreadyOnSagaException ,
1833 EmptySagaException ,
34+ OrderPrecedenceException ,
1935 SagaNotCommittedException ,
2036)
2137from .operations import (
2642 LocalSagaStep ,
2743 RemoteSagaStep ,
2844 SagaStep ,
45+ SagaStepDecoratorWrapper ,
2946)
3047from .types import (
3148 LocalCallback ,
3249 RequestCallBack ,
3350)
3451
3552
53+ @runtime_checkable
54+ class SagaDecoratorWrapper (Protocol ):
55+ """Saga Decorator Wrapper class."""
56+
57+ meta : SagaDecoratorMeta
58+
59+
60+ class SagaDecoratorMeta :
61+ """Saga Decorator Meta class."""
62+
63+ _inner : type
64+ _definition : Saga
65+
66+ def __init__ (self , func : type , saga : Saga ):
67+ self ._inner = func
68+ self ._definition = saga
69+
70+ @cached_property
71+ def definition (self ) -> Saga :
72+ """Get the saga definition.
73+
74+ :return: A ``Saga`` instance.
75+ """
76+ steps = getmembers (self ._inner , predicate = lambda x : isinstance (x , SagaStepDecoratorWrapper ))
77+ steps = list (map (lambda member : member [1 ].meta .definition , steps ))
78+ for step in steps :
79+ if step .order is None :
80+ raise OrderPrecedenceException (f"The { step !r} step does not have 'order' value." )
81+ steps .sort (key = attrgetter ("order" ))
82+
83+ for step in steps :
84+ self ._definition .add_step (step )
85+ self ._definition .commit ()
86+
87+ return self ._definition
88+
89+
90+ TP = TypeVar ("TP" , bound = type )
91+
92+
3693class Saga :
3794 """Saga class.
3895
3996 The purpose of this class is to define a sequence of operations among microservices.
4097 """
4198
4299 # noinspection PyUnusedLocal
43- def __init__ (self , * args , steps : list [SagaStep ] = None , committed : bool = False , commit : None = None , ** kwargs ):
100+ def __init__ (
101+ self , * args , steps : list [SagaStep ] = None , committed : Optional [bool ] = None , commit : None = None , ** kwargs
102+ ):
103+ self .steps = list ()
104+ self .committed = False
105+
44106 if steps is None :
45107 steps = list ()
108+ for step in steps :
109+ self .add_step (step )
46110
47- self .steps = steps
111+ if committed is None :
112+ committed = len (steps )
48113 self .committed = committed
49114
50115 if commit is not None :
51116 warnings .warn (f"Commit callback is being deprecated. Use { self .local_step !r} instead" , DeprecationWarning )
52117 self .local_step (commit )
53118 self .committed = True
54119
120+ def __call__ (self , type_ : TP ) -> Union [TP , SagaDecoratorWrapper ]:
121+ """Decorate the given type.
122+
123+ :param type_: The type to be decorated.
124+ :return: The decorated type.
125+ """
126+ type_ .meta = SagaDecoratorMeta (type_ , self )
127+ return type_
128+
55129 @classmethod
56130 def from_raw (cls , raw : Union [dict [str , Any ], Saga ], ** kwargs ) -> Saga :
57131 """Build a new ``Saga`` instance from raw.
@@ -77,7 +151,7 @@ def conditional_step(self, step: Optional[ConditionalSagaStep] = None) -> Condit
77151 :param step: The step to be added. If `None` is provided then a new one will be created.
78152 :return: A ``SagaStep`` instance.
79153 """
80- return self ._add_step ( ConditionalSagaStep , step )
154+ return self .add_step ( step , ConditionalSagaStep )
81155
82156 def local_step (
83157 self , step : Optional [Union [LocalCallback , SagaOperation [LocalCallback ], LocalSagaStep ]] = None , ** kwargs
@@ -90,12 +164,12 @@ def local_step(
90164 """
91165 if step is not None and not isinstance (step , SagaStep ) and not isinstance (step , SagaOperation ):
92166 step = SagaOperation (step , ** kwargs )
93- return self ._add_step ( LocalSagaStep , step )
167+ return self .add_step ( step , LocalSagaStep )
94168
95169 def step (
96170 self , step : Optional [Union [RequestCallBack , SagaOperation [RequestCallBack ], RemoteSagaStep ]] = None , ** kwargs
97171 ) -> RemoteSagaStep :
98- """Add a new remote step step .
172+ """Add a new remote step.
99173
100174 :param step: The step to be added. If `None` is provided then a new one will be created.
101175 :param kwargs: Additional named parameters.
@@ -107,17 +181,23 @@ def step(
107181 def remote_step (
108182 self , step : Optional [Union [RequestCallBack , SagaOperation [RequestCallBack ], RemoteSagaStep ]] = None , ** kwargs
109183 ) -> RemoteSagaStep :
110- """Add a new remote step step .
184+ """Add a new remote step.
111185
112186 :param step: The step to be added. If `None` is provided then a new one will be created.
113187 :param kwargs: Additional named parameters.
114188 :return: A ``SagaStep`` instance.
115189 """
116190 if step is not None and not isinstance (step , SagaStep ) and not isinstance (step , SagaOperation ):
117191 step = SagaOperation (step , ** kwargs )
118- return self ._add_step (RemoteSagaStep , step )
192+ return self .add_step (step , RemoteSagaStep )
193+
194+ def add_step (self , step : Optional [Union [SagaOperation , T ]], step_cls : type [T ] = SagaStep ) -> T :
195+ """Add a new step.
119196
120- def _add_step (self , step_cls : Type [T ], step : Optional [Union [SagaOperation , T ]]) -> T :
197+ :param step: The step to be added.
198+ :param step_cls: The step class (for validation purposes).
199+ :return: The added step.
200+ """
121201 if self .committed :
122202 raise AlreadyCommittedException ("It is not possible to add more steps to an already committed saga." )
123203
@@ -129,9 +209,21 @@ def _add_step(self, step_cls: Type[T], step: Optional[Union[SagaOperation, T]])
129209 if step .saga is not None :
130210 raise AlreadyOnSagaException ()
131211 step .saga = self
212+
132213 else :
133214 step = step_cls (step , saga = self )
134215
216+ if step .order is None :
217+ if self .steps :
218+ step .order = self .steps [- 1 ].order + 1
219+ else :
220+ step .order = 1
221+
222+ if self .steps and step .order <= self .steps [- 1 ].order :
223+ raise OrderPrecedenceException (
224+ f"Unsatisfied precedence constraints. Previous: { self .steps [- 1 ].order } Current: { step .order } "
225+ )
226+
135227 self .steps .append (step )
136228 return step
137229
0 commit comments