|
24 | 24 | logger = logging.getLogger(__name__) |
25 | 25 |
|
26 | 26 |
|
| 27 | +class EVChargerPoolError(Exception): |
| 28 | + """An error that occurred in any of the EVChargerPool methods.""" |
| 29 | + |
| 30 | + |
27 | 31 | class EVChargerPool: |
28 | 32 | """Interactions with EV Chargers.""" |
29 | 33 |
|
@@ -111,3 +115,50 @@ async def total_power(self) -> FormulaReceiver: |
111 | 115 | EVChargerPowerFormula, |
112 | 116 | FormulaGeneratorConfig(component_ids=self._component_ids), |
113 | 117 | ) |
| 118 | + |
| 119 | + async def current(self, component_id: int) -> FormulaReceiver3Phase: |
| 120 | + """Fetch the 3-phase current for the given ev_charger id. |
| 121 | +
|
| 122 | + Args: |
| 123 | + component_id: id of the ev charger to stream current values for. |
| 124 | +
|
| 125 | + Returns: |
| 126 | + A *new* receiver that will stream 3-phase current values for the given |
| 127 | + ev charger. |
| 128 | +
|
| 129 | + Raises: |
| 130 | + EVChargerPoolError: if the given component_id is not part of the pool. |
| 131 | + """ |
| 132 | + if component_id not in self._component_ids: |
| 133 | + raise EVChargerPoolError( |
| 134 | + f"{component_id=} is not part of the EVChargerPool" |
| 135 | + f" (with ids={self._component_ids})" |
| 136 | + ) |
| 137 | + return await self._formula_pool.from_generator( |
| 138 | + f"ev_charger_current_{component_id}", |
| 139 | + EVChargerCurrentFormula, |
| 140 | + FormulaGeneratorConfig(component_ids={component_id}), |
| 141 | + ) |
| 142 | + |
| 143 | + async def power(self, component_id: int) -> FormulaReceiver: |
| 144 | + """Fetch the power for the given ev_charger id. |
| 145 | +
|
| 146 | + Args: |
| 147 | + component_id: id of the ev charger to stream power values for. |
| 148 | +
|
| 149 | + Returns: |
| 150 | + A *new* receiver that will stream power values for the given ev charger. |
| 151 | +
|
| 152 | + Raises: |
| 153 | + EVChargerPoolError: if the given component_id is not part of the pool. |
| 154 | + """ |
| 155 | + if component_id not in self._component_ids: |
| 156 | + raise EVChargerPoolError( |
| 157 | + f"{component_id=} is not part of the EVChargerPool" |
| 158 | + f" (with ids={self._component_ids})" |
| 159 | + ) |
| 160 | + return await self._formula_pool.from_generator( |
| 161 | + f"ev_charger_current_{component_id}", |
| 162 | + EVChargerPowerFormula, |
| 163 | + FormulaGeneratorConfig(component_ids={component_id}), |
| 164 | + ) |
0 commit comments