99import logging
1010from abc import ABC
1111from collections import deque
12- from datetime import datetime
13- from math import isinf , isnan
1412from typing import (
1513 Callable ,
1614 Dict ,
1715 Generic ,
1816 List ,
1917 Optional ,
20- Set ,
2118 Tuple ,
2219 Type ,
2320 TypeVar ,
2926
3027from ..._internal ._asyncio import cancel_and_await
3128from .. import Sample , Sample3Phase
32- from .._quantities import QuantityT
29+ from .._quantities import Quantity , QuantityT
30+ from ._formula_evaluator import FormulaEvaluator
3331from ._formula_steps import (
3432 Adder ,
3533 Averager ,
5654}
5755
5856
59- class FormulaEvaluator (Generic [QuantityT ]):
60- """A post-fix formula evaluator that operates on `Sample` receivers."""
61-
62- def __init__ (
63- self ,
64- name : str ,
65- steps : List [FormulaStep ],
66- metric_fetchers : Dict [str , MetricFetcher [QuantityT ]],
67- create_method : Callable [[float ], QuantityT ],
68- ) -> None :
69- """Create a `FormulaEngine` instance.
70-
71- Args:
72- name: A name for the formula.
73- steps: Steps for the engine to execute, in post-fix order.
74- metric_fetchers: Fetchers for each metric stream the formula depends on.
75- create_method: A method to generate the output `Sample` value with. If the
76- formula is for generating power values, this would be
77- `Power.from_watts`, for example.
78- """
79- self ._name = name
80- self ._steps = steps
81- self ._metric_fetchers : Dict [str , MetricFetcher [QuantityT ]] = metric_fetchers
82- self ._first_run = True
83- self ._create_method : Callable [[float ], QuantityT ] = create_method
84-
85- async def _synchronize_metric_timestamps (
86- self , metrics : Set [asyncio .Task [Optional [Sample [QuantityT ]]]]
87- ) -> datetime :
88- """Synchronize the metric streams.
89-
90- For synchronised streams like data from the `ComponentMetricsResamplingActor`,
91- this a call to this function is required only once, before the first set of
92- inputs are fetched.
93-
94- Args:
95- metrics: The finished tasks from the first `fetch_next` calls to all the
96- `MetricFetcher`s.
97-
98- Returns:
99- The timestamp of the latest metric value.
100-
101- Raises:
102- RuntimeError: when some streams have no value, or when the synchronization
103- of timestamps fails.
104- """
105- metrics_by_ts : Dict [datetime , list [str ]] = {}
106- for metric in metrics :
107- result = metric .result ()
108- name = metric .get_name ()
109- if result is None :
110- raise RuntimeError (f"Stream closed for component: { name } " )
111- metrics_by_ts .setdefault (result .timestamp , []).append (name )
112- latest_ts = max (metrics_by_ts )
113-
114- # fetch the metrics with non-latest timestamps again until we have the values
115- # for the same ts for all metrics.
116- for metric_ts , names in metrics_by_ts .items ():
117- if metric_ts == latest_ts :
118- continue
119- while metric_ts < latest_ts :
120- for name in names :
121- fetcher = self ._metric_fetchers [name ]
122- next_val = await fetcher .fetch_next ()
123- assert next_val is not None
124- metric_ts = next_val .timestamp
125- if metric_ts > latest_ts :
126- raise RuntimeError (
127- "Unable to synchronize resampled metric timestamps, "
128- f"for formula: { self ._name } "
129- )
130- self ._first_run = False
131- return latest_ts
132-
133- async def apply (self ) -> Sample [QuantityT ]:
134- """Fetch the latest metrics, apply the formula once and return the result.
135-
136- Returns:
137- The result of the formula.
138-
139- Raises:
140- RuntimeError: if some samples didn't arrive, or if formula application
141- failed.
142- """
143- eval_stack : List [float ] = []
144- ready_metrics , pending = await asyncio .wait (
145- [
146- asyncio .create_task (fetcher .fetch_next (), name = name )
147- for name , fetcher in self ._metric_fetchers .items ()
148- ],
149- return_when = asyncio .ALL_COMPLETED ,
150- )
151-
152- if pending or any (res .result () is None for res in iter (ready_metrics )):
153- raise RuntimeError (
154- f"Some resampled metrics didn't arrive, for formula: { self ._name } "
155- )
156-
157- if self ._first_run :
158- metric_ts = await self ._synchronize_metric_timestamps (ready_metrics )
159- else :
160- sample = next (iter (ready_metrics )).result ()
161- assert sample is not None
162- metric_ts = sample .timestamp
163-
164- for step in self ._steps :
165- step .apply (eval_stack )
166-
167- # if all steps were applied and the formula was correct, there should only be a
168- # single value in the evaluation stack, and that would be the formula result.
169- if len (eval_stack ) != 1 :
170- raise RuntimeError (f"Formula application failed: { self ._name } " )
171-
172- res = eval_stack .pop ()
173- if isnan (res ) or isinf (res ):
174- return Sample (metric_ts , None )
175-
176- return Sample (metric_ts , self ._create_method (res ))
177-
178-
17957_CompositionType = Union [
18058 "FormulaEngine" ,
18159 "HigherOrderFormulaBuilder" ,
@@ -231,7 +109,7 @@ async def _stop(self) -> None:
231109
232110 def __add__ (
233111 self ,
234- other : _GenericEngine | _GenericHigherOrderBuilder ,
112+ other : _GenericEngine | _GenericHigherOrderBuilder | QuantityT ,
235113 ) -> _GenericHigherOrderBuilder :
236114 """Return a formula builder that adds (data in) `other` to `self`.
237115
@@ -246,7 +124,7 @@ def __add__(
246124 return self ._higher_order_builder (self , self ._create_method ) + other # type: ignore
247125
248126 def __sub__ (
249- self , other : _GenericEngine | _GenericHigherOrderBuilder
127+ self , other : _GenericEngine | _GenericHigherOrderBuilder | QuantityT
250128 ) -> _GenericHigherOrderBuilder :
251129 """Return a formula builder that subtracts (data in) `other` from `self`.
252130
@@ -261,7 +139,7 @@ def __sub__(
261139 return self ._higher_order_builder (self , self ._create_method ) - other # type: ignore
262140
263141 def __mul__ (
264- self , other : _GenericEngine | _GenericHigherOrderBuilder
142+ self , other : _GenericEngine | _GenericHigherOrderBuilder | float
265143 ) -> _GenericHigherOrderBuilder :
266144 """Return a formula builder that multiplies (data in) `self` with `other`.
267145
@@ -276,7 +154,7 @@ def __mul__(
276154 return self ._higher_order_builder (self , self ._create_method ) * other # type: ignore
277155
278156 def __truediv__ (
279- self , other : _GenericEngine | _GenericHigherOrderBuilder
157+ self , other : _GenericEngine | _GenericHigherOrderBuilder | float
280158 ) -> _GenericHigherOrderBuilder :
281159 """Return a formula builder that divides (data in) `self` by `other`.
282160
@@ -740,7 +618,11 @@ def __init__(
740618 self ._steps : deque [
741619 tuple [
742620 TokenType ,
743- FormulaEngine [QuantityT ] | FormulaEngine3Phase [QuantityT ] | str ,
621+ FormulaEngine [QuantityT ]
622+ | FormulaEngine3Phase [QuantityT ]
623+ | QuantityT
624+ | float
625+ | str ,
744626 ]
745627 ] = deque ()
746628 self ._steps .append ((TokenType .COMPONENT_METRIC , engine ))
@@ -754,12 +636,12 @@ def _push(
754636
755637 @overload
756638 def _push (
757- self , oper : str , other : _CompositionType3Phase
639+ self , oper : str , other : _CompositionType3Phase | QuantityT | float
758640 ) -> HigherOrderFormulaBuilder3Phase [QuantityT ]:
759641 ...
760642
761643 def _push (
762- self , oper : str , other : _CompositionType
644+ self , oper : str , other : _CompositionType | QuantityT | float
763645 ) -> (
764646 HigherOrderFormulaBuilder [QuantityT ]
765647 | HigherOrderFormulaBuilder3Phase [QuantityT ]
@@ -771,6 +653,19 @@ def _push(
771653 # pylint: disable=protected-access
772654 if isinstance (other , (FormulaEngine , FormulaEngine3Phase )):
773655 self ._steps .append ((TokenType .COMPONENT_METRIC , other ))
656+ elif isinstance (other , (Quantity , float )):
657+ match oper :
658+ case "+" | "-" :
659+ if not isinstance (other , Quantity ):
660+ raise RuntimeError (
661+ f"A Quantity must be provided for addition or subtraction to { other } "
662+ )
663+ case "*" | "/" :
664+ if not isinstance (other , (float , int )):
665+ raise RuntimeError (
666+ f"A float must be provided for scalar multiplication to { other } "
667+ )
668+ self ._steps .append ((TokenType .CONSTANT , other ))
774669 elif isinstance (other , _BaseHOFormulaBuilder ):
775670 self ._steps .append ((TokenType .OPER , "(" ))
776671 self ._steps .extend (other ._steps )
@@ -791,12 +686,12 @@ def __add__(
791686
792687 @overload
793688 def __add__ (
794- self , other : _CompositionType3Phase
689+ self , other : _CompositionType3Phase | QuantityT
795690 ) -> HigherOrderFormulaBuilder3Phase [QuantityT ]:
796691 ...
797692
798693 def __add__ (
799- self , other : _CompositionType
694+ self , other : _CompositionType | QuantityT
800695 ) -> (
801696 HigherOrderFormulaBuilder [QuantityT ]
802697 | HigherOrderFormulaBuilder3Phase [QuantityT ]
@@ -821,13 +716,13 @@ def __sub__(
821716
822717 @overload
823718 def __sub__ (
824- self , other : _CompositionType3Phase
719+ self , other : _CompositionType3Phase | QuantityT
825720 ) -> HigherOrderFormulaBuilder3Phase [QuantityT ]:
826721 ...
827722
828723 def __sub__ (
829724 self ,
830- other : _CompositionType ,
725+ other : _CompositionType | QuantityT ,
831726 ) -> (
832727 HigherOrderFormulaBuilder [QuantityT ]
833728 | HigherOrderFormulaBuilder3Phase [QuantityT ]
@@ -852,13 +747,13 @@ def __mul__(
852747
853748 @overload
854749 def __mul__ (
855- self , other : _CompositionType3Phase
750+ self , other : _CompositionType3Phase | float
856751 ) -> HigherOrderFormulaBuilder3Phase [QuantityT ]:
857752 ...
858753
859754 def __mul__ (
860755 self ,
861- other : _CompositionType ,
756+ other : _CompositionType | float ,
862757 ) -> (
863758 HigherOrderFormulaBuilder [QuantityT ]
864759 | HigherOrderFormulaBuilder3Phase [QuantityT ]
@@ -883,13 +778,13 @@ def __truediv__(
883778
884779 @overload
885780 def __truediv__ (
886- self , other : _CompositionType3Phase
781+ self , other : _CompositionType3Phase | float
887782 ) -> HigherOrderFormulaBuilder3Phase [QuantityT ]:
888783 ...
889784
890785 def __truediv__ (
891786 self ,
892- other : _CompositionType ,
787+ other : _CompositionType | float ,
893788 ) -> (
894789 HigherOrderFormulaBuilder [QuantityT ]
895790 | HigherOrderFormulaBuilder3Phase [QuantityT ]
@@ -935,6 +830,11 @@ def build(
935830 elif typ == TokenType .OPER :
936831 assert isinstance (value , str )
937832 builder .push_oper (value )
833+ elif typ == TokenType .CONSTANT :
834+ assert isinstance (value , (Quantity , float ))
835+ builder .push_constant (
836+ value .base_value if isinstance (value , Quantity ) else value
837+ )
938838 return builder .build ()
939839
940840
0 commit comments