Skip to content

Commit 9c607fc

Browse files
committed
Add FormulaChannel that allows composition of formulas
... into other formulas. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent 8d0ed27 commit 9c607fc

File tree

1 file changed

+296
-0
lines changed

1 file changed

+296
-0
lines changed

src/frequenz/sdk/timeseries/logical_meter/_formula_engine.py

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@
77

88
import asyncio
99
import logging
10+
import weakref
11+
from collections import deque
1012
from datetime import datetime
1113
from typing import Dict, List, Optional, Set, Tuple
14+
from uuid import UUID, uuid4
1215

1316
from frequenz.channels import Broadcast, Receiver
17+
from frequenz.channels._broadcast import Receiver as BroadcastReceiver
1418

1519
from .. import Sample
1620
from ._formula_steps import (
@@ -23,6 +27,7 @@
2327
OpenParen,
2428
Subtractor,
2529
)
30+
from ._tokenizer import TokenType
2631

2732
logger = logging.Logger(__name__)
2833

@@ -289,3 +294,294 @@ def build(self) -> FormulaEngine:
289294
self._steps.append(self._build_stack.pop())
290295

291296
return FormulaEngine(self._name, self._steps, self._metric_fetchers)
297+
298+
299+
class FormulaChannel(Broadcast[Sample]):
300+
"""A broadcast channel implementation for use with formulas."""
301+
302+
def __init__(
303+
self, name: str, engine: FormulaEngine, resend_latest: bool = False
304+
) -> None:
305+
"""Create a `FormulaChannel` instance.
306+
307+
Args:
308+
name: A name for the channel.
309+
engine: A FormulaEngine instance that produces values for this channel.
310+
resend_latest: Whether to resend latest channel values to newly created
311+
receivers, like in `Broadcast` channels.
312+
"""
313+
self._engine = engine
314+
super().__init__(name, resend_latest)
315+
316+
@property
317+
def engine(self) -> FormulaEngine:
318+
"""Return the formula engine attached to the channel.
319+
320+
Returns:
321+
A FormulaEngine instance.
322+
"""
323+
return self._engine
324+
325+
def new_receiver(
326+
self, name: Optional[str] = None, maxsize: int = 50
327+
) -> FormulaReceiver:
328+
"""Create a new FormulaReceiver for the channel.
329+
330+
This implementation is similar to `Broadcast.new_receiver()`, except that it
331+
creates and returns a `FormulaReceiver`. The way the default name for the
332+
receiver is constructed, is also slightly tweaked.
333+
334+
Args:
335+
name: An optional name for the receiver.
336+
maxsize: size of the receiver's buffer.
337+
338+
Returns:
339+
A `FormulaReceiver` instance attached to the `FormulaChannel`.
340+
"""
341+
uuid = uuid4()
342+
if name is None:
343+
name = self.name
344+
recv = FormulaReceiver(uuid, name, maxsize, self)
345+
self.receivers[uuid] = weakref.ReferenceType(recv)
346+
if self._resend_latest and self._latest is not None:
347+
recv.enqueue(self._latest)
348+
return recv
349+
350+
351+
class FormulaReceiver(BroadcastReceiver[Sample]):
352+
"""A receiver to receive calculated `Sample`s from a Formula channel.
353+
354+
They function as regular channel receivers, but can be composed to form higher order
355+
formulas.
356+
"""
357+
358+
def __init__(
359+
self,
360+
uuid: UUID,
361+
name: str,
362+
maxsize: int,
363+
chan: FormulaChannel,
364+
) -> None:
365+
"""Create a `FormulaReceiver` instance.
366+
367+
Args:
368+
uuid: uuid to uniquely identify the receiver. Forwarded to
369+
BroadcastReceiver's `__init__` function.
370+
name: Name for the receiver.
371+
maxsize: Buffer size for the receiver.
372+
chan: The `FormulaChannel` instance that this receiver is attached to.
373+
"""
374+
self._engine = chan.engine
375+
super().__init__(uuid, name, maxsize, chan)
376+
377+
@property
378+
def name(self) -> str:
379+
"""Name of the receiver.
380+
381+
Returns:
382+
Name of the receiver.
383+
"""
384+
return self._name
385+
386+
@property
387+
def engine(self) -> FormulaEngine:
388+
"""Return the formula engine attached to the receiver.
389+
390+
Returns:
391+
Formula Engine attached to the receiver.
392+
"""
393+
return self._engine
394+
395+
def _deactivate(self) -> None:
396+
self._active = False
397+
398+
def clone(self) -> FormulaReceiver:
399+
"""Create a new receiver from the formula engine.
400+
401+
Returns:
402+
New `FormulaReceiver` streaming a copy of the formula engine output.
403+
"""
404+
return self._engine.new_receiver()
405+
406+
def __add__(
407+
self, other: FormulaReceiver | HigherOrderFormulaBuilder
408+
) -> HigherOrderFormulaBuilder:
409+
"""Return a formula builder that adds (data in) `other` to `self`.
410+
411+
Args:
412+
other: A formula receiver, or a formula builder instance corresponding to a
413+
sub-expression.
414+
415+
Returns:
416+
A formula builder that can take further expressions, or can be built
417+
into a formula engine.
418+
"""
419+
return HigherOrderFormulaBuilder(self) + other
420+
421+
def __sub__(
422+
self, other: FormulaReceiver | HigherOrderFormulaBuilder
423+
) -> HigherOrderFormulaBuilder:
424+
"""Return a formula builder that subtracts (data in) `other` from `self`.
425+
426+
Args:
427+
other: A formula receiver, or a formula builder instance corresponding to a
428+
sub-expression.
429+
430+
Returns:
431+
A formula builder that can take further expressions, or can be built
432+
into a formula engine.
433+
"""
434+
return HigherOrderFormulaBuilder(self) - other
435+
436+
def __mul__(
437+
self, other: FormulaReceiver | HigherOrderFormulaBuilder
438+
) -> HigherOrderFormulaBuilder:
439+
"""Return a formula builder that multiplies (data in) `self` with `other`.
440+
441+
Args:
442+
other: A formula receiver, or a formula builder instance corresponding to a
443+
sub-expression.
444+
445+
Returns:
446+
A formula builder that can take further expressions, or can be built
447+
into a formula engine.
448+
"""
449+
return HigherOrderFormulaBuilder(self) * other
450+
451+
def __truediv__(
452+
self, other: FormulaReceiver | HigherOrderFormulaBuilder
453+
) -> HigherOrderFormulaBuilder:
454+
"""Return a formula builder that divides (data in) `self` by `other`.
455+
456+
Args:
457+
other: A formula receiver, or a formula builder instance corresponding to a
458+
sub-expression.
459+
460+
Returns:
461+
A formula builder that can take further expressions, or can be built
462+
into a formula engine.
463+
"""
464+
return HigherOrderFormulaBuilder(self) / other
465+
466+
467+
class HigherOrderFormulaBuilder:
468+
"""Provides a way to build formulas from the outputs of other formulas."""
469+
470+
def __init__(self, recv: FormulaReceiver) -> None:
471+
"""Create a `HigherOrderFormulaBuilder` instance.
472+
473+
Args:
474+
recv: A first input stream to create a builder with, so that python
475+
operators `+, -, *, /` can be used directly on newly created instances.
476+
"""
477+
self._steps: deque[tuple[TokenType, FormulaReceiver | str]] = deque()
478+
self._steps.append((TokenType.COMPONENT_METRIC, recv.clone()))
479+
recv._deactivate() # pylint: disable=protected-access
480+
self._engine = None
481+
482+
def _push(
483+
self, oper: str, other: FormulaReceiver | HigherOrderFormulaBuilder
484+
) -> HigherOrderFormulaBuilder:
485+
self._steps.appendleft((TokenType.OPER, "("))
486+
self._steps.append((TokenType.OPER, ")"))
487+
self._steps.append((TokenType.OPER, oper))
488+
489+
# pylint: disable=protected-access
490+
if isinstance(other, FormulaReceiver):
491+
self._steps.append((TokenType.COMPONENT_METRIC, other.clone()))
492+
other._deactivate()
493+
elif isinstance(other, HigherOrderFormulaBuilder):
494+
self._steps.append((TokenType.OPER, "("))
495+
self._steps.extend(other._steps)
496+
self._steps.append((TokenType.OPER, ")"))
497+
# pylint: enable=protected-access
498+
else:
499+
raise RuntimeError(f"Can't build a formula from: {other}")
500+
501+
return self
502+
503+
def __add__(
504+
self, other: FormulaReceiver | HigherOrderFormulaBuilder
505+
) -> HigherOrderFormulaBuilder:
506+
"""Return a formula builder that adds (data in) `other` to `self`.
507+
508+
Args:
509+
other: A formula receiver, or a formula builder instance corresponding to a
510+
sub-expression.
511+
512+
Returns:
513+
A formula builder that can take further expressions, or can be built
514+
into a formula engine.
515+
"""
516+
return self._push("+", other)
517+
518+
def __sub__(
519+
self, other: FormulaReceiver | HigherOrderFormulaBuilder
520+
) -> HigherOrderFormulaBuilder:
521+
"""Return a formula builder that subtracts (data in) `other` from `self`.
522+
523+
Args:
524+
other: A formula receiver, or a formula builder instance corresponding to a
525+
sub-expression.
526+
527+
Returns:
528+
A formula builder that can take further expressions, or can be built
529+
into a formula engine.
530+
"""
531+
return self._push("-", other)
532+
533+
def __mul__(
534+
self, other: FormulaReceiver | HigherOrderFormulaBuilder
535+
) -> HigherOrderFormulaBuilder:
536+
"""Return a formula builder that multiplies (data in) `self` with `other`.
537+
538+
Args:
539+
other: A formula receiver, or a formula builder instance corresponding to a
540+
sub-expression.
541+
542+
Returns:
543+
A formula builder that can take further expressions, or can be built
544+
into a formula engine.
545+
"""
546+
return self._push("*", other)
547+
548+
def __truediv__(
549+
self, other: FormulaReceiver | HigherOrderFormulaBuilder
550+
) -> HigherOrderFormulaBuilder:
551+
"""Return a formula builder that divides (data in) `self` by `other`.
552+
553+
Args:
554+
other: A formula receiver, or a formula builder instance corresponding to a
555+
sub-expression.
556+
557+
Returns:
558+
A formula builder that can take further expressions, or can be built
559+
into a formula engine.
560+
"""
561+
return self._push("/", other)
562+
563+
def build(self, name: str, nones_are_zeros: bool = False) -> FormulaEngine:
564+
"""Create a formula engine from the builder.
565+
566+
Args:
567+
name: A name for the formula being built.
568+
nones_are_zeros: Whether `None`s in the input streams should be treated as
569+
zeros.
570+
571+
Returns:
572+
A `FormulaEngine` instance.
573+
"""
574+
if self._engine is not None:
575+
return self._engine
576+
577+
builder = FormulaBuilder(name)
578+
for step in self._steps:
579+
if step[0] == TokenType.COMPONENT_METRIC:
580+
assert isinstance(step[1], FormulaReceiver)
581+
builder.push_metric(step[1].name, step[1], nones_are_zeros)
582+
elif step[0] == TokenType.OPER:
583+
assert isinstance(step[1], str)
584+
builder.push_oper(step[1])
585+
self._engine = builder.build()
586+
587+
return self._engine

0 commit comments

Comments
 (0)