Skip to content

Commit e6bf512

Browse files
chore(release): v0.2.0 – CSV writer, partition_by, YAML config + autodiscovery
- add CSV writer with header and flush - support --partition-by (device/type/none) - add YAML config parsing (--config) with PyYAML - implement config autodiscovery (config.sensors.yaml, dummysensors.yaml, config.yaml) - bump version to 0.2.0
1 parent 3d3b435 commit e6bf512

File tree

12 files changed

+311
-33
lines changed

12 files changed

+311
-33
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ __pycache__/
33
*.py[codz]
44
*$py.class
55

6+
# config.sensors.yaml
7+
config.sensors.yaml
8+
69
# Visual Studio code
710
.vscode/
811

out/temp.jsonl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{"ts_ms": 0, "device_id": "engine-A", "sensor_id": "temp-0", "type": "temp", "value": 24.643698021231547}
2+
{"ts_ms": 500, "device_id": "engine-A", "sensor_id": "temp-0", "type": "temp", "value": 23.70217328027316}
3+
{"ts_ms": 1000, "device_id": "engine-A", "sensor_id": "temp-0", "type": "temp", "value": 21.464851782876178}
4+
{"ts_ms": 1500, "device_id": "engine-A", "sensor_id": "temp-0", "type": "temp", "value": 26.817176898799882}
5+
{"ts_ms": 2000, "device_id": "engine-A", "sensor_id": "temp-0", "type": "temp", "value": 17.562920097753118}

out/vibration.csv

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
ts_ms,device_id,sensor_id,type,value
2+
0,engine-A,vibration-0,vibration,0.09920709782168856
3+
500,engine-A,vibration-0,vibration,0.24727765238362118
4+
1000,engine-A,vibration-0,vibration,0.10491826983174454
5+
1500,engine-A,vibration-0,vibration,0.15830367289446529
6+
2000,engine-A,vibration-0,vibration,0.20296206525424756

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "dummysensors"
7-
version = "0.1.0"
7+
version = "0.2.0"
88
description = "Dummy sensor data generator for testing IoT/ML pipelines"
99
readme = "README.md"
1010
authors = [{ name = "Mateusz Dalke" }]

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ pytest>=8.2
22
ruff>=0.6
33
build>=1.2
44
hatchling>=1.25
5+
pyyaml>=6.0

src/dummysensors/cli.py

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,23 @@
11
from __future__ import annotations
2-
import argparse
3-
import sys
2+
import argparse
43
import time
5-
import json
64
from .sensors import TemperatureSensor
75
from .orchestrator import run_stream
8-
import os
6+
from .io import jsonl_writer, csv_writer
97

108
def _stdout_writer():
11-
def _w(sample: dict):
12-
sys.stdout.write(json.dumps(sample) + "\n")
13-
return _w
9+
return jsonl_writer(None)
1410

1511
def _jsonl_file_writer(path: str):
16-
d = os.path.dirname(path)
17-
if d:
18-
os.makedirs(d, exist_ok=True)
19-
f = open(path, "a", buffering=1, encoding="utf-8")
20-
def _w(sample: dict):
21-
f.write(json.dumps(sample) + "\n")
22-
return _w
12+
return jsonl_writer(path)
13+
14+
def _csv_file_writer(path: str):
15+
return csv_writer(path)
2316

2417
def _make_writer_map(out_args: list[str] | None):
2518
"""
26-
out args format: ["temp=out/temp.jsonl", "vibration=out/vib.jsonl", "*=stdout"]
19+
out args format: ["temp=out/temp.jsonl", "vibration=out/vib.csv", "*=stdout"]
20+
format is chosen by file extension: .csv -> CSV, anything else -> JSONL
2721
"""
2822
mapping = {}
2923
if not out_args:
@@ -35,14 +29,16 @@ def _make_writer_map(out_args: list[str] | None):
3529
if dest == "stdout":
3630
mapping[key] = _stdout_writer()
3731
else:
38-
mapping[key] = _jsonl_file_writer(dest)
32+
if dest.lower().endswith(".csv"):
33+
mapping[key] = _csv_file_writer(dest)
34+
else:
35+
mapping[key] = _jsonl_file_writer(dest)
3936
return mapping
4037

4138
def main(argv=None):
4239
p = argparse.ArgumentParser(prog="dummy-sensors")
4340
sub = p.add_subparsers(dest="cmd", required=True)
4441

45-
# generate
4642
gen = sub.add_parser("generate", help="single temperature stream to stdout/file")
4743
gen.add_argument("--rate", type=float, default=5.0)
4844
gen.add_argument("--duration", type=float, default=5.0)
@@ -52,13 +48,14 @@ def main(argv=None):
5248
gen.add_argument("--noise", type=float, default=0.5)
5349
gen.add_argument("--jsonl", type=str, default=None)
5450

55-
# run
56-
run = sub.add_parser("run", help="multi-device/multi-sensor run via spec")
51+
run = sub.add_parser("run", help="multi-device/multi-sensor run via spec or config")
5752
run.add_argument("--rate", type=float, default=5.0)
5853
run.add_argument("--duration", type=float, default=None)
5954
run.add_argument("--count", type=int, default=None)
60-
run.add_argument("--spec", type=str, required=True, help='e.g. "device=A: temp*2,vibration*1; device=B: temp*3"')
55+
run.add_argument("--spec", type=str, help='e.g. "device=A: temp*2,vibration*1"')
6156
run.add_argument("--out", action="append", help='mapping like "temp=out/temp.jsonl", "*=stdout"', default=None)
57+
run.add_argument("--partition-by", choices=["none","type","device"], default="none")
58+
run.add_argument("--config", type=str, help="YAML config path (overrides --spec/--out)")
6259

6360
args = p.parse_args(argv)
6461

@@ -77,8 +74,15 @@ def main(argv=None):
7774
while time.perf_counter() < t_next:
7875
time.sleep(min(0.002, t_next - time.perf_counter()))
7976
else:
80-
writers = _make_writer_map(args.out)
81-
run_stream(args.spec, rate_hz=args.rate, duration_s=args.duration, total_count=args.count, writer_for_type=writers)
77+
# if --config is given, run from the YAML config (section 4)
78+
if args.config:
79+
from .config import run_from_config
80+
run_from_config(args.config)
81+
return
8282

83-
if __name__ == "__main__":
84-
main()
83+
# spec + out
84+
writers = _make_writer_map(args.out)
85+
if not args.spec:
86+
p.error("Provide --spec or --config")
87+
run_stream(args.spec, rate_hz=args.rate, duration_s=args.duration, total_count=args.count,
88+
writer_for_type=writers, partition_by=args.partition_by)

src/dummysensors/config.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
from __future__ import annotations
2+
from typing import Callable, Dict
3+
from .orchestrator import run_stream
4+
from .io import jsonl_writer
5+
from .io import csv_writer
6+
from pathlib import Path
7+
8+
# File names probed by find_config_path(), in priority order.
DEFAULT_CONFIG_CANDIDATES = (
    "config.sensors.yaml",
    "config.sensors.yml",
    "dummysensors.yaml",
    "dummysensors.yml",
)


def find_config_path(start: str | None = None) -> str | None:
    """Return the first default config file found in a directory.

    Args:
        start: Directory to search. Falls back to the current working
            directory when ``None`` or empty.

    Returns:
        The path (as ``str``) of the first name from
        ``DEFAULT_CONFIG_CANDIDATES`` that exists as a file inside the
        directory, or ``None`` when none of them does.
    """
    base = Path(start) if start else Path.cwd()
    for name in DEFAULT_CONFIG_CANDIDATES:
        cand = base / name
        # is_file() rather than exists(): a directory that happens to carry
        # a candidate name is not a usable config and must not shadow a
        # real file later in the list.
        if cand.is_file():
            return str(cand)
    return None
23+
24+
def _writer_for_decl(decl: dict):
    """Build the writer callable for one entry of the config ``outputs`` list.

    Args:
        decl: Mapping read from the YAML config. Recognised keys:
            ``type`` -- ``"jsonl"`` (default when missing or null) or
            ``"csv"``, case-insensitive;
            ``path`` -- destination file. Required for CSV; for JSONL a
            missing/empty path means standard output.

    Returns:
        The callable returned by ``csv_writer`` or ``jsonl_writer``.

    Raises:
        ValueError: If ``type`` is ``csv`` without a ``path``, or if ``type``
            names an unsupported format.
    """
    # `or "jsonl"` also covers an explicit null in the YAML, which would
    # otherwise be stringified to "none".
    fmt = str(decl.get("type") or "jsonl").lower()
    path = decl.get("path")

    if fmt == "csv":
        if not path:
            raise ValueError("CSV output requires 'path' in config.")
        return csv_writer(str(path))

    if fmt == "jsonl":
        # Same str() normalisation as the CSV branch; None selects stdout.
        return jsonl_writer(str(path) if path else None)

    # A misspelled type used to fall through to JSONL silently, producing
    # e.g. JSON lines inside a file the user expected to be CSV.
    raise ValueError(
        f"Unknown output type {fmt!r} in config; expected 'jsonl' or 'csv'."
    )
33+
34+
def run_from_config(path: str) -> None:
    """Run a sensor stream described by a YAML config file.

    Recognised top-level keys:
        rate: samples per second (default 5).
        duration / count: passed through to ``run_stream`` unchanged.
        partition_by: ``none`` (default), ``type`` or ``device``.
        outputs: list of ``{type, for, path}`` declarations; ``for``
            defaults to ``"*"``.
        devices: list of ``{id, sensors: [{kind, count}]}``; ``count``
            defaults to 1.

    Args:
        path: Path of the YAML file to load.

    Raises:
        RuntimeError: If PyYAML is not installed.
        ValueError: If the document is not a mapping, ``partition_by`` is
            not one of the supported values, or an output declaration is
            invalid.
        KeyError: If a device lacks ``id`` or a sensor lacks ``kind``.
    """
    try:
        import yaml  # type: ignore
    except ImportError as e:
        raise RuntimeError("Install PyYAML to use --config (pip install pyyaml)") from e

    with open(path, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f) or {}
    if not isinstance(cfg, dict):
        raise ValueError(f"Config {path!r} must contain a YAML mapping at top level.")

    # A key written with an empty value ("rate:") loads as None, so plain
    # .get(key, default) is not enough to obtain the default.
    raw_rate = cfg.get("rate")
    rate = 5.0 if raw_rate is None else float(raw_rate)
    duration = cfg.get("duration")
    count = cfg.get("count")

    # Mirror the choices accepted by the CLI --partition-by option.
    partition_by = str(cfg.get("partition_by") or "none").lower()
    if partition_by not in ("none", "type", "device"):
        raise ValueError(
            f"Invalid partition_by {partition_by!r}; expected 'none', 'type' or 'device'."
        )

    # Build the spec string from the devices tree, e.g.
    # devices: [{id: "engine-A", sensors: [{kind: "temp", count: 2}, ...]}, ...]
    # This happens BEFORE any output is opened so that a malformed devices
    # section fails without creating or truncating output files.
    parts = []
    for dev in cfg.get("devices") or []:
        dev_id = dev["id"]
        items = [
            f"{s['kind']}*{int(s.get('count', 1))}"
            for s in dev.get("sensors") or []
        ]
        parts.append(f"device={dev_id}: " + ",".join(items))
    spec_str = "; ".join(parts)

    # Outputs: one writer per "for" key ("*" when omitted).
    writers: Dict[str, Callable[[dict], None]] = {}
    for out in cfg.get("outputs") or []:
        writers[out.get("for", "*")] = _writer_for_decl(out)

    run_stream(
        spec_str,
        rate_hz=rate,
        duration_s=duration,
        total_count=count,
        writer_for_type=writers,
        partition_by=partition_by,
    )

src/dummysensors/io.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import os
2+
import json
3+
import csv
4+
import sys
5+
from typing import Callable
6+
from typing import Iterable
7+
8+
def _ensure_dir(path: str) -> None:
    """Create the parent directory of *path*, if the path names one.

    A bare file name has no directory component, so nothing is created.
    An already existing directory is not an error.
    """
    parent = os.path.dirname(path)
    if not parent:
        return
    os.makedirs(parent, exist_ok=True)
12+
13+
# The annotation is quoted because this module has no
# `from __future__ import annotations`; an unquoted `str | None` is evaluated
# at definition time and fails on interpreters without PEP 604 support.
def jsonl_writer(path: "str | None") -> Callable[[dict], None]:
    """Return a callable that writes one record per line as JSON.

    Args:
        path: Destination file. ``None`` or an empty string selects standard
            output. Otherwise missing parent directories are created and the
            file is opened for appending, line-buffered, as UTF-8.

    Returns:
        A function taking a ``dict``; it writes ``json.dumps(rec)`` followed
        by a newline and returns ``None``.
    """
    if not path:
        def _write_stdout(rec: dict) -> None:
            # sys.stdout is looked up on every call so that a later
            # redirection of stdout is honoured.
            sys.stdout.write(json.dumps(rec) + "\n")

        return _write_stdout

    _ensure_dir(path)
    # The handle stays open for as long as the returned closure is alive;
    # line buffering pushes every record out as soon as it is written.
    f = open(path, "a", encoding="utf-8", buffering=1)

    def _write_file(rec: dict) -> None:
        f.write(json.dumps(rec) + "\n")

    return _write_file
20+
21+
def csv_writer(
    path: str,
    header: Iterable[str] = ("ts_ms", "device_id", "sensor_id", "type", "value"),
) -> Callable[[dict], None]:
    """Return a callable that appends records as rows of a CSV file.

    The file is created (truncating any existing content), missing parent
    directories are created, and the header row is written immediately.

    Args:
        path: Destination file.
        header: Column names, in output order. They are also the keys read
            from each record; a key missing from a record yields an empty
            cell.

    Returns:
        A function taking a ``dict``; it writes one row, flushes, and
        returns ``None``.
    """
    _ensure_dir(path)
    # newline="" lets the csv module control line endings itself.
    f = open(path, "w", newline="", encoding="utf-8", buffering=1)
    w = csv.writer(f)
    cols = list(header)

    def _sync() -> None:
        f.flush()
        try:
            os.fsync(f.fileno())
        except OSError:
            # Deliberately best-effort: pipes, FIFOs, terminals and
            # /dev/stdout do not support fsync and raise here. The data has
            # already been flushed, so writing must not fail because of it.
            pass

    w.writerow(cols)
    _sync()

    def _write(rec: dict) -> None:
        w.writerow([rec.get(c, "") for c in cols])
        _sync()

    return _write

src/dummysensors/orchestrator.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ def run_stream(
1212
duration_s: float | None,
1313
total_count: int | None,
1414
writer_for_type: dict[str, Callable[[dict], None]],
15+
partition_by: str = "none", # <--- NEW
1516
):
1617
period = 1.0 / rate_hz if rate_hz > 0 else 0.0
1718
devices = parse_spec(spec_str)
1819

19-
instances: list[tuple[str, str, str, object]] = [] # (device_id, sensor_id, type, sensor)
20+
instances: list[tuple[str, str, str, object]] = []
2021
for d in devices:
2122
idx = {}
2223
for s in d.sensors:
@@ -27,22 +28,38 @@ def run_stream(
2728
instances.append((d.id, sid, s.kind, sensor))
2829
idx[s.kind] += 1
2930

30-
# writer default
31-
default_writer = writer_for_type.get("*", lambda x: sys.stdout.write(json.dumps(x)+"\n"))
31+
default_writer = writer_for_type.get("*", lambda x: (sys.stdout.write(json.dumps(x)+"\n"), None)[1])
3232

33-
def get_writer(k: str):
34-
return writer_for_type.get(k, default_writer)
33+
# cache of writers per partition key (type/device/*)
34+
cache: dict[str, Callable[[dict], None]] = {}
35+
36+
def _key(rec: dict) -> str:
37+
if partition_by == "device":
38+
return rec["device_id"]
39+
if partition_by == "type":
40+
return rec["type"]
41+
return "*" # none
42+
43+
def _writer_for(rec: dict) -> Callable[[dict], None]:
44+
k = _key(rec)
45+
if k in cache:
46+
return cache[k]
47+
# if there is a mapping for this specific key, use it
48+
if k in writer_for_type:
49+
cache[k] = writer_for_type[k]
50+
else:
51+
cache[k] = default_writer
52+
return cache[k]
3553

36-
# main loop
3754
t0 = time.perf_counter()
3855
n = total_count if total_count is not None else int((duration_s or 0) * rate_hz)
3956
for i in range(n):
4057
t_s = time.perf_counter() - t0
4158
for dev, sid, kind, sensor in instances:
4259
val = sensor.read(t_s) # type: ignore[attr-defined]
4360
rec = {"ts_ms": int(t_s*1000), "device_id": dev, "sensor_id": sid, "type": kind, "value": val}
44-
get_writer(kind)(rec)
61+
_writer_for(rec)(rec)
4562
if period > 0:
4663
t_next = t0 + (i + 1) * period
4764
while time.perf_counter() < t_next:
48-
time.sleep(min(0.002, t_next - time.perf_counter()))
65+
time.sleep(min(0.002, t_next - time.perf_counter()))

tests/test_config.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import json
2+
import csv
3+
import os
4+
5+
from dummysensors.config import run_from_config
6+
7+
def test_config_yaml_creates_outputs(tmp_path, monkeypatch):
    """run_from_config writes one JSONL and one CSV file, partitioned by type.

    The config asks for 5 samples from one temp and one vibration sensor, so
    each output must hold exactly 5 records (plus the CSV header).
    """
    # prepare config
    cfg_path = tmp_path / "config.example.yaml"
    cfg_path.write_text(
        """
rate: 2
count: 5
partition_by: type

outputs:
  - type: jsonl
    for: temp
    path: temp.jsonl
  - type: csv
    for: vibration
    path: vibration.csv

devices:
  - id: engine-A
    sensors:
      - kind: temp
        count: 1
      - kind: vibration
        count: 1
""",
        encoding="utf-8",
    )

    # Output paths in the config are relative, so run from tmp_path.
    # monkeypatch restores the working directory even if the run fails.
    monkeypatch.chdir(tmp_path)
    run_from_config(str(cfg_path))

    # check outputs
    temp_file = tmp_path / "temp.jsonl"
    vib_file = tmp_path / "vibration.csv"

    assert temp_file.exists()
    assert vib_file.exists()

    # check JSONL -- assert the count first: all() over an empty list would
    # pass vacuously and hide a run that produced no records.
    with open(temp_file, encoding="utf-8") as f:
        lines = [json.loads(line) for line in f]
    assert len(lines) == 5
    assert all("value" in rec for rec in lines)
    assert all(rec["type"] == "temp" for rec in lines)

    # check CSV -- header plus one row per sample
    with open(vib_file, newline="", encoding="utf-8") as f:
        reader = list(csv.reader(f))
    assert reader[0] == ["ts_ms","device_id","sensor_id","type","value"]
    assert len(reader) == 6
    assert all(row[3] == "vibration" for row in reader[1:])

0 commit comments

Comments
 (0)