3535class FormulaEngine :
3636 """A post-fix formula engine that operates on `Sample` receivers.
3737
38- Operators and metrics need to be pushed into the engine in in-fix order, and they
39- get rearranged into post-fix in the engine. This is done using the [Shunting yard
40- algorithm](https://en.wikipedia.org/wiki/Shunting_yard_algorithm).
41-
42- Example:
43- To create an engine that adds the latest entries from two receivers, the
44- following calls need to be made:
45-
46- ```python
47- engine = FormulaEngine()
48- engine.push_metric("metric_1", receiver_1)
49- engine.push_oper("+")
50- engine.push_metric("metric_2", receiver_2)
51- engine.finalize()
52- ```
53-
54- and then every call to `engine.apply()` would fetch a value from each receiver,
55- add the values and return the result.
38+ Use the `FormulaBuilder` to create `FormulaEngine` instances.
5639 """
5740
5841 def __init__ (
59- self ,
60- ) -> None :
61- """Create a `FormulaEngine` instance."""
62- self ._steps : List [FormulaStep ] = []
63- self ._build_stack : List [FormulaStep ] = []
64- self ._metric_fetchers : Dict [str , MetricFetcher ] = {}
65- self ._first_run = True
66-
67- def push_oper (self , oper : str ) -> None :
68- """Push an operator into the engine.
69-
70- Args:
71- oper: One of these strings - "+", "-", "*", "/", "(", ")"
72- """
73- if self ._build_stack and oper != "(" :
74- op_prec = _operator_precedence [oper ]
75- while self ._build_stack :
76- prev_step = self ._build_stack [- 1 ]
77- if op_prec <= _operator_precedence [repr (prev_step )]:
78- break
79- if oper == ")" and repr (prev_step ) == "(" :
80- self ._build_stack .pop ()
81- break
82- if repr (prev_step ) == "(" :
83- break
84- self ._steps .append (prev_step )
85- self ._build_stack .pop ()
86-
87- if oper == "+" :
88- self ._build_stack .append (Adder ())
89- elif oper == "-" :
90- self ._build_stack .append (Subtractor ())
91- elif oper == "*" :
92- self ._build_stack .append (Multiplier ())
93- elif oper == "/" :
94- self ._build_stack .append (Divider ())
95- elif oper == "(" :
96- self ._build_stack .append (OpenParen ())
97-
98- def push_metric (
99- self ,
100- name : str ,
101- data_stream : Receiver [Sample ],
102- nones_are_zeros : bool ,
42+ self , steps : List [FormulaStep ], metric_fetchers : Dict [str , MetricFetcher ]
10343 ) -> None :
104- """Push a metric receiver into the engine .
44+ """Create a `FormulaEngine` instance .
10545
10646 Args:
107- name: A name for the metric.
108- data_stream: A receiver to fetch this metric from.
109- nones_are_zeros: Whether to treat None values from the stream as 0s. If
110- False, the returned value will be a None.
111- """
112- fetcher = self ._metric_fetchers .setdefault (
113- name , MetricFetcher (name , data_stream , nones_are_zeros )
114- )
115- self ._steps .append (fetcher )
116-
117- def finalize (self ) -> None :
118- """Finalize the formula engine.
119-
120- This function must be called before calls to `apply` can be made.
47+ steps: Steps for the engine to execute, in post-fix order.
48+ metric_fetchers: Fetchers for each metric stream the formula depends on.
12149 """
122- while self ._build_stack :
123- self ._steps .append (self ._build_stack .pop ())
50+ self ._steps = steps
51+ self ._metric_fetchers = metric_fetchers
52+ self ._first_run = True
12453
125- async def synchronize_metric_timestamps (
54+ async def _synchronize_metric_timestamps (
12655 self , metrics : Set [asyncio .Task [Optional [Sample ]]]
12756 ) -> datetime :
12857 """Synchronize the metric streams.
@@ -191,7 +120,7 @@ async def apply(self) -> Sample:
191120 raise RuntimeError ("Some resampled metrics didn't arrive" )
192121
193122 if self ._first_run :
194- metric_ts = await self .synchronize_metric_timestamps (ready_metrics )
123+ metric_ts = await self ._synchronize_metric_timestamps (ready_metrics )
195124 else :
196125 res = next (iter (ready_metrics )).result ()
197126 assert res is not None
@@ -206,3 +135,96 @@ async def apply(self) -> Sample:
206135 raise RuntimeError ("Formula application failed." )
207136
208137 return Sample (metric_ts , eval_stack [0 ])
138+
139+
140+ class FormulaBuilder :
141+ """Builds a post-fix formula engine that operates on `Sample` receivers.
142+
143+ Operators and metrics need to be pushed in in-fix order, and they get rearranged
144+ into post-fix order. This is done using the [Shunting yard
145+ algorithm](https://en.wikipedia.org/wiki/Shunting_yard_algorithm).
146+
147+ Example:
148+ To create an engine that adds the latest entries from two receivers, the
149+ following calls need to be made:
150+
151+ ```python
152+ builder = FormulaBuilder()
153+ builder.push_metric("metric_1", receiver_1)
154+ builder.push_oper("+")
155+ builder.push_metric("metric_2", receiver_2)
156+ engine = builder.build()
157+ ```
158+
159+ and then every call to `engine.apply()` would fetch a value from each receiver,
160+ add the values and return the result.
161+ """
162+
163+ def __init__ (
164+ self ,
165+ ) -> None :
166+ """Create a `FormulaBuilder` instance."""
167+ self ._build_stack : List [FormulaStep ] = []
168+ self ._steps : List [FormulaStep ] = []
169+ self ._metric_fetchers : Dict [str , MetricFetcher ] = {}
170+
171+ def push_oper (self , oper : str ) -> None :
172+ """Push an operator into the engine.
173+
174+ Args:
175+ oper: One of these strings - "+", "-", "*", "/", "(", ")"
176+ """
177+ if self ._build_stack and oper != "(" :
178+ op_prec = _operator_precedence [oper ]
179+ while self ._build_stack :
180+ prev_step = self ._build_stack [- 1 ]
181+ if op_prec <= _operator_precedence [repr (prev_step )]:
182+ break
183+ if oper == ")" and repr (prev_step ) == "(" :
184+ self ._build_stack .pop ()
185+ break
186+ if repr (prev_step ) == "(" :
187+ break
188+ self ._steps .append (prev_step )
189+ self ._build_stack .pop ()
190+
191+ if oper == "+" :
192+ self ._build_stack .append (Adder ())
193+ elif oper == "-" :
194+ self ._build_stack .append (Subtractor ())
195+ elif oper == "*" :
196+ self ._build_stack .append (Multiplier ())
197+ elif oper == "/" :
198+ self ._build_stack .append (Divider ())
199+ elif oper == "(" :
200+ self ._build_stack .append (OpenParen ())
201+
202+ def push_metric (
203+ self ,
204+ name : str ,
205+ data_stream : Receiver [Sample ],
206+ nones_are_zeros : bool ,
207+ ) -> None :
208+ """Push a metric receiver into the engine.
209+
210+ Args:
211+ name: A name for the metric.
212+ data_stream: A receiver to fetch this metric from.
213+ nones_are_zeros: Whether to treat None values from the stream as 0s. If
214+ False, the returned value will be a None.
215+ """
216+ fetcher = self ._metric_fetchers .setdefault (
217+ name , MetricFetcher (name , data_stream , nones_are_zeros )
218+ )
219+ self ._steps .append (fetcher )
220+
221+ def build (self ) -> FormulaEngine :
222+ """Finalize and build the formula engine.
223+
224+ Returns:
225+ A `FormulaEngine` instance.
226+ """
227+ while self ._build_stack :
228+ self ._steps .append (self ._build_stack .pop ())
229+
230+ return FormulaEngine (self ._steps , self ._metric_fetchers )
0 commit comments