|
7 | 7 | import asyncio |
8 | 8 | from datetime import datetime |
9 | 9 | from pprint import pprint |
10 | | -from typing import AsyncGenerator |
| 10 | +from typing import AsyncIterator |
11 | 11 |
|
12 | 12 | import pandas as pd |
13 | 13 | from frequenz.client.common.metric import Metric |
|
18 | 18 | from frequenz.client.reporting._client import MetricSample |
19 | 19 |
|
20 | 20 |
|
21 | | -# pylint: disable=too-many-locals |
22 | | -async def main(microgrid_id: int, component_id: int) -> None: |
| 21 | +def main() -> None: |
| 22 | + """Parse arguments and run the client.""" |
| 23 | + parser = argparse.ArgumentParser() |
| 24 | + parser.add_argument( |
| 25 | + "--url", |
| 26 | + type=str, |
| 27 | + help="URL of the Reporting service", |
| 28 | + default="localhost:50051", |
| 29 | + ) |
| 30 | + parser.add_argument("--mid", type=int, help="Microgrid ID", required=True) |
| 31 | + parser.add_argument("--cid", type=int, help="Component ID", required=True) |
| 32 | + parser.add_argument( |
| 33 | + "--metrics", |
| 34 | + type=str, |
| 35 | + nargs="+", |
| 36 | + choices=[e.name for e in Metric], |
| 37 | + help="List of metrics to process", |
| 38 | + required=True, |
| 39 | + ) |
| 40 | + parser.add_argument( |
| 41 | + "--start", |
| 42 | + type=datetime.fromisoformat, |
| 43 | + help="Start datetime in YYYY-MM-DDTHH:MM:SS format", |
| 44 | + required=True, |
| 45 | + ) |
| 46 | + parser.add_argument( |
| 47 | + "--end", |
| 48 | + type=datetime.fromisoformat, |
| 49 | + help="End datetime in YYYY-MM-DDTHH:MM:SS format", |
| 50 | + required=True, |
| 51 | + ) |
| 52 | + parser.add_argument("--psize", type=int, help="Page size", default=100) |
| 53 | + parser.add_argument( |
| 54 | + "--display", choices=["iter", "df", "dict"], help="Display format", default="df" |
| 55 | + ) |
| 56 | + args = parser.parse_args() |
| 57 | + asyncio.run( |
| 58 | + run( |
| 59 | + args.mid, |
| 60 | + args.cid, |
| 61 | + args.metrics, |
| 62 | + args.start, |
| 63 | + args.end, |
| 64 | + page_size=args.psize, |
| 65 | + service_address=args.url, |
| 66 | + display=args.display, |
| 67 | + ) |
| 68 | + ) |
| 69 | + |
| 70 | + |
| 71 | +# pylint: disable=too-many-arguments |
| 72 | +async def run( |
| 73 | + microgrid_id: int, |
| 74 | + component_id: int, |
| 75 | + metric_names: list[str], |
| 76 | + start_dt: datetime, |
| 77 | + end_dt: datetime, |
| 78 | + page_size: int, |
| 79 | + service_address: str, |
| 80 | + display: str, |
| 81 | +) -> None: |
23 | 82 | """Test the ReportingClient. |
24 | 83 |
|
25 | 84 | Args: |
26 | | - microgrid_id: int |
27 | | - component_id: int |
| 85 | + microgrid_id: microgrid ID |
| 86 | + component_id: component ID |
| 87 | + metric_names: list of metric names |
| 88 | + start_dt: start datetime |
| 89 | + end_dt: end datetime |
| 90 | + page_size: page size |
| 91 | + service_address: service address |
| 92 | + display: display format |
| 93 | +
|
| 94 | + Raises: |
| 95 | + ValueError: if display format is invalid |
28 | 96 | """ |
29 | | - service_address = "localhost:50051" |
30 | 97 | client = ReportingClient(service_address) |
31 | 98 |
|
32 | | - microgrid_components = [(microgrid_id, [component_id])] |
33 | | - metrics = [ |
34 | | - Metric.DC_POWER, |
35 | | - Metric.DC_CURRENT, |
36 | | - ] |
37 | | - |
38 | | - start_dt = datetime.fromisoformat("2023-11-21T12:00:00.00+00:00") |
39 | | - end_dt = datetime.fromisoformat("2023-11-21T12:01:00.00+00:00") |
40 | | - |
41 | | - page_size = 10 |
42 | | - |
43 | | - print("########################################################") |
44 | | - print("Iterate over single metric generator") |
45 | | - |
46 | | - async for sample in client.iterate_single_metric( |
47 | | - microgrid_id=microgrid_id, |
48 | | - component_id=component_id, |
49 | | - metric=metrics[0], |
50 | | - start_dt=start_dt, |
51 | | - end_dt=end_dt, |
52 | | - page_size=page_size, |
53 | | - ): |
54 | | - print("Received:", sample) |
55 | | - |
56 | | - ########################################################################### |
57 | | - # |
58 | | - # The following code is experimental and demonstrates potential future |
59 | | - # usage of the ReportingClient. |
60 | | - # |
61 | | - ########################################################################### |
62 | | - |
63 | | - async def components_data_iter() -> AsyncGenerator[MetricSample, None]: |
64 | | - """Iterate over components data. |
65 | | -
|
66 | | - Yields: |
67 | | - Single metric sample |
| 99 | + metrics = [Metric[mn] for mn in metric_names] |
| 100 | + |
| 101 | + def data_iter() -> AsyncIterator[MetricSample]: |
| 102 | + """Iterate over single metric. |
| 103 | +
|
| 104 | + Just a wrapper around the client method for readability. |
| 105 | +
|
| 106 | + Returns: |
| 107 | + Iterator over single metric samples |
68 | 108 | """ |
69 | | - # pylint: disable=protected-access |
70 | | - async for page in client._iterate_components_data_pages( |
71 | | - microgrid_components=microgrid_components, |
| 109 | + return client.iterate_single_component( |
| 110 | + microgrid_id=microgrid_id, |
| 111 | + component_id=component_id, |
72 | 112 | metrics=metrics, |
73 | 113 | start_dt=start_dt, |
74 | 114 | end_dt=end_dt, |
75 | 115 | page_size=page_size, |
76 | | - ): |
77 | | - for entry in page.iterate_metric_samples(): |
78 | | - yield entry |
79 | | - |
80 | | - async def components_data_dict( |
81 | | - components_data_iter: AsyncGenerator[MetricSample, None] |
82 | | - ) -> dict[int, dict[int, dict[datetime, dict[Metric, float]]]]: |
83 | | - """Convert components data iterator into a single dict. |
84 | | -
|
85 | | - The nesting structure is: |
86 | | - { |
87 | | - microgrid_id: { |
88 | | - component_id: { |
89 | | - timestamp: { |
90 | | - metric: value |
91 | | - } |
| 116 | + ) |
| 117 | + |
| 118 | + if display == "iter": |
| 119 | + print("########################################################") |
| 120 | + print("Iterate over single metric generator") |
| 121 | + async for sample in data_iter(): |
| 122 | + print(sample) |
| 123 | + |
| 124 | + elif display == "dict": |
| 125 | + print("########################################################") |
| 126 | + print("Dumping all data as a single dict") |
| 127 | + dct = await iter_to_dict(data_iter()) |
| 128 | + pprint(dct) |
| 129 | + |
| 130 | + elif display == "df": |
| 131 | + print("########################################################") |
| 132 | + print("Turn data into a pandas DataFrame") |
| 133 | + data = [cd async for cd in data_iter()] |
| 134 | + df = pd.DataFrame(data).set_index("timestamp") |
| 135 | + # Set option to display all rows |
| 136 | + pd.set_option("display.max_rows", None) |
| 137 | + pprint(df) |
| 138 | + |
| 139 | + else: |
| 140 | + raise ValueError(f"Invalid display format: {display}") |
| 141 | + |
| 142 | + return |
| 143 | + |
| 144 | + |
| 145 | +async def iter_to_dict( |
| 146 | + components_data_iter: AsyncIterator[MetricSample], |
| 147 | +) -> dict[int, dict[int, dict[datetime, dict[Metric, float]]]]: |
| 148 | + """Convert components data iterator into a single dict. |
| 149 | +
|
| 150 | + The nesting structure is: |
| 151 | + { |
| 152 | + microgrid_id: { |
| 153 | + component_id: { |
| 154 | + timestamp: { |
| 155 | + metric: value |
92 | 156 | } |
93 | 157 | } |
94 | 158 | } |
| 159 | + } |
95 | 160 |
|
96 | | - Args: |
97 | | - components_data_iter: async generator |
98 | | -
|
99 | | - Returns: |
100 | | - Single dict with with all components data |
101 | | - """ |
102 | | - ret: dict[int, dict[int, dict[datetime, dict[Metric, float]]]] = {} |
103 | | - |
104 | | - async for ts, mid, cid, met, value in components_data_iter: |
105 | | - if mid not in ret: |
106 | | - ret[mid] = {} |
107 | | - if cid not in ret[mid]: |
108 | | - ret[mid][cid] = {} |
109 | | - if ts not in ret[mid][cid]: |
110 | | - ret[mid][cid][ts] = {} |
111 | | - |
112 | | - ret[mid][cid][ts][met] = value |
| 161 | + Args: |
| 162 | + components_data_iter: async generator |
113 | 163 |
|
114 | | - return ret |
| 164 | + Returns: |
| 165 | + Single dict with with all components data |
| 166 | + """ |
| 167 | + ret: dict[int, dict[int, dict[datetime, dict[Metric, float]]]] = {} |
115 | 168 |
|
116 | | - print("########################################################") |
117 | | - print("Iterate over generator") |
118 | | - async for msample in components_data_iter(): |
119 | | - print("Received:", msample) |
| 169 | + async for ts, mid, cid, met, value in components_data_iter: |
| 170 | + if mid not in ret: |
| 171 | + ret[mid] = {} |
| 172 | + if cid not in ret[mid]: |
| 173 | + ret[mid][cid] = {} |
| 174 | + if ts not in ret[mid][cid]: |
| 175 | + ret[mid][cid][ts] = {} |
120 | 176 |
|
121 | | - print("########################################################") |
122 | | - print("Dumping all data as a single dict") |
123 | | - dct = await components_data_dict(components_data_iter()) |
124 | | - pprint(dct) |
| 177 | + ret[mid][cid][ts][met] = value |
125 | 178 |
|
126 | | - print("########################################################") |
127 | | - print("Turn data into a pandas DataFrame") |
128 | | - data = [cd async for cd in components_data_iter()] |
129 | | - df = pd.DataFrame(data).set_index("timestamp") |
130 | | - pprint(df) |
| 179 | + return ret |
131 | 180 |
|
132 | 181 |
|
133 | 182 | if __name__ == "__main__": |
134 | | - parser = argparse.ArgumentParser() |
135 | | - parser.add_argument("microgrid_id", type=int, help="Microgrid ID") |
136 | | - parser.add_argument("component_id", type=int, help="Component ID") |
137 | | - |
138 | | - args = parser.parse_args() |
139 | | - asyncio.run(main(args.microgrid_id, args.component_id)) |
| 183 | + main() |
0 commit comments