88import asyncio
99import logging
1010import weakref
11+ from abc import ABC
1112from collections import deque
1213from datetime import datetime
1314from math import isinf , isnan
14- from typing import Dict , List , Optional , Set , Tuple
15+ from typing import Dict , Generic , List , Optional , Set , Tuple , Type , TypeVar
1516from uuid import UUID , uuid4
1617
1718from frequenz .channels import Broadcast , Receiver
@@ -310,11 +311,24 @@ def build(self) -> FormulaEngine:
310311 return FormulaEngine (self ._name , self ._steps , self ._metric_fetchers )
311312
312313
313- class FormulaChannel (Broadcast [Sample ]):
314+ _GenericSample = TypeVar ("_GenericSample" )
315+ _GenericEngine = TypeVar ("_GenericEngine" )
316+ _GenericFormulaChannel = TypeVar ("_GenericFormulaChannel" )
317+ _GenericFormulaReceiver = TypeVar ("_GenericFormulaReceiver" )
318+ _GenericHOFormulaBuilder = TypeVar ("_GenericHOFormulaBuilder" )
319+
320+
321+ class _BaseFormulaChannel (
322+ Generic [_GenericSample , _GenericEngine ],
323+ Broadcast [_GenericSample ],
324+ ABC ,
325+ ):
314326 """A broadcast channel implementation for use with formulas."""
315327
328+ ReceiverType : Type [FormulaReceiver ]
329+
316330 def __init__ (
317- self , name : str , engine : FormulaEngine , resend_latest : bool = False
331+ self , name : str , engine : _GenericEngine , resend_latest : bool = False
318332 ) -> None :
319333 """Create a `FormulaChannel` instance.
320334
@@ -328,7 +342,7 @@ def __init__(
328342 super ().__init__ (name , resend_latest )
329343
330344 @property
331- def engine (self ) -> FormulaEngine :
345+ def engine (self ) -> _GenericEngine :
332346 """Return the formula engine attached to the channel.
333347
334348 Returns:
@@ -338,7 +352,7 @@ def engine(self) -> FormulaEngine:
338352
339353 def new_receiver (
340354 self , name : Optional [str ] = None , maxsize : int = 50
341- ) -> FormulaReceiver :
355+ ) -> _GenericFormulaReceiver :
342356 """Create a new FormulaReceiver for the channel.
343357
344358 This implementation is similar to `Broadcast.new_receiver()`, except that it
@@ -355,26 +369,32 @@ def new_receiver(
355369 uuid = uuid4 ()
356370 if name is None :
357371 name = self .name
358- recv = FormulaReceiver (uuid , name , maxsize , self )
372+ recv = self . ReceiverType (uuid , name , maxsize , self )
359373 self .receivers [uuid ] = weakref .ReferenceType (recv )
360374 if self ._resend_latest and self ._latest is not None :
361375 recv .enqueue (self ._latest )
362376 return recv
363377
364378
365- class FormulaReceiver (BroadcastReceiver [Sample ]):
379+ class _BaseFormulaReceiver (
380+ Generic [_GenericSample , _GenericEngine ],
381+ BroadcastReceiver [_GenericSample ],
382+ ABC ,
383+ ):
366384 """A receiver to receive calculated `Sample`s from a Formula channel.
367385
368386 They function as regular channel receivers, but can be composed to form higher order
369387 formulas.
370388 """
371389
390+ BuilderType : Type [HigherOrderFormulaBuilder ]
391+
372392 def __init__ (
373393 self ,
374394 uuid : UUID ,
375395 name : str ,
376396 maxsize : int ,
377- chan : FormulaChannel ,
397+ chan : _GenericFormulaChannel ,
378398 ) -> None :
379399 """Create a `FormulaReceiver` instance.
380400
@@ -398,15 +418,22 @@ def name(self) -> str:
398418 return self ._name
399419
400420 @property
401- def engine (self ) -> FormulaEngine :
421+ def engine (self ) -> _GenericEngine :
402422 """Return the formula engine attached to the receiver.
403423
404424 Returns:
405425 Formula Engine attached to the receiver.
406426 """
407427 return self ._engine
408428
409- def clone (self ) -> FormulaReceiver :
429+ # The use of `Self` is necessary for mypy to deduce the type of `clone` method in
430+ # the derived classes. With `from __future__ import annotations`, both CPython and
431+ # mypy accept this in python <= 3.10.
432+ #
433+ # Unfortunately pylint doesn't accept `Self` before python 3.11, even with `from
434+ # __future__ import annotations`. So the pylint `undefined-variable` check is
435+ # disabled to get `Self` to pass the checks.
436+ def clone (self ) -> Self : # pylint: disable=undefined-variable
410437 """Create a new receiver from the formula engine.
411438
412439 Returns:
@@ -415,8 +442,9 @@ def clone(self) -> FormulaReceiver:
415442 return self ._engine .new_receiver ()
416443
417444 def __add__ (
418- self , other : FormulaReceiver | HigherOrderFormulaBuilder
419- ) -> HigherOrderFormulaBuilder :
445+ self ,
446+ other : _BaseFormulaReceiver | _GenericHOFormulaBuilder ,
447+ ) -> _GenericHOFormulaBuilder :
420448 """Return a formula builder that adds (data in) `other` to `self`.
421449
422450 Args:
@@ -427,11 +455,12 @@ def __add__(
427455 A formula builder that can take further expressions, or can be built
428456 into a formula engine.
429457 """
430- return HigherOrderFormulaBuilder (self ) + other
458+ return self . BuilderType (self ) + other
431459
432460 def __sub__ (
433- self , other : FormulaReceiver | HigherOrderFormulaBuilder
434- ) -> HigherOrderFormulaBuilder :
461+ self ,
462+ other : _BaseFormulaReceiver | _GenericHOFormulaBuilder ,
463+ ) -> _GenericHOFormulaBuilder :
435464 """Return a formula builder that subtracts (data in) `other` from `self`.
436465
437466 Args:
@@ -442,11 +471,11 @@ def __sub__(
442471 A formula builder that can take further expressions, or can be built
443472 into a formula engine.
444473 """
445- return HigherOrderFormulaBuilder (self ) - other
474+ return self . BuilderType (self ) - other
446475
447476 def __mul__ (
448- self , other : FormulaReceiver | HigherOrderFormulaBuilder
449- ) -> HigherOrderFormulaBuilder :
477+ self , other : _BaseFormulaReceiver | _GenericHOFormulaBuilder
478+ ) -> _GenericHOFormulaBuilder :
450479 """Return a formula builder that multiplies (data in) `self` with `other`.
451480
452481 Args:
@@ -457,11 +486,11 @@ def __mul__(
457486 A formula builder that can take further expressions, or can be built
458487 into a formula engine.
459488 """
460- return HigherOrderFormulaBuilder (self ) * other
489+ return self . BuilderType (self ) * other
461490
462491 def __truediv__ (
463- self , other : FormulaReceiver | HigherOrderFormulaBuilder
464- ) -> HigherOrderFormulaBuilder :
492+ self , other : _BaseFormulaReceiver | _GenericHOFormulaBuilder
493+ ) -> _GenericHOFormulaBuilder :
465494 """Return a formula builder that divides (data in) `self` by `other`.
466495
467496 Args:
@@ -472,36 +501,40 @@ def __truediv__(
472501 A formula builder that can take further expressions, or can be built
473502 into a formula engine.
474503 """
475- return HigherOrderFormulaBuilder (self ) / other
504+ return self . BuilderType (self ) / other
476505
477506
478- class HigherOrderFormulaBuilder :
507+ class _BaseHOFormulaBuilder (
508+ ABC , Generic [_GenericFormulaReceiver , _GenericSample , _GenericEngine ]
509+ ):
479510 """Provides a way to build formulas from the outputs of other formulas."""
480511
481- def __init__ (self , recv : FormulaReceiver ) -> None :
482- """Create a `HigherOrderFormulaBuilder ` instance.
512+ def __init__ (self , recv : _GenericFormulaReceiver ) -> None :
513+ """Create a `GenericHigherOrderFormulaBuilder ` instance.
483514
484515 Args:
485516 recv: A first input stream to create a builder with, so that python
486517 operators `+, -, *, /` can be used directly on newly created instances.
487518 """
488- self ._steps : deque [tuple [TokenType , FormulaReceiver | str ]] = deque ()
519+ self ._steps : deque [tuple [TokenType , _GenericFormulaReceiver | str ]] = deque ()
489520 self ._steps .append ((TokenType .COMPONENT_METRIC , recv .clone ()))
490521 recv ._deactivate () # pylint: disable=protected-access
491522 self ._engine = None
492523
493524 def _push (
494- self , oper : str , other : FormulaReceiver | HigherOrderFormulaBuilder
495- ) -> HigherOrderFormulaBuilder :
525+ self ,
526+ oper : str ,
527+ other : _GenericFormulaReceiver | _GenericHOFormulaBuilder ,
528+ ) -> _GenericHOFormulaBuilder :
496529 self ._steps .appendleft ((TokenType .OPER , "(" ))
497530 self ._steps .append ((TokenType .OPER , ")" ))
498531 self ._steps .append ((TokenType .OPER , oper ))
499532
500533 # pylint: disable=protected-access
501- if isinstance (other , FormulaReceiver ):
534+ if isinstance (other , _BaseFormulaReceiver ):
502535 self ._steps .append ((TokenType .COMPONENT_METRIC , other .clone ()))
503536 other ._deactivate ()
504- elif isinstance (other , HigherOrderFormulaBuilder ):
537+ elif isinstance (other , _BaseHOFormulaBuilder ):
505538 self ._steps .append ((TokenType .OPER , "(" ))
506539 self ._steps .extend (other ._steps )
507540 self ._steps .append ((TokenType .OPER , ")" ))
@@ -512,8 +545,8 @@ def _push(
512545 return self
513546
514547 def __add__ (
515- self , other : FormulaReceiver | HigherOrderFormulaBuilder
516- ) -> HigherOrderFormulaBuilder :
548+ self , other : _GenericFormulaReceiver | _GenericHOFormulaBuilder
549+ ) -> _GenericHOFormulaBuilder :
517550 """Return a formula builder that adds (data in) `other` to `self`.
518551
519552 Args:
@@ -527,8 +560,8 @@ def __add__(
527560 return self ._push ("+" , other )
528561
529562 def __sub__ (
530- self , other : FormulaReceiver | HigherOrderFormulaBuilder
531- ) -> HigherOrderFormulaBuilder :
563+ self , other : _GenericFormulaReceiver | _GenericHOFormulaBuilder
564+ ) -> _GenericHOFormulaBuilder :
532565 """Return a formula builder that subtracts (data in) `other` from `self`.
533566
534567 Args:
@@ -542,8 +575,8 @@ def __sub__(
542575 return self ._push ("-" , other )
543576
544577 def __mul__ (
545- self , other : FormulaReceiver | HigherOrderFormulaBuilder
546- ) -> HigherOrderFormulaBuilder :
578+ self , other : _GenericFormulaReceiver | _GenericHOFormulaBuilder
579+ ) -> _GenericHOFormulaBuilder :
547580 """Return a formula builder that multiplies (data in) `self` with `other`.
548581
549582 Args:
@@ -557,8 +590,8 @@ def __mul__(
557590 return self ._push ("*" , other )
558591
559592 def __truediv__ (
560- self , other : FormulaReceiver | HigherOrderFormulaBuilder
561- ) -> HigherOrderFormulaBuilder :
593+ self , other : _GenericFormulaReceiver | _GenericHOFormulaBuilder
594+ ) -> _GenericHOFormulaBuilder :
562595 """Return a formula builder that divides (data in) `self` by `other`.
563596
564597 Args:
@@ -571,20 +604,44 @@ def __truediv__(
571604 """
572605 return self ._push ("/" , other )
573606
607+ def new_receiver (
608+ self , name : Optional [str ] = None , max_size : int = 50
609+ ) -> _GenericFormulaReceiver :
610+ """Get a new receiver from the corresponding engine.
611+
612+ Args:
613+ name: optional name for the receiver.
614+ max_size: size of the receiver's buffer.
615+
616+ Returns:
617+ A FormulaReceiver that streams formula output `Sample`s.
618+
619+ Raises:
620+ RuntimeError: If `build` hasn't been called yet.
621+ """
622+ if self ._engine is None :
623+ raise RuntimeError (
624+ "Please call `build()` first, before calls to `new_receiver()`"
625+ )
626+ return self ._engine .new_receiver (name , max_size )
627+
628+
629+ class HigherOrderFormulaBuilder (
630+ _BaseHOFormulaBuilder ["FormulaReceiver" , Sample , FormulaEngine ]
631+ ):
632+ """A specialization of the _BaseHOFormulaBuilder for `FormulaReceiver`."""
633+
574634 def build (self , name : str , nones_are_zeros : bool = False ) -> FormulaEngine :
575- """Create a formula engine from the builder.
635+ """Build a `FormulaEngine` instance from the builder.
576636
577637 Args:
578- name: A name for the formula being built .
579- nones_are_zeros: Whether `None`s in the input streams should be treated as
580- zeros.
638+ name: A name for the newly generated formula .
639+ nones_are_zeros: whether `None` values in any of the input streams should be
640+ treated as zeros.
581641
582642 Returns:
583643 A `FormulaEngine` instance.
584644 """
585- if self ._engine is not None :
586- return self ._engine
587-
588645 builder = FormulaBuilder (name )
589646 for step in self ._steps :
590647 if step [0 ] == TokenType .COMPONENT_METRIC :
@@ -597,23 +654,14 @@ def build(self, name: str, nones_are_zeros: bool = False) -> FormulaEngine:
597654
598655 return self ._engine
599656
600- def new_receiver (
601- self , name : Optional [str ] = None , max_size : int = 50
602- ) -> FormulaReceiver :
603- """Get a new receiver from the corresponding engine.
604657
605- Args:
606- name: optional name for the receiver.
607- max_size: size of the receiver's buffer.
658+ class FormulaReceiver (_BaseFormulaReceiver [Sample , FormulaEngine ]):
659+ """A specialization of the _BaseFormulaChannel for `Sample` objects."""
608660
609- Returns:
610- A FormulaReceiver that streams formula output `Sample`s.
661+ BuilderType = HigherOrderFormulaBuilder
611662
612- Raises:
613- RuntimeError: If `build` hasn't been called yet.
614- """
615- if self ._engine is None :
616- raise RuntimeError (
617- "Please call `build()` first, before calls to `new_receiver()`"
618- )
619- return self ._engine .new_receiver (name , max_size )
663+
664+ class FormulaChannel (_BaseFormulaChannel [Sample , FormulaEngine ]):
665+ """A specialization of the _BaseFormulaChannel for `Sample` objects."""
666+
667+ ReceiverType = FormulaReceiver
0 commit comments