-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsma_exporter.py
More file actions
613 lines (492 loc) · 23.4 KB
/
sma_exporter.py
File metadata and controls
613 lines (492 loc) · 23.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
#!/usr/bin/env python3
"""
sma_exporter.py — Prometheus metrics scraper for SMA Sunny Boy inverters
=========================================================================
Zero external dependencies. Pure Python stdlib. Requires Python 3.8+ (the code uses the ":=" walrus operator).
Speaks SMA Speedwire(TM) UDP on port 9522.
USAGE:
python3 sma_exporter.py --host 192.168.1.159 --password YOURPASSWORD
python3 sma_exporter.py --host 192.168.1.159 --password YOURPASSWORD --debug
# Multiple inverters, write node_exporter textfile
python3 sma_exporter.py \
--host 192.168.1.100 --name east --password PASS \
--host 192.168.1.101 --name west --password PASS \
--textfile /var/lib/node_exporter/textfile_collector/sma.prom
# Long-running HTTP server
python3 sma_exporter.py --host 192.168.1.159 --password PASS \
--serve 127.0.0.1:9779
CRON (textfile mode):
*/5 * * * * /usr/bin/python3 /usr/local/bin/sma_exporter.py \
--host 192.168.1.159 --password PASS \
--textfile /var/lib/node_exporter/textfile_collector/sma.prom
RESPONSE PACKET FORMAT (from packet capture analysis):
[0..19] Outer frame header
[20..41] Inner data header (22 bytes)
[42..45] Command response word
[46..49] First register (echoed or normalised by inverter)
[50..53] Last register
[54..] Register records, one of two sizes:
Energy records (16 bytes): reg_id(4) + ts(4) + value(4) + pad(4)
Spot records (28 bytes): reg_id(4) + ts(4) + v*4(16) + flag(4)
The four repeated values are min/avg/max/current; we use v1.
Terminated by 0x00000000 end marker.
NOTE: the inverter returns empty 58-byte responses if the password is wrong
or the user account has no read permissions. The default user password is 0000;
if that password was ever changed, the installer password (different encoding) is needed.
STATUS CODES:
Inverter: 35=fault 303=off 307=ok 455=warning
Relay: 51=closed/connected 311=open/disconnected
jtodd@loligo.com 2026/03/20 with extensive help from Claude.ai
"""
from __future__ import annotations
import argparse
import logging
import os
import re
import socket
import struct
import sys
import textwrap
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Dict, List, Optional, Tuple
log = logging.getLogger("sma_exporter")

# ---------------------------------------------------------------------------
# Speedwire protocol constants
# ---------------------------------------------------------------------------
SW_PORT = 9522  # UDP port the inverter is contacted on

# Address pair this exporter packs into every data header (see _dh()).
APP_ID = 125
APP_SERIAL = 0x3A000001

# Wildcard address pair packed first into every data header (see _dh()).
ANY_SUSYID = 0xFFFF
ANY_SERIAL = 0xFFFFFFFF

# Value sent in the login payload (presumably a session lifetime in
# seconds — TODO confirm against the protocol description).
LOGIN_TIMEOUT = 900

# Fixed bytes that open every frame, and the signature that follows the
# 16-bit length field (see _wrap()).
SW_HEADER = bytes([0x53, 0x4D, 0x41, 0x00, 0x00, 0x04, 0x02, 0xA0,
                   0x00, 0x00, 0x00, 0x01])
SW_ETH_SIG = bytes([0x00, 0x10, 0x60, 0x65])

# Command words.
CMD_LOGIN = 0xFFFD040C
CMD_LOGOFF = 0xFFFD010E
LOGIN_ACK = 0xFFFD040D  # command word that identifies a login reply

# Raw register values that mean "no reading available".
NAN_U32 = 0xFFFFFFFF
NAN_S32 = 0x80000000
NAN_U64 = 0x8000000000000000

# Human-readable names for the numeric status codes.
# NOTE(review): 1392 is not listed in the module docstring's status table —
# confirm that "ok" is the right label for it.
INVERTER_STATUS = {35: "fault", 303: "off", 307: "ok", 455: "warning", 1392: "ok"}
# 51 = closed/connected, 311 = open/disconnected (matches the module
# docstring and the help text of the exported relay metric).
RELAY_STATUS = {51: "closed", 311: "open"}
# ---------------------------------------------------------------------------
# Register ID → (field_name, scale, description)
# Confirmed from live packet captures.
# scale: divide raw integer by this to get SI value.
# ---------------------------------------------------------------------------
REGISTER_MAP: Dict[int, Tuple[str, float, str]] = {
    # --- Energy counters (16-byte records) --------------------------------
    0x00260101: ("energy_total_wh", 1, "Total energy produced (Wh)"),
    0x00262201: ("energy_today_wh", 1, "Energy produced today (Wh)"),

    # --- Operating time (16-byte counter records) -------------------------
    0x00462E01: ("operating_time_s", 1, "Total operating time (s)"),
    0x00462F01: ("feed_in_time_s", 1, "Total grid feed-in time (s)"),

    # --- AC power (register IDs arrive with a 0x40 top byte) --------------
    0x40464001: ("ac_power_w", 1, "AC output power total (W)"),
    0x40464101: ("ac_power_l1_w", 1, "AC power phase L1 (W)"),
    0x40464201: ("ac_power_l2_w", 1, "AC power phase L2 (W)"),
    0x40464301: ("ac_power_l3_w", 1, "AC power phase L3 (W)"),

    # --- AC voltage (0x00 top byte), raw value is in 1/100 V --------------
    0x00464801: ("ac_voltage_l1_v", 100, "AC voltage L1 (V)"),
    0x00464901: ("ac_voltage_l2_v", 100, "AC voltage L2 (V)"),
    0x00464A01: ("ac_voltage_l3_v", 100, "AC voltage L3 (V)"),
    0x00464B01: ("ac_voltage_l12_v", 100, "AC voltage L1-L2 (V)"),
    0x00464C01: ("ac_voltage_l23_v", 100, "AC voltage L2-L3 (V)"),
    0x00464D01: ("ac_voltage_l31_v", 100, "AC voltage L3-L1 (V)"),

    # --- AC current (0x00 top byte), raw value is in 1/1000 A -------------
    0x00465001: ("ac_current_l1_a", 1000, "AC current L1 (A)"),
    0x00465101: ("ac_current_l2_a", 1000, "AC current L2 (A)"),
    0x00465201: ("ac_current_l3_a", 1000, "AC current L3 (A)"),

    # --- Grid frequency (0x00 top byte), raw value is in 1/100 Hz ---------
    0x00465701: ("grid_freq_hz", 100, "Grid frequency (Hz)"),

    # --- DC strings (0x40 top byte) ---------------------------------------
    0x40251E01: ("dc_power1_w", 1, "DC power string 1 (W)"),
    0x40251E02: ("dc_power2_w", 1, "DC power string 2 (W)"),
    0x40451F01: ("dc_voltage1_v", 100, "DC voltage string 1 (V)"),
    0x40451F02: ("dc_voltage2_v", 100, "DC voltage string 2 (V)"),
    0x40452101: ("dc_current1_a", 1000, "DC current string 1 (A)"),
    0x40452102: ("dc_current2_a", 1000, "DC current string 2 (A)"),

    # --- Temperature (0x40 top byte), raw value is in 1/100 degree C ------
    0x40237701: ("inverter_temp_c", 100, "Inverter temperature (C)"),

    # --- Status (0x08 top byte) -------------------------------------------
    # The relay word is a packed value; a captured example is 0x01000033,
    # whose low bits give 0x33 = 51 = closed.
    0x08214801: ("inverter_status", 1, "Inverter state code"),
    0x08416401: ("relay_status_raw", 1, "Grid relay state (low byte: 51=closed/connected 311=open/disconnected)"),

    # --- Rated power ------------------------------------------------------
    0x00832A01: ("rated_power_w", 1, "Inverter rated/nameplate power (W)"),
}
# ---------------------------------------------------------------------------
# Query table — use broad ranges to catch all registers in each class.
# The inverter returns all registers it knows about within the range.
# ---------------------------------------------------------------------------
# Each entry is (label, command word, first register, last register).
# The label is only used for bookkeeping and debug logging.
QUERIES = [
    # label          command     first reg   last reg
    ("energy",      0x54000200, 0x00260100, 0x002622FF),
    ("op_time",     0x51000200, 0x00462E00, 0x00462FFF),
    # NOTE(review): this range stops at 0x004642FF, yet REGISTER_MAP also
    # maps a register ending in 0x464301 (ac_power_l3_w) — confirm whether
    # the upper bound should cover it.
    ("ac_power",    0x51000200, 0x00464000, 0x004642FF),
    # This single range also spans the AC current and grid frequency registers.
    ("ac_voltage",  0x51000200, 0x00464800, 0x004657FF),
    ("dc_power",    0x53800200, 0x00251E00, 0x00251EFF),
    ("dc_voltage",  0x53800200, 0x00451F00, 0x004521FF),
    ("status",      0x51800200, 0x00214800, 0x002148FF),
    ("relay",       0x51800200, 0x00416400, 0x004164FF),
    ("temperature", 0x52000200, 0x00237700, 0x002377FF),
    ("rated_power", 0x58000200, 0x00832A00, 0x00832AFF),
]
# ---------------------------------------------------------------------------
# Frame builder
# ---------------------------------------------------------------------------
_seq = 1


def _next_seq():
    """Return the next packet sequence number with bit 15 set.

    The module-level counter runs from 1 up to 0x7FFF and then wraps back
    to 1; the returned value is the counter before it is advanced, OR-ed
    with 0x8000.
    """
    global _seq
    current = _seq | 0x8000
    _seq = _seq % 0x7FFF + 1
    return current
def _encode_pw(password: str) -> bytes:
    """Encode *password* into the fixed 12-byte field of the login frame.

    Each ASCII byte is shifted by 0x88 (modulo 256). Unused positions hold
    0x88, and anything beyond the 12th character is ignored. A non-ASCII
    password raises UnicodeEncodeError.
    """
    field = password.encode("ascii")[:12].ljust(12, b"\x00")
    return bytes((0x88 + byte) % 256 for byte in field)
def _wrap(dh: bytes, pl: bytes) -> bytes:
    """Put the outer frame header in front of data header *dh* and payload *pl*.

    The outer header is SW_HEADER, the big-endian 16-bit length of
    ``dh + pl``, SW_ETH_SIG, and two bytes: that length divided by four,
    then 0xA0.
    """
    body = dh + pl
    size = len(body)
    length_field = struct.pack(">H", size)
    control = bytes([size // 4, 0xA0])
    return b"".join((SW_HEADER, length_field, SW_ETH_SIG, control, body))
def _dh(c1: bytes = b"\x00\x01", c2: bytes = b"\x00\x01") -> bytes:
    """Build the inner data header that precedes every command payload.

    Layout: wildcard address pair, *c1*, this exporter's address pair,
    *c2*, a zero 32-bit word and a fresh sequence number from _next_seq().
    """
    first_address = struct.pack("<HI", ANY_SUSYID, ANY_SERIAL)
    second_address = struct.pack("<HI", APP_ID, APP_SERIAL)
    tail = struct.pack("<IH", 0, _next_seq())
    return b"".join((first_address, c1, second_address, c2, tail))
def make_login_frame(password: str) -> bytes:
    """Return a complete login frame carrying the encoded *password*.

    The payload holds the login command, the constant 0x07, LOGIN_TIMEOUT,
    the current Unix time, a zero word, the 12-byte encoded password and a
    closing zero word.
    """
    prefix = struct.pack("<IIIII", CMD_LOGIN, 0x07, LOGIN_TIMEOUT,
                         int(time.time()), 0)
    payload = prefix + _encode_pw(password) + struct.pack("<I", 0)
    return _wrap(_dh(), payload)
def make_logoff_frame() -> bytes:
    """Return a complete logoff frame."""
    header = _dh(b"\x00\x03", b"\x00\x03")
    payload = struct.pack("<III", CMD_LOGOFF, 0xFFFFFFFF, 0)
    return _wrap(header, payload)
def make_query_frame(cmd: int, first: int, last: int) -> bytes:
    """Return a frame asking for registers *first* to *last* with command *cmd*."""
    header = _dh(b"\x00\x00", b"\x00\x00")
    payload = struct.pack("<IIII", cmd, first, last, 0)
    return _wrap(header, payload)
# ---------------------------------------------------------------------------
# Response parser — iterates through register records by ID
# ---------------------------------------------------------------------------
def _u32(data: bytes, offset: int) -> int:
    """Read an unsigned little-endian 32-bit integer from *data* at *offset*."""
    (word,) = struct.unpack_from("<I", data, offset)
    return word
# Register IDs (low byte cleared) whose records use the 16-byte counter
# layout; every other known register uses the 28-byte spot layout.
_COUNTER_REG_BASES = frozenset({
    0x00260100,  # energy total
    0x00262200,  # energy today
    0x00462E00,  # operating time
    0x00462F00,  # feed-in time
})

# Status words are packed: a captured relay value is 0x01000033, i.e. a
# flag in the top byte and the state code below it. Keeping 24 bits (not
# just the low byte) lets codes above 255, such as 311, survive.
_STATUS_CODE_MASK = 0x00FFFFFF


def parse_records(data: bytes, host: str = "") -> Dict[str, float]:
    """Parse all register records from a response packet.

    Records start at byte 54 and are walked until a zero register ID (the
    end marker) or the end of the packet. Each known register is stored
    under its REGISTER_MAP field name as ``raw / scale``.

    Args:
        data: The complete UDP response payload.
        host: Inverter address, used only as a prefix in debug logging.

    Returns:
        ``{field_name: value}`` for every known register found. Empty when
        the packet is shorter than 58 bytes or contains no known registers.
    """
    if len(data) < 58:
        return {}  # empty response (wrong password, or genuinely no data)

    values: Dict[str, float] = {}
    end = len(data)
    offset = 54
    while offset + 4 <= end:
        reg_id = _u32(data, offset)
        if reg_id == 0:
            break  # end marker

        entry = REGISTER_MAP.get(reg_id)
        if entry is None:
            # Unknown register: guess its record size from the word that
            # follows the ID so the walk can continue past it.
            if offset + 8 > end:
                break
            ts_candidate = _u32(data, offset + 4)
            record_size = 28 if ts_candidate > 1_000_000_000 else 16
            log.debug(f"{host}: unknown reg_id={reg_id:#010x}, skipping {record_size} bytes")
            offset += record_size
            continue

        field, scale, _desc = entry

        # Compare the full register base. A 0xFFFF0000 mask can never equal
        # 0x00462000, so the operating-time counters would be stepped over
        # with the 28-byte stride and the record after them would be lost.
        is_counter = (reg_id & 0xFFFFFF00) in _COUNTER_REG_BASES
        record_size = 16 if is_counter else 28
        if offset + record_size > end:
            break

        # The value sits at offset+8 in both record layouts.
        raw = _u32(data, offset + 8)

        if raw in (NAN_U32, NAN_S32):
            log.debug(f"{host}: {field} = NaN")
            offset += record_size
            continue

        # Only scaled quantities are treated as signed; unscaled ones
        # (counters, power, status) are kept unsigned.
        if scale != 1 and raw > 0x7FFFFFFF:
            raw = raw - 0x100000000

        # Status registers (top ID byte 0x08): strip the flag byte and keep
        # the state code. A value whose top byte is already zero is unchanged.
        if (reg_id >> 24) == 0x08:
            raw = raw & _STATUS_CODE_MASK
        if field == "relay_status_raw":
            field = "relay_status"

        values[field] = raw / scale
        log.debug(f"{host}: {field} = {values[field]} (reg={reg_id:#010x} raw={raw})")
        offset += record_size

    return values
def parse_response(data: bytes, host: str = "") -> Optional[Tuple[str, Dict]]:
    """Classify a received packet by the command word at byte 42.

    Returns:
        ``("__login__", {})`` for a login acknowledgement,
        ``("data", values)`` for a reply to one of the QUERIES commands,
        ``None`` for packets that are too short or carry any other code.
    """
    if len(data) < 46:
        return None

    code = _u32(data, 42)
    if code == LOGIN_ACK:
        log.debug(f"{host}: login ack")
        return ("__login__", {})

    # A data reply carries the query's command word with the lowest bit set.
    reply_codes = {cmd | 0x01 for _label, cmd, _first, _last in QUERIES}
    if code in reply_codes:
        values = parse_records(data, host)
        log.debug(f"{host}: parsed {len(values)} values from response (code={code:#010x})")
        return ("data", values)

    log.debug(f"{host}: unrecognised code {code:#010x}, len={len(data)}")
    return None
# ---------------------------------------------------------------------------
# UDP query loop
# ---------------------------------------------------------------------------
# Per-recv() socket timeout in seconds; bounds how long one loop turn blocks.
RECV_TIMEOUT = 1.0


def query_inverter(host: str, password: str = "0000",
                   timeout: int = 15) -> Dict[str, float]:
    """Log in to one inverter, run every entry of QUERIES and collect the values.

    A login frame is sent first and re-sent every 3 seconds until it is
    acknowledged. Once logged in, all queries are sent and replies are
    merged until as many data replies as queries have arrived or the
    deadline passes. A logoff frame is always attempted before the socket
    is closed.

    Args:
        host: Inverter address; contacted over UDP on SW_PORT.
        password: Password placed in the login frame.
        timeout: Overall deadline for the whole exchange, in seconds.

    Returns:
        ``{field_name: value}`` merged from all data replies. May be empty,
        in which case a warning is logged.

    Raises:
        TimeoutError: No login acknowledgement arrived before the deadline.
        OSError: Socket errors other than a receive timeout propagate.
    """
    result: Dict[str, float] = {}
    queries_sent: set = set()
    queries_acked = 0
    logged_in = False
    deadline = time.monotonic() + timeout

    # The context manager closes the socket on every path, including a
    # failing connect() (e.g. an unresolvable host name).
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(RECV_TIMEOUT)
        sock.connect((host, SW_PORT))
        try:
            log.debug(f"{host}: sending login")
            sock.send(make_login_frame(password))
            login_sent = time.monotonic()

            while time.monotonic() < deadline:
                if logged_in:
                    for (name, cmd, first, last) in QUERIES:
                        if name not in queries_sent:
                            log.debug(f"{host}: querying {name}")
                            sock.send(make_query_frame(cmd, first, last))
                            queries_sent.add(name)
                    if queries_acked >= len(QUERIES):
                        break
                elif time.monotonic() - login_sent > 3.0:
                    log.debug(f"{host}: retrying login")
                    sock.send(make_login_frame(password))
                    login_sent = time.monotonic()

                try:
                    pkt = sock.recv(4096)
                except socket.timeout:
                    continue  # nothing yet; re-check deadline and login retry

                log.debug(f"{host}: recv {len(pkt)} bytes")
                parsed = parse_response(pkt, host)
                if parsed is None:
                    continue

                kind, values = parsed
                if kind == "__login__":
                    logged_in = True
                    continue

                result.update(values)
                queries_acked += 1
                log.debug(f"{host}: response {queries_acked}/{len(QUERIES)}")
        finally:
            # Best-effort logoff: a socket error here must not mask the
            # real outcome, but KeyboardInterrupt/SystemExit still propagate.
            try:
                sock.send(make_logoff_frame())
            except OSError as exc:
                log.debug(f"{host}: logoff not sent: {exc}")

    if not logged_in:
        raise TimeoutError(
            f"No login response from {host}:{SW_PORT} within {timeout}s. "
            f"Check IP, password, and that UDP {SW_PORT} is reachable."
        )
    if not result:
        log.warning(
            f"{host}: logged in but got no data. "
            f"Wrong password? (Empty 58-byte responses mean auth succeeded "
            f"but this user account has no read permissions.)"
        )
    return result
# ---------------------------------------------------------------------------
# Prometheus output
# ---------------------------------------------------------------------------
# Prefix joined with "_" in front of every metric name that is written out.
METRIC_PREFIX = "sma_inverter"
def _prom_escape(v: str) -> str:
    """Escape backslash, double quote and newline for use in a label value."""
    escapes = str.maketrans({"\\": "\\\\", '"': '\\"', "\n": "\\n"})
    return v.translate(escapes)
def _safe(s: str) -> str:
    """Replace every character outside ASCII letters, digits and "_" with "_"."""
    disallowed = re.compile(r"[^a-zA-Z0-9_]")
    return disallowed.sub("_", s)
class PrometheusWriter:
    """Collects samples and renders them in the Prometheus text format.

    Samples are grouped by metric name: the HELP/TYPE header and every
    sample of a metric are rendered as one contiguous block, in the order
    the metric names were first added. This keeps the output well-formed
    when the same metric is added for several inverters with other metrics
    added in between.
    """

    def __init__(self):
        # Full metric name -> its lines (header lines first, then samples).
        # Dict insertion order gives the rendering order.
        self._families: Dict[str, List[str]] = {}

    def add(self, name: str, value: float, labels: Dict[str, str],
            help_text: str = "", mtype: str = "gauge"):
        """Record one sample.

        Args:
            name: Metric name without the prefix; sanitised with _safe().
            value: Sample value, written with its default string form.
            labels: Label names and values; emitted sorted by label name.
            help_text: HELP text. Only the first call for a metric name
                decides whether and with what text a HELP line is written.
            mtype: Metric type for the TYPE line (first call wins).
        """
        full = f"{METRIC_PREFIX}_{_safe(name)}"
        family = self._families.get(full)
        if family is None:
            family = self._families[full] = []
            if help_text:
                family.append(f"# HELP {full} {help_text}")
            family.append(f"# TYPE {full} {mtype}")
        lstr = ",".join(
            f'{k}="{_prom_escape(str(v))}"' for k, v in sorted(labels.items())
        )
        family.append(f"{full}{{{lstr}}} {value}")

    def render(self) -> str:
        """Return all recorded lines, newline-terminated, grouped by metric."""
        lines = [line for family in self._families.values() for line in family]
        return "\n".join(lines) + "\n"
def build_output(results: List[Tuple[str, str, Optional[Dict]]]) -> str:
    """Render the scraped values of all inverters as Prometheus text.

    Args:
        results: ``(host, name, data)`` per inverter, where *data* maps
            field names to values, or is ``None`` for a failed scrape
            (such entries produce no metrics).

    Returns:
        The rendered metrics, ending with a ``scrape_timestamp_seconds``
        sample taken at render time.
    """
    writer = PrometheusWriter()
    single_phases = ("l1", "l2", "l3")
    # Phase-to-phase spans get a more readable label value.
    span_labels = {"l12": "l1_l2", "l23": "l2_l3", "l31": "l3_l1"}
    dc_families = (
        ("dc_power_watts", "dc_power{}_w", "DC power string {} (W)"),
        ("dc_voltage_volts", "dc_voltage{}_v", "DC voltage string {} (V)"),
        ("dc_current_amps", "dc_current{}_a", "DC current string {} (A)"),
    )

    def emit(data, labels, field, metric, help_text, mtype="gauge"):
        """Add *field* from *data* as *metric* when a value is present."""
        reading = data.get(field)
        if reading is not None:
            writer.add(metric, reading, labels, help_text, mtype)

    for host, name, data in results:
        if data is None:
            continue
        base = {"host": host, "name": name}

        # Energy
        emit(data, base, "energy_total_wh", "energy_total_wh",
             "Total energy produced (Wh)", "counter")
        emit(data, base, "energy_today_wh", "energy_today_wh",
             "Energy produced today (Wh)")

        # AC power: the total first, then the phases. When the inverter
        # gave no total, it is the sum of whatever phases were reported.
        phase_watts = []
        for ph in single_phases:
            watts = data.get(f"ac_power_{ph}_w")
            if watts is not None:
                phase_watts.append((ph, watts))
        total_watts = data.get("ac_power_w")
        if total_watts is None and phase_watts:
            total_watts = sum(watts for _, watts in phase_watts)
        if total_watts is not None:
            writer.add("ac_power_total_watts", total_watts, base,
                       "Total AC output power (W)")
        for ph, watts in phase_watts:
            writer.add("ac_power_watts", watts, {**base, "phase": ph},
                       "AC output power per phase (W)")

        # AC voltage: phase-to-neutral, then phase-to-phase
        for ph in single_phases + ("l12", "l23", "l31"):
            volts = data.get(f"ac_voltage_{ph}_v")
            if volts is not None:
                writer.add("ac_voltage_volts", volts,
                           {**base, "phase": span_labels.get(ph, ph)},
                           "AC voltage (V)")

        # AC current
        for ph in single_phases:
            emit(data, {**base, "phase": ph}, f"ac_current_{ph}_a",
                 "ac_current_amps", "AC current (A)")

        # Grid frequency
        emit(data, base, "grid_freq_hz", "grid_frequency_hz",
             "Grid frequency (Hz)")

        # DC strings: only strictly positive readings are exported
        for metric, field_tpl, desc in dc_families:
            for s in ("1", "2"):
                reading = data.get(field_tpl.format(s))
                if reading is None or not reading > 0:
                    continue
                writer.add(metric, reading, {**base, "string": s},
                           desc.format(s))

        # Operating hours
        emit(data, base, "operating_time_s", "operating_time_seconds",
             "Total inverter operating time (s)", "counter")
        emit(data, base, "feed_in_time_s", "feed_in_time_seconds",
             "Total grid feed-in time (s)", "counter")

        # Rated power
        emit(data, base, "rated_power_w", "rated_power_watts",
             "Inverter nameplate power (W)")

        # Temperature
        emit(data, base, "inverter_temp_c", "inverter_temp_celsius",
             "Inverter temperature (C)")

        # Status
        emit(data, base, "inverter_status", "inverter_status_code",
             "Inverter operating state (307=ok 303=off 35=fault 455=warning)")
        emit(data, base, "relay_status", "grid_relay_status_code",
             "Grid relay/contactor state (51=closed/connected 311=open/disconnected)")

    writer.add("scrape_timestamp_seconds", time.time(), {},
               "Unix timestamp of last scrape")
    return writer.render()
# ---------------------------------------------------------------------------
# Multi-inverter scrape
# ---------------------------------------------------------------------------
def scrape_all(hosts, names, passwords, timeout) -> str:
    """Query every inverter in turn and return the combined metrics text.

    A missing name falls back to ``inverter<N>`` and a missing password to
    the first password given. A failing inverter is reported on stderr,
    contributes no data, and gets ``scrape_success`` 0; the others get 1.
    """
    results = []
    success_by_host = {}
    for i, host in enumerate(hosts):
        name = f"inverter{i+1}" if i >= len(names) else names[i]
        password = passwords[0] if i >= len(passwords) else passwords[i]
        try:
            data = query_inverter(host, password, timeout)
        except Exception as exc:
            results.append((host, name, None))
            success_by_host[host] = 0.0
            sys.stderr.write(f"[WARN] {host}: {exc}\n")
        else:
            results.append((host, name, data))
            success_by_host[host] = 1.0

    output = build_output(results)

    status = PrometheusWriter()
    for host, ok in success_by_host.items():
        status.add("scrape_success", ok, {"host": host}, "1 if last scrape succeeded")
    return output + status.render()
# ---------------------------------------------------------------------------
# Output modes
# ---------------------------------------------------------------------------
def write_textfile(path: str, content: str):
    """Write *content* to *path* through a ``.tmp`` sibling and a rename.

    The rename means a reader of *path* sees either the previous file or
    the complete new one.
    """
    staging = f"{path}.tmp"
    with open(staging, "w", encoding="utf-8") as handle:
        handle.write(content)
    os.replace(staging, path)
def serve_http(bind_host, bind_port, hosts, names, passwords, timeout):
    """Serve the metrics over HTTP until the process is stopped.

    Every GET of ``/metrics`` or ``/`` triggers a fresh scrape of all
    inverters; any other path is answered with 404. An exception during
    the scrape or the reply is answered with 500 and the exception text.
    """

    class Handler(BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path != "/metrics" and self.path != "/":
                self.send_response(404)
                self.end_headers()
                return
            try:
                payload = scrape_all(hosts, names, passwords, timeout).encode()
                self.send_response(200)
                self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
                self.send_header("Content-Length", str(len(payload)))
                self.end_headers()
                self.wfile.write(payload)
            except Exception as exc:
                self.send_response(500)
                self.end_headers()
                self.wfile.write(str(exc).encode())

        def log_message(self, fmt, *args):
            # Send the access log through the module logger instead of
            # the base class's direct stderr output.
            log.info(fmt % args)

    sys.stderr.write(f"[sma_exporter] http://{bind_host}:{bind_port}/metrics\n")
    server = HTTPServer((bind_host, bind_port), Handler)
    server.serve_forever()
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def resolve_bind(spec: str) -> Tuple[str, int]:
    """Split a ``[HOST:]PORT`` spec into ``(host, port)``.

    The host defaults to 127.0.0.1 when it is absent or empty. A port that
    is not an integer raises ValueError.
    """
    host, _colon, port = spec.rpartition(":")
    return (host if host else "127.0.0.1"), int(port)
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the exporter."""
    parser = argparse.ArgumentParser(
        description="SMA Sunny Boy Prometheus exporter — pure stdlib, no dependencies.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent("""\
            Examples:
            %(prog)s --host 192.168.1.100 --password YOURPASS
            %(prog)s --host 192.168.1.100 --password YOURPASS --debug
            %(prog)s --host 192.168.1.100 --name east --password PASS \\
            --host 192.168.1.101 --name west --password PASS \\
            --textfile /var/lib/node_exporter/textfile_collector/sma.prom
            %(prog)s --host 192.168.1.100 --password PASS --serve 127.0.0.1:9779
            """),
    )
    # --host/--name/--password may be repeated; values pair up by position.
    parser.add_argument("--host", dest="hosts", action="append", default=[], metavar="IP")
    parser.add_argument("--name", dest="names", action="append", default=[], metavar="NAME")
    parser.add_argument("--password", dest="passwords", action="append", default=[], metavar="PASS")
    parser.add_argument("--timeout", type=int, default=15, metavar="SEC")
    parser.add_argument("--debug", action="store_true")
    # Textfile output and the HTTP server cannot be combined.
    destination = parser.add_mutually_exclusive_group()
    destination.add_argument("--textfile", metavar="PATH")
    destination.add_argument("--serve", metavar="[HOST:]PORT")
    return parser


def main():
    """Command-line entry point.

    Parses the arguments, configures logging, then either starts the HTTP
    server (``--serve``), writes a textfile (``--textfile``) or prints one
    scrape to stdout.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.WARNING,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        stream=sys.stderr,
    )

    if not args.hosts:
        parser.error("At least one --host is required.")

    hosts = args.hosts
    names = args.names if args.names else [f"inverter{i+1}" for i in range(len(hosts))]
    passwords = args.passwords if args.passwords else ["0000"]

    if args.serve:
        bind_host, bind_port = resolve_bind(args.serve)
        serve_http(bind_host, bind_port, hosts, names, passwords, args.timeout)
        return

    output = scrape_all(hosts, names, passwords, args.timeout)
    if not args.textfile:
        sys.stdout.write(output)
        return
    write_textfile(args.textfile, output)
    sys.stderr.write(f"[sma_exporter] wrote {args.textfile}\n")
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()