|
6 | 6 | from __future__ import annotations |
7 | 7 |
|
8 | 8 | from collections.abc import Iterable |
9 | | -from typing import Callable |
10 | 9 |
|
11 | 10 | from frequenz.client.common.microgrid.components import ComponentId |
12 | 11 | from frequenz.client.microgrid.component import ( |
13 | | - BatteryInverter, |
14 | | - Chp, |
15 | 12 | Component, |
16 | 13 | ComponentConnection, |
17 | | - EvCharger, |
18 | 14 | GridConnectionPoint, |
19 | | - SolarInverter, |
20 | 15 | ) |
21 | 16 | from frequenz.microgrid_component_graph import ComponentGraph, InvalidGraphError |
22 | 17 |
|
23 | 18 |
|
24 | | -def is_pv_inverter(component: Component) -> bool: |
25 | | - """Check if the component is a PV inverter. |
26 | | -
|
27 | | - Args: |
28 | | - component: The component to check. |
29 | | -
|
30 | | - Returns: |
31 | | - `True` if the component is a PV inverter, `False` otherwise. |
32 | | - """ |
33 | | - return isinstance(component, SolarInverter) |
34 | | - |
35 | | - |
36 | | -def is_battery_inverter(component: Component) -> bool: |
37 | | - """Check if the component is a battery inverter. |
38 | | -
|
39 | | - Args: |
40 | | - component: The component to check. |
41 | | -
|
42 | | - Returns: |
43 | | - `True` if the component is a battery inverter, `False` otherwise. |
44 | | - """ |
45 | | - return isinstance(component, BatteryInverter) |
46 | | - |
47 | | - |
48 | | -def is_chp(component: Component) -> bool: |
49 | | - """Check if the component is a CHP. |
50 | | -
|
51 | | - Args: |
52 | | - component: The component to check. |
53 | | -
|
54 | | - Returns: |
55 | | - `True` if the component is a CHP, `False` otherwise. |
56 | | - """ |
57 | | - return isinstance(component, Chp) |
58 | | - |
59 | | - |
60 | | -def is_ev_charger(component: Component) -> bool: |
61 | | - """Check if the component is an EV charger. |
62 | | -
|
63 | | - Args: |
64 | | - component: The component to check. |
65 | | -
|
66 | | - Returns: |
67 | | - `True` if the component is an EV charger, `False` otherwise. |
68 | | - """ |
69 | | - return isinstance(component, EvCharger) |
70 | | - |
71 | | - |
72 | | -def is_battery_chain( |
73 | | - graph: ComponentGraph[Component, ComponentConnection, ComponentId], |
74 | | - component: Component, |
75 | | -) -> bool: |
76 | | - """Check if the specified component is part of a battery chain. |
77 | | -
|
78 | | - A component is part of a battery chain if it is either a battery inverter or a |
79 | | - battery meter. |
80 | | -
|
81 | | - Args: |
82 | | - graph: The component graph. |
83 | | - component: component to check. |
84 | | -
|
85 | | - Returns: |
86 | | - Whether the specified component is part of a battery chain. |
87 | | - """ |
88 | | - return is_battery_inverter(component) or graph.is_battery_meter(component.id) |
89 | | - |
90 | | - |
91 | | -def is_pv_chain( |
92 | | - graph: ComponentGraph[Component, ComponentConnection, ComponentId], |
93 | | - component: Component, |
94 | | -) -> bool: |
95 | | - """Check if the specified component is part of a PV chain. |
96 | | -
|
97 | | - A component is part of a PV chain if it is either a PV inverter or a PV |
98 | | - meter. |
99 | | -
|
100 | | - Args: |
101 | | - graph: The component graph. |
102 | | - component: component to check. |
103 | | -
|
104 | | - Returns: |
105 | | - Whether the specified component is part of a PV chain. |
106 | | - """ |
107 | | - return is_pv_inverter(component) or graph.is_pv_meter(component.id) |
108 | | - |
109 | | - |
110 | | -def is_ev_charger_chain( |
111 | | - graph: ComponentGraph[Component, ComponentConnection, ComponentId], |
112 | | - component: Component, |
113 | | -) -> bool: |
114 | | - """Check if the specified component is part of an EV charger chain. |
115 | | -
|
116 | | - A component is part of an EV charger chain if it is either an EV charger or an |
117 | | - EV charger meter. |
118 | | -
|
119 | | - Args: |
120 | | - graph: The component graph. |
121 | | - component: component to check. |
122 | | -
|
123 | | - Returns: |
124 | | - Whether the specified component is part of an EV charger chain. |
125 | | - """ |
126 | | - return is_ev_charger(component) or graph.is_ev_charger_meter(component.id) |
127 | | - |
128 | | - |
129 | | -def is_chp_chain( |
130 | | - graph: ComponentGraph[Component, ComponentConnection, ComponentId], |
131 | | - component: Component, |
132 | | -) -> bool: |
133 | | - """Check if the specified component is part of a CHP chain. |
134 | | -
|
135 | | - A component is part of a CHP chain if it is either a CHP or a CHP meter. |
136 | | -
|
137 | | - Args: |
138 | | - graph: The component graph. |
139 | | - component: component to check. |
140 | | -
|
141 | | - Returns: |
142 | | - Whether the specified component is part of a CHP chain. |
143 | | - """ |
144 | | - return is_chp(component) or graph.is_chp_meter(component.id) |
145 | | - |
146 | | - |
147 | | -def dfs( |
148 | | - graph: ComponentGraph[Component, ComponentConnection, ComponentId], |
149 | | - current_node: Component, |
150 | | - visited: set[Component], |
151 | | - condition: Callable[[Component], bool], |
152 | | -) -> set[Component]: |
153 | | - """ |
154 | | - Search for components that fulfill the condition in the Graph. |
155 | | -
|
156 | | - DFS is used for searching the graph. The graph traversal is stopped |
157 | | - once a component fulfills the condition. |
158 | | -
|
159 | | - Args: |
160 | | - graph: The component graph. |
161 | | - current_node: The current node to search from. |
162 | | - visited: The set of visited nodes. |
163 | | - condition: The condition function to check for. |
164 | | -
|
165 | | - Returns: |
166 | | - A set of component ids where the corresponding components fulfill |
167 | | - the condition function. |
168 | | - """ |
169 | | - if current_node in visited: |
170 | | - return set() |
171 | | - |
172 | | - visited.add(current_node) |
173 | | - |
174 | | - if condition(current_node): |
175 | | - return {current_node} |
176 | | - |
177 | | - component: set[Component] = set() |
178 | | - |
179 | | - for successor in graph.successors(current_node.id): |
180 | | - component.update(dfs(graph, successor, visited, condition)) |
181 | | - |
182 | | - return component |
183 | | - |
184 | | - |
185 | 19 | def find_first_descendant_component( |
186 | 20 | graph: ComponentGraph[Component, ComponentConnection, ComponentId], |
187 | 21 | *, |
|
0 commit comments