Skip to content

Commit fabc849

Browse files
Add data_normalisation_test and load_postgres_test
1 parent 39d1365 commit fabc849

File tree

2 files changed

+167
-0
lines changed

2 files changed

+167
-0
lines changed

tests/test_data_normalisation.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import csv
2+
import os
3+
import tempfile
4+
import builtins
5+
import pytest
6+
7+
import nifipulse.config as config
8+
from nifipulse.data_normalisation import process_data
9+
10+
11+
# -----------------------------
12+
# Helper to create temp CSV data
13+
# -----------------------------
14+
def create_csv(path, rows):
    """Write *rows* (dicts) to *path* as a CSV with the fixed metric header.

    The column order matches what the normalisation pipeline expects:
    timestamp, instance, metric_name, component_name, component_type,
    component_id, value.
    """
    header = [
        "timestamp",
        "instance",
        "metric_name",
        "component_name",
        "component_type",
        "component_id",
        "value",
    ]

    with open(path, "w", newline='', encoding='utf-8') as handle:
        csv_writer = csv.DictWriter(handle, fieldnames=header)
        csv_writer.writeheader()
        csv_writer.writerows(rows)
23+
24+
25+
# -----------------------------
26+
# TEST: process_data filters, maps, converts and writes results
27+
# -----------------------------
28+
def test_process_data(tmp_path, monkeypatch):
    """End-to-end check of process_data: it must drop the zero-valued
    nifi_amount row, keep the second row, map the metric name, detect the
    unit, convert the value to float, and copy id/timestamp fields into
    the cleaned output CSV.
    """
    # Temporary input/output locations for the run.
    source_csv = tmp_path / "input.csv"
    cleaned_csv = tmp_path / "clean.csv"

    # Row expected to be filtered out: value == 0 and the metric name
    # starts with "nifi_amount".
    dropped = {
        "timestamp": "2025-01-01T00:00:00Z",
        "instance": "nifi1",
        "metric_name": "nifi_amount_flowfiles_received",
        "component_name": "ProcessorA",
        "component_type": "PROCESSOR",
        "component_id": "abc123",
        "value": "0",
    }
    # Row expected to survive and be normalised.
    kept = {
        "timestamp": "2025-01-01T00:01:00Z",
        "instance": "nifi1",
        "metric_name": "nifi_amount_bytes_read",
        "component_name": "ProcessorB",
        "component_type": "PROCESSOR",
        "component_id": "xyz999",
        "value": "150",
    }
    create_csv(source_csv, [dropped, kept])

    # Point config.env at the temporary files for the duration of the test.
    fake_env = type("E", (), {
        "CSV_SINK": str(source_csv),
        "CLEAN_DATA": str(cleaned_csv),
    })
    monkeypatch.setattr(config, "env", fake_env)

    process_data()

    # ----------- Validate output CSV -----------
    assert cleaned_csv.exists(), "Output file should be created"

    with open(cleaned_csv, encoding='utf-8') as handle:
        remaining = list(csv.DictReader(handle))

    # Only the non-zero row should survive filtering.
    assert len(remaining) == 1
    record = remaining[0]

    # Metric name mapping ("nifi_amount_bytes_read" -> "bytes_read").
    assert record["metric_name"] == "bytes_read"
    # Unit detected from the metric name.
    assert record["original_unit"] == "bytes"
    # Value converted to float and re-serialised.
    assert record["value"] == "150.0"
    # Component id propagated as the unique id.
    assert record["unique_id"] == "xyz999"
    # Original timestamp carried over unchanged.
    assert record["timestamp_utc"] == "2025-01-01T00:01:00Z"

tests/test_load_postgre.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import pytest
2+
import pandas as pd
3+
from unittest.mock import patch, MagicMock
4+
from nifipulse.load_postgres import load_postgres
5+
6+
7+
def test_load_postgres_success(tmp_path):
    """load_postgres on a valid CSV should issue INSERTs for all four
    dimension tables and the fact table, using a mocked engine and
    mocked pd.read_sql lookups.
    """
    # Fake input CSV containing one metric observation.
    sample_csv = tmp_path / "sample.csv"
    sample_csv.write_text(
        "timestamp_utc,instance,metric_name,original_unit,component_name,component_type,value\n"
        "2025-01-01 12:00:00,server1,cpu,%,cpu_component,system,45\n"
    )

    # Mock the SQLAlchemy engine so engine.begin() yields a mock connection.
    engine_mock = MagicMock()
    conn_mock = MagicMock()
    engine_mock.begin.return_value.__enter__.return_value = conn_mock

    # One DataFrame per expected pd.read_sql call: the four dimension
    # lookups followed by the final fact export.
    read_sql_results = [
        pd.DataFrame({
            "instance_id": [1],
            "instance_name": ["server1"],
        }),
        pd.DataFrame({
            "metric_id": [10],
            "metric_name": ["cpu"],
            "original_unit": ["%"],
        }),
        pd.DataFrame({
            "component_id": [20],
            "component_name": ["cpu_component"],
            "component_type": ["system"],
        }),
        pd.DataFrame({
            "date_id": [100],
            "timestamp_utc": ["2025-01-01 12:00:00"],
        }),
        pd.DataFrame({
            "fact_id": [999],
            "timestamp_utc": ["2025-01-01 12:00:00"],
            "instance_name": ["server1"],
            "metric_name": ["cpu"],
            "original_unit": ["%"],
            "component_name": ["cpu_component"],
            "component_type": ["system"],
            "value": [45],
        }),
    ]

    with patch("nifipulse.load_postgres.create_engine", return_value=engine_mock), \
         patch("nifipulse.load_postgres.pd.read_sql", side_effect=read_sql_results):
        load_postgres(str(sample_csv))

    # Collect the SQL text of every statement executed on the connection.
    executed = [call.args[0].text for call in conn_mock.execute.call_args_list]

    # Each dimension table and the fact table must receive an INSERT.
    for table in ("dim_instance", "dim_metric", "dim_component",
                  "dim_date", "fact_metrics"):
        assert any("INSERT INTO " + table in sql for sql in executed)

0 commit comments

Comments
 (0)