Skip to content

Commit 2cd745f

Browse files
committed
add tests for machine health data
1 parent 8df9ca0 commit 2cd745f

File tree

2 files changed

+279
-1
lines changed

2 files changed

+279
-1
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,5 +79,5 @@ jobs:
7979
8080
- name: Run tests
8181
run: |
82-
pip3 install pytest
82+
pip3 install pytest pandas
8383
pytest tests/

tests/test_machine_health_data.py

Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
import math

import numpy as np
import pandas as pd
import pytest
from snowflake.connector.errors import ProgrammingError
6+
def test_machine_health_metrics_table_exists(snowflake_conn):
7+
"""Test if the machine health metrics table exists and is accessible"""
8+
cursor = snowflake_conn.cursor()
9+
try:
10+
cursor.execute("SELECT 1 FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics LIMIT 1")
11+
result = cursor.fetchone()
12+
assert result is not None, "Machine health metrics table should exist and be accessible"
13+
finally:
14+
cursor.close()
15+
16+
def test_machine_health_metrics_columns(snowflake_conn):
17+
"""Test if all required columns are present in the table"""
18+
cursor = snowflake_conn.cursor()
19+
try:
20+
cursor.execute("DESC TABLE FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics")
21+
columns = {row[0].lower(): row[1] for row in cursor.fetchall()}
22+
23+
# Required columns based on UI usage
24+
required_columns = {
25+
'machine_id': str,
26+
'health_status': str,
27+
'failure_risk_score': float,
28+
'maintenance_recommendation': str
29+
}
30+
31+
for col, _ in required_columns.items():
32+
assert col in columns, f"Required column '{col}' not found in table"
33+
34+
finally:
35+
cursor.close()
36+
37+
def test_machine_health_metrics_data_types(snowflake_conn):
38+
"""Test if the data in the table has the expected types and constraints"""
39+
cursor = snowflake_conn.cursor()
40+
try:
41+
cursor.execute("SELECT * FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics LIMIT 10")
42+
columns = [desc[0].lower() for desc in cursor.description]
43+
data = cursor.fetchall()
44+
45+
if not data:
46+
pytest.skip("No data available in machine health metrics table")
47+
48+
df = pd.DataFrame(data, columns=columns)
49+
50+
# Type validations
51+
assert df['machine_id'].dtype == 'object', "machine_id should be string type"
52+
assert df['health_status'].dtype == 'object', "health_status should be string type"
53+
assert pd.to_numeric(df['failure_risk_score'], errors='coerce').notnull().all(), \
54+
"failure_risk_score should be numeric"
55+
assert df['maintenance_recommendation'].dtype == 'object', \
56+
"maintenance_recommendation should be string type"
57+
58+
# Value validations
59+
valid_health_statuses = {'HEALTHY', 'NEEDS_MAINTENANCE', 'CRITICAL'}
60+
assert set(df['health_status'].unique()).issubset(valid_health_statuses), \
61+
"health_status should only contain valid values"
62+
63+
# Convert percentage values (0-100) to decimal values (0-1)
64+
failure_risk_scores = df['failure_risk_score'].astype(float) / 100.0
65+
assert (failure_risk_scores >= 0).all() and (failure_risk_scores <= 1).all(), \
66+
"failure_risk_score should be between 0 and 100"
67+
68+
finally:
69+
cursor.close()
70+
71+
def test_data_completeness(snowflake_conn):
72+
"""Test for data completeness - no nulls and all machines have records"""
73+
cursor = snowflake_conn.cursor()
74+
try:
75+
# Check for NULL values in critical columns using CASE statements
76+
cursor.execute("""
77+
SELECT COUNT(*)
78+
FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics
79+
WHERE machine_id = ''
80+
OR health_status = ''
81+
OR failure_risk_score = 0
82+
OR maintenance_recommendation = ''
83+
""")
84+
null_count = cursor.fetchone()[0]
85+
assert null_count == 0, "Critical columns should not be empty"
86+
87+
# Check if each machine has at least one record using EXISTS
88+
cursor.execute("""
89+
SELECT m1.machine_id
90+
FROM FACTORY_PIPELINE_DEMO.PUBLIC.RAW_SENSOR_DATA m1
91+
WHERE NOT EXISTS (
92+
SELECT 1
93+
FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics m2
94+
WHERE m1.machine_id = m2.machine_id
95+
)
96+
LIMIT 1
97+
""")
98+
missing_machines = cursor.fetchone()
99+
assert missing_machines is None, "All machines should have health metrics"
100+
101+
finally:
102+
cursor.close()
103+
104+
def test_data_consistency(snowflake_conn):
105+
"""Test for data consistency in health metrics"""
106+
cursor = snowflake_conn.cursor()
107+
try:
108+
cursor.execute("""
109+
SELECT health_status,
110+
maintenance_recommendation,
111+
failure_risk_score
112+
FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics
113+
""")
114+
data = cursor.fetchall()
115+
df = pd.DataFrame(data, columns=['health_status', 'maintenance_recommendation', 'failure_risk_score'])
116+
117+
# Check if CRITICAL status has urgent recommendations
118+
critical_records = df[df['health_status'] == 'CRITICAL']
119+
if not critical_records.empty:
120+
assert any('urgent' in rec.lower() or 'immediate' in rec.lower()
121+
for rec in critical_records['maintenance_recommendation']), \
122+
"CRITICAL status should have urgent maintenance recommendations"
123+
124+
# Check if risk scores align with health status
125+
status_risk_scores = df.groupby('health_status')['failure_risk_score'].mean()
126+
if all(status in status_risk_scores.index for status in ['CRITICAL', 'NEEDS_MAINTENANCE', 'HEALTHY']):
127+
assert status_risk_scores['CRITICAL'] > status_risk_scores['HEALTHY'], \
128+
"Risk scores should be higher for CRITICAL than HEALTHY status"
129+
130+
# Check for duplicates using group by
131+
cursor.execute("""
132+
SELECT machine_id, health_status, failure_risk_score, maintenance_recommendation, COUNT(*)
133+
FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics
134+
GROUP BY machine_id, health_status, failure_risk_score, maintenance_recommendation
135+
HAVING COUNT(*) > 1
136+
LIMIT 1
137+
""")
138+
duplicate = cursor.fetchone()
139+
assert duplicate is None, "There should be no duplicate records"
140+
141+
finally:
142+
cursor.close()
143+
144+
def test_data_ranges(snowflake_conn):
145+
"""Test for expected data ranges and distributions"""
146+
cursor = snowflake_conn.cursor()
147+
try:
148+
# Check machine count is within expected range
149+
cursor.execute("SELECT COUNT(DISTINCT machine_id) FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics")
150+
machine_count = cursor.fetchone()[0]
151+
assert 1 <= machine_count <= 1000, "Number of machines should be within reasonable range"
152+
153+
# Check health status distribution using simpler aggregation
154+
cursor.execute("""
155+
SELECT health_status, COUNT(*) as count
156+
FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics
157+
GROUP BY health_status
158+
""")
159+
status_counts = dict(cursor.fetchall())
160+
total_count = sum(status_counts.values())
161+
162+
# Calculate proportions manually
163+
status_props = {status: count/total_count for status, count in status_counts.items()}
164+
165+
# Ensure not all machines are in CRITICAL status
166+
assert status_props.get('CRITICAL', 0) < 0.5, \
167+
"Proportion of CRITICAL machines should not exceed 50%"
168+
169+
# Check maintenance recommendations
170+
cursor.execute("""
171+
SELECT COUNT(DISTINCT maintenance_recommendation)
172+
FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics
173+
""")
174+
unique_recommendations = cursor.fetchone()[0]
175+
assert 1 <= unique_recommendations <= 20, \
176+
"Number of unique maintenance recommendations should be reasonable"
177+
178+
finally:
179+
cursor.close()
180+
181+
def test_data_aggregation(snowflake_conn):
182+
"""Test aggregated metrics and statistics"""
183+
cursor = snowflake_conn.cursor()
184+
try:
185+
# Use simpler statistics
186+
cursor.execute("""
187+
SELECT
188+
AVG(failure_risk_score) as mean_risk,
189+
MIN(failure_risk_score) as min_risk,
190+
MAX(failure_risk_score) as max_risk
191+
FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics
192+
""")
193+
stats = cursor.fetchone()
194+
mean_risk, min_risk, max_risk = stats
195+
196+
# Check if statistics are within reasonable ranges
197+
assert 0 <= mean_risk <= 100, "Mean risk score should be between 0 and 100"
198+
assert 0 <= min_risk <= max_risk <= 100, "Risk scores should be between 0 and 100"
199+
200+
# Check health status distribution using simpler aggregation
201+
cursor.execute("""
202+
SELECT health_status, COUNT(*) as count
203+
FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics
204+
GROUP BY health_status
205+
""")
206+
status_counts = dict(cursor.fetchall())
207+
total_count = sum(status_counts.values())
208+
209+
# Calculate proportions manually
210+
status_props = {status: count/total_count for status, count in status_counts.items()}
211+
212+
# Verify reasonable distribution
213+
assert all(0 <= prop <= 1 for prop in status_props.values()), \
214+
"Health status proportions should be valid probabilities"
215+
assert sum(status_props.values()) == pytest.approx(1.0), \
216+
"Health status proportions should sum to 1"
217+
218+
finally:
219+
cursor.close()
220+
221+
def test_data_relationships(snowflake_conn):
222+
"""Test relationships between metrics and source data"""
223+
cursor = snowflake_conn.cursor()
224+
try:
225+
# Check if all machines in metrics exist in sensor data using EXISTS
226+
cursor.execute("""
227+
SELECT m.machine_id
228+
FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics m
229+
WHERE NOT EXISTS (
230+
SELECT 1
231+
FROM FACTORY_PIPELINE_DEMO.PUBLIC.RAW_SENSOR_DATA s
232+
WHERE m.machine_id = s.machine_id
233+
)
234+
LIMIT 1
235+
""")
236+
orphaned_metric = cursor.fetchone()
237+
assert orphaned_metric is None, "All machines in metrics should exist in sensor data"
238+
239+
# Check if metrics align with recent sensor data
240+
cursor.execute("""
241+
WITH recent_sensor_data AS (
242+
SELECT
243+
machine_id,
244+
AVG(temperature) as avg_temp,
245+
AVG(vibration) as avg_vibration,
246+
AVG(pressure) as avg_pressure
247+
FROM FACTORY_PIPELINE_DEMO.PUBLIC.RAW_SENSOR_DATA
248+
GROUP BY machine_id
249+
)
250+
SELECT
251+
m.machine_id,
252+
m.health_status,
253+
m.failure_risk_score,
254+
s.avg_temp,
255+
s.avg_vibration,
256+
s.avg_pressure
257+
FROM FACTORY_PIPELINE_DEMO.PUBLIC_marts.machine_health_metrics m
258+
JOIN recent_sensor_data s ON m.machine_id = s.machine_id
259+
""")
260+
data = pd.DataFrame(cursor.fetchall(),
261+
columns=['machine_id', 'health_status', 'failure_risk_score',
262+
'avg_temp', 'avg_vibration', 'avg_pressure'])
263+
264+
# Check if at least one sensor metric correlates with health status
265+
status_order = {'HEALTHY': 0, 'NEEDS_MAINTENANCE': 1, 'CRITICAL': 2}
266+
data['health_status_score'] = data['health_status'].map(status_order)
267+
268+
correlations = []
269+
for sensor in ['avg_temp', 'avg_vibration', 'avg_pressure']:
270+
correlation = data['health_status_score'].corr(data[sensor])
271+
correlations.append(correlation)
272+
273+
# Assert that at least one sensor has a positive correlation with health status
274+
assert any(corr > 0 for corr in correlations), \
275+
"At least one sensor metric should positively correlate with worse health status"
276+
277+
finally:
278+
cursor.close()

0 commit comments

Comments
 (0)