Skip to content

Commit 1bee61a

Browse files
Change battery status when component stopped sending data (#207)
Set timer for battery and inverter messages. If no message was received for some time, check timestamp and update battery status. Previously the timestamp was checked after received request result. Because of that BatteryStatus could not work alone (without instance that sends request result).
2 parents 82acdb9 + d7c3753 commit 1bee61a

File tree

4 files changed

+215
-75
lines changed

4 files changed

+215
-75
lines changed

examples/battery_status.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Example how to run BatteryPoolStatus as separate instance.
5+
6+
This is not needed for user but simplifies testing and debugging and understanding
7+
this feature.
8+
"""
9+
10+
import asyncio
11+
import logging
12+
13+
from frequenz.sdk import microgrid
14+
from frequenz.sdk.actor.power_distributing._battery_pool_status import BatteryPoolStatus
15+
from frequenz.sdk.microgrid.component import ComponentCategory
16+
17+
_logger = logging.getLogger(__name__)
18+
HOST = "microgrid.sandbox.api.frequenz.io" # it should be the host name.
19+
PORT = 61060
20+
21+
22+
async def main() -> None:
23+
"""Start BatteryPoolStatus to see how it works."""
24+
logging.basicConfig(
25+
level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)s:%(message)s"
26+
)
27+
await microgrid.initialize(HOST, PORT)
28+
batteries = {
29+
bat.component_id
30+
for bat in microgrid.get().component_graph.components(
31+
component_category={ComponentCategory.BATTERY}
32+
)
33+
}
34+
35+
batteries_status = BatteryPoolStatus(
36+
battery_ids=batteries,
37+
max_data_age_sec=5,
38+
max_blocking_duration_sec=30,
39+
)
40+
41+
await batteries_status.join()
42+
43+
44+
asyncio.run(main())

src/frequenz/sdk/actor/power_distributing/_battery_pool_status.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ def __init__(
9494
self._current_status = BatteryStatus(working=set(), uncertain=set())
9595

9696
# Channel for sending results of requests to the batteries
97-
request_result_channel = Broadcast[SetPowerResult]("battery_request_status")
98-
self._request_result_sender = request_result_channel.new_sender()
97+
set_power_result_channel = Broadcast[SetPowerResult]("battery_request_status")
98+
self._set_power_result_sender = set_power_result_channel.new_sender()
9999

100100
self._batteries: Dict[str, BatteryStatusTracker] = {}
101101

@@ -112,7 +112,7 @@ def __init__(
112112
max_data_age_sec=max_data_age_sec,
113113
max_blocking_duration_sec=max_blocking_duration_sec,
114114
status_sender=channel.sender,
115-
request_result_receiver=request_result_channel.new_receiver(
115+
set_power_result_receiver=set_power_result_channel.new_receiver(
116116
f"battery_{battery_id}_request_status"
117117
),
118118
)
@@ -123,6 +123,17 @@ def __init__(
123123

124124
self._task = asyncio.create_task(self._run())
125125

126+
async def join(self) -> None:
127+
"""Await for the battery pool, and return when the task completes.
128+
129+
It will not terminate the program while BatteryPool is working.
130+
BatteryPool can be stopped with the `stop` method.
131+
This method is not needed in source code, because BatteryPool is owned
132+
by the internal code.
133+
It is needed only when user needs to run his own instance of the BatteryPool.
134+
"""
135+
await self._task
136+
126137
async def stop(self) -> None:
127138
"""Stop tracking batteries status."""
128139
await cancel_and_await(self._task)
@@ -178,7 +189,7 @@ async def update_status(
178189
succeed_batteries: Batteries that succeed request
179190
failed_batteries: Batteries that failed request
180191
"""
181-
await self._request_result_sender.send(
192+
await self._set_power_result_sender.send(
182193
SetPowerResult(succeed_batteries, failed_batteries)
183194
)
184195

src/frequenz/sdk/actor/power_distributing/_battery_status.py

Lines changed: 55 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from frequenz.api.microgrid.common_pb2 import ErrorLevel
1717
from frequenz.api.microgrid.inverter_pb2 import ComponentState as InverterComponentState
1818
from frequenz.channels import Receiver, Sender
19-
from frequenz.channels.util import Select
19+
from frequenz.channels.util import Select, Timer
2020

2121
from frequenz.sdk._internal.asyncio import cancel_and_await
2222
from frequenz.sdk.microgrid import get as get_microgrid
@@ -62,8 +62,16 @@ class SetPowerResult:
6262
@dataclass
6363
class _ComponentStreamStatus:
6464
component_id: int
65+
"""Component id."""
66+
67+
data_recv_timer: Timer
68+
"""Timer that is set when no component data has been received for some time."""
69+
6570
last_msg_timestamp: datetime = datetime.now(tz=timezone.utc)
71+
"""Timestamp of the last message from the component."""
72+
6673
last_msg_correct: bool = False
74+
"""Flag whether last message was correct or not."""
6775

6876

6977
@dataclass
@@ -156,7 +164,7 @@ def __init__( # pylint: disable=too-many-arguments
156164
max_data_age_sec: float,
157165
max_blocking_duration_sec: float,
158166
status_sender: Sender[Status],
159-
request_result_receiver: Receiver[SetPowerResult],
167+
set_power_result_receiver: Receiver[SetPowerResult],
160168
) -> None:
161169
"""Create class instance.
162170
@@ -169,7 +177,7 @@ def __init__( # pylint: disable=too-many-arguments
169177
max_blocking_duration_sec: This value tell what should be the maximum
170178
timeout used for blocking failing component.
171179
status_sender: Channel to send status updates.
172-
request_result_receiver: Channel to receive results of the requests to the
180+
set_power_result_receiver: Channel to receive results of the requests to the
173181
components.
174182
175183
Raises:
@@ -185,14 +193,18 @@ def __init__( # pylint: disable=too-many-arguments
185193
if inverter_id is None:
186194
raise RuntimeError(f"Can't find inverter adjacent to battery: {battery_id}")
187195

188-
self._battery = _ComponentStreamStatus(battery_id)
189-
self._inverter = _ComponentStreamStatus(inverter_id)
196+
self._battery = _ComponentStreamStatus(
197+
battery_id, data_recv_timer=Timer(max_data_age_sec)
198+
)
199+
self._inverter = _ComponentStreamStatus(
200+
inverter_id, data_recv_timer=Timer(max_data_age_sec)
201+
)
190202

191203
# Select needs receivers that can be get in async way only.
192204
self._select = None
193205

194206
self._task = asyncio.create_task(
195-
self._run(status_sender, request_result_receiver)
207+
self._run(status_sender, set_power_result_receiver)
196208
)
197209

198210
@property
@@ -214,15 +226,15 @@ async def stop(self) -> None:
214226
async def _run(
215227
self,
216228
status_sender: Sender[Status],
217-
request_result_receiver: Receiver[SetPowerResult],
229+
set_power_result_receiver: Receiver[SetPowerResult],
218230
) -> None:
219-
"""Process data from the components and request_result_receiver.
231+
"""Process data from the components and set_power_result_receiver.
220232
221233
New status is send only when it change.
222234
223235
Args:
224236
status_sender: Channel to send status updates.
225-
request_result_receiver: Channel to receive results of the requests to the
237+
set_power_result_receiver: Channel to receive results of the requests to the
226238
components.
227239
"""
228240
api_client = get_microgrid().api_client
@@ -232,8 +244,10 @@ async def _run(
232244

233245
self._select = Select(
234246
battery=battery_receiver,
247+
battery_timer=self._battery.data_recv_timer,
248+
inverter_timer=self._inverter.data_recv_timer,
235249
inverter=inverter_receiver,
236-
request_result=request_result_receiver,
250+
set_power_result=set_power_result_receiver,
237251
)
238252

239253
while True:
@@ -255,6 +269,7 @@ def _update_status(self, select: Select) -> Optional[Status]:
255269
and self._no_critical_error(msg.inner)
256270
)
257271
self._battery.last_msg_timestamp = msg.inner.timestamp
272+
self._battery.data_recv_timer.reset()
258273

259274
elif msg := select.inverter:
260275
self._inverter.last_msg_correct = (
@@ -263,37 +278,42 @@ def _update_status(self, select: Select) -> Optional[Status]:
263278
and self._no_critical_error(msg.inner)
264279
)
265280
self._inverter.last_msg_timestamp = msg.inner.timestamp
281+
self._inverter.data_recv_timer.reset()
266282

267-
elif msg := select.request_result:
283+
elif msg := select.set_power_result:
268284
result: SetPowerResult = msg.inner
269285
if self.battery_id in result.succeed:
270286
self._blocking_status.unblock()
271287

272-
elif self.battery_id in result.failed:
273-
# check if component stopped sending data
274-
if self._battery.last_msg_correct and self._is_timestamp_outdated(
275-
self._battery.last_msg_timestamp
276-
):
277-
self._battery.last_msg_correct = False
278-
279-
if self._inverter.last_msg_correct and self._is_timestamp_outdated(
280-
self._inverter.last_msg_timestamp
281-
):
282-
self._inverter.last_msg_correct = False
283-
284-
component_correct = (
285-
self._battery.last_msg_correct and self._inverter.last_msg_correct
288+
elif (
289+
self.battery_id in result.failed
290+
and self._last_status != Status.NOT_WORKING
291+
):
292+
duration = self._blocking_status.block()
293+
294+
if duration > 0:
295+
_logger.warning(
296+
"battery %d failed last response. block it for %f sec",
297+
self.battery_id,
298+
duration,
299+
)
300+
elif msg := select.battery_timer:
301+
if self._battery.last_msg_correct:
302+
self._battery.last_msg_correct = False
303+
_logger.warning(
304+
"Battery %d stopped sending data, last timestamp: %s",
305+
self._battery.component_id,
306+
self._battery.last_msg_timestamp,
286307
)
287-
if component_correct:
288-
# if component is sending data, but request fails, then block it
289-
# for some time
290-
duration = self._blocking_status.block()
291-
if duration > 0:
292-
_logger.warning(
293-
"battery %d failed last response. block it for %f sec",
294-
self.battery_id,
295-
duration,
296-
)
308+
elif msg := select.inverter_timer:
309+
if self._inverter.last_msg_correct:
310+
self._inverter.last_msg_correct = False
311+
_logger.warning(
312+
"Inverter %d stopped sending data, last timestamp: %s",
313+
self._inverter.component_id,
314+
self._inverter.last_msg_timestamp,
315+
)
316+
297317
else:
298318
_logger.error("Unknown message returned from select")
299319

0 commit comments

Comments
 (0)