33import csv
44import os
55import argparse
6+ import glob
7+ import re
68
7- def generate_sensor_data (num_records = 100 , machines = None , anomaly_probability = 0.15 ):
8- """
9- Generate synthetic sensor data with occasional anomalies
10-
11- Parameters:
12- - num_records: Number of records to generate
13- - machines: List of machine IDs (defaults to M001-M010)
14- - anomaly_probability: Chance of generating anomalous readings
15-
16- Returns:
17- - List of dictionaries containing sensor data
18- """
9+ def get_latest_batch_info (output_dir = "data" ):
10+ if not os .path .exists (output_dir ):
11+ os .makedirs (output_dir )
12+ return 1 , None
13+
14+ batch_files = glob .glob (os .path .join (output_dir , "sensor_data_batch_*.csv" ))
15+ if not batch_files :
16+ return 1 , None
17+
18+ latest_batch_num = 0
19+ latest_file = None
20+
21+ for f in batch_files :
22+ match = re .search (r"sensor_data_batch_(\d+).csv" , os .path .basename (f ))
23+ if match :
24+ batch_num = int (match .group (1 ))
25+ if batch_num > latest_batch_num :
26+ latest_batch_num = batch_num
27+ latest_file = f
28+
29+ if latest_file :
30+ try :
31+ with open (latest_file , 'r' , newline = '' ) as csvfile :
32+ reader = csv .reader (csvfile )
33+ header = next (reader )
34+ last_line = None
35+ for row in reader :
36+ if row :
37+ last_line = row
38+
39+ if last_line :
40+ last_timestamp_str = last_line [1 ]
41+ last_timestamp_dt = datetime .datetime .fromisoformat (last_timestamp_str .replace ('Z' , '+00:00' ))
42+ return latest_batch_num + 1 , last_timestamp_dt
43+ except Exception as e :
44+ print (f"Warning: Could not read or parse last timestamp from { latest_file } : { e } " )
45+ # Fallback if reading fails, start fresh for the next batch number
46+ return latest_batch_num + 1 , datetime .datetime .now (datetime .timezone .utc ) - datetime .timedelta (hours = 1 )
47+
48+ return latest_batch_num + 1 , None # Should not happen if latest_file was found, but as a fallback
49+
50+
51+ def generate_sensor_data (
52+ num_records_per_machine = 50 ,
53+ machines = None ,
54+ anomaly_probability = 0.15 ,
55+ start_timestamp = None
56+ ):
1957 if machines is None :
2058 machines = [f"M{ str (i ).zfill (3 )} " for i in range (1 , 11 )]
2159
22- # Base timestamp (current time minus some random offset)
23- base_time = datetime .datetime .now () - datetime .timedelta (hours = random .randint (0 , 24 ))
24-
25- # Normal operating ranges for each sensor
60+ num_total_records = num_records_per_machine * len (machines )
61+
62+ if start_timestamp is None :
63+ current_time = datetime .datetime .now (datetime .timezone .utc ) - datetime .timedelta (hours = 1 )
64+ else :
65+ current_time = start_timestamp
66+
2667 normal_ranges = {
27- "temperature" : (65.0 , 80.0 ), # degrees Celsius
28- "vibration" : (0.01 , 0.2 ), # mm/s
29- "pressure" : (98.0 , 102.0 ) # kPa
68+ "temperature" : (65.0 , 80.0 ),
69+ "vibration" : (0.01 , 0.2 ),
70+ "pressure" : (98.0 , 102.0 )
3071 }
3172
32- # Warning thresholds
3373 warning_thresholds = {
34- "temperature" : 85.0 , # above this is WARNING
35- "vibration" : 0.5 , # above this is WARNING
36- "pressure_high" : 105.0 , # above this is WARNING
37- "pressure_low" : 95.0 # below this is WARNING
74+ "temperature" : 85.0 ,
75+ "vibration" : 0.5 ,
76+ "pressure_high" : 105.0 ,
77+ "pressure_low" : 95.0
3878 }
3979
40- # Critical thresholds
4180 critical_thresholds = {
42- "temperature" : 95.0 , # above this is CRITICAL
43- "vibration" : 0.8 , # above this is CRITICAL
44- "pressure_high" : 110.0 , # above this is CRITICAL
45- "pressure_low" : 92.0 # below this is CRITICAL
81+ "temperature" : 95.0 ,
82+ "vibration" : 0.8 ,
83+ "pressure_high" : 110.0 ,
84+ "pressure_low" : 92.0
4685 }
4786
4887 data = []
88+ machine_record_counts = {machine_id : 0 for machine_id in machines }
4989
50- for i in range (num_records ):
51- # Select a random machine
52- machine_id = random .choice (machines )
90+ available_machines = list (machines )
91+
92+ for i in range (num_total_records ):
93+ if not available_machines :
94+ break
5395
54- # Generate timestamp with some randomness
55- timestamp = base_time + datetime .timedelta (
56- minutes = random .randint (0 , 60 ),
57- seconds = random .randint (0 , 59 )
58- )
96+ machine_id = random .choice (available_machines )
5997
60- # Decide if this reading should be anomalous
98+ current_time += datetime .timedelta (seconds = random .randint (1 , 15 + i // 10 ))
99+
61100 is_anomaly = random .random () < anomaly_probability
62- is_critical = is_anomaly and random .random () < 0.3 # 30% of anomalies are critical
101+ is_critical = is_anomaly and random .random () < 0.3
63102
64- # Generate sensor readings
103+ status = "AOK"
104+ temperature = random .uniform (* normal_ranges ["temperature" ])
105+ vibration = random .uniform (* normal_ranges ["vibration" ])
106+ pressure = random .uniform (* normal_ranges ["pressure" ])
107+
65108 if is_anomaly :
109+ anomaly_type = random .choice (["temperature" , "vibration" , "pressure" ])
66110 if is_critical :
67- # Critical anomaly
68- anomaly_type = random .choice (["temperature" , "vibration" , "pressure" ])
69-
111+ status = "CRIT"
70112 if anomaly_type == "temperature" :
71113 temperature = random .uniform (critical_thresholds ["temperature" ], critical_thresholds ["temperature" ] + 10 )
72- vibration = random .uniform (* normal_ranges ["vibration" ])
73- pressure = random .uniform (* normal_ranges ["pressure" ])
74- status = "CRIT"
75114 elif anomaly_type == "vibration" :
76- temperature = random .uniform (* normal_ranges ["temperature" ])
77115 vibration = random .uniform (critical_thresholds ["vibration" ], critical_thresholds ["vibration" ] + 0.5 )
78- pressure = random .uniform (* normal_ranges ["pressure" ])
79- status = "CRIT"
80- else : # pressure anomaly
81- temperature = random .uniform (* normal_ranges ["temperature" ])
82- vibration = random .uniform (* normal_ranges ["vibration" ])
83-
84- # Either too high or too low pressure
116+ else : # pressure
85117 if random .random () < 0.5 :
86118 pressure = random .uniform (critical_thresholds ["pressure_high" ], critical_thresholds ["pressure_high" ] + 5 )
87119 else :
88120 pressure = random .uniform (critical_thresholds ["pressure_low" ] - 5 , critical_thresholds ["pressure_low" ])
89- status = "CRIT"
90- else :
91- # Warning anomaly
92- anomaly_type = random .choice (["temperature" , "vibration" , "pressure" ])
93-
121+ else : # Warning
122+ status = "WARN"
94123 if anomaly_type == "temperature" :
95- temperature = random .uniform (warning_thresholds ["temperature" ], critical_thresholds ["temperature" ])
96- vibration = random .uniform (* normal_ranges ["vibration" ])
97- pressure = random .uniform (* normal_ranges ["pressure" ])
98- status = "WARN"
124+ temperature = random .uniform (warning_thresholds ["temperature" ], critical_thresholds ["temperature" ] - 0.1 ) # ensure it's below critical
99125 elif anomaly_type == "vibration" :
100- temperature = random .uniform (* normal_ranges ["temperature" ])
101- vibration = random .uniform (warning_thresholds ["vibration" ], critical_thresholds ["vibration" ])
102- pressure = random .uniform (* normal_ranges ["pressure" ])
103- status = "WARN"
104- else : # pressure anomaly
105- temperature = random .uniform (* normal_ranges ["temperature" ])
106- vibration = random .uniform (* normal_ranges ["vibration" ])
107-
108- # Either too high or too low pressure
126+ vibration = random .uniform (warning_thresholds ["vibration" ], critical_thresholds ["vibration" ]- 0.01 )
127+ else : # pressure
109128 if random .random () < 0.5 :
110- pressure = random .uniform (warning_thresholds ["pressure_high" ], critical_thresholds ["pressure_high" ])
129+ pressure = random .uniform (warning_thresholds ["pressure_high" ], critical_thresholds ["pressure_high" ]- 0.1 )
111130 else :
112- pressure = random .uniform (critical_thresholds ["pressure_low" ], warning_thresholds ["pressure_low" ])
113- status = "WARN"
114- else :
115- # Normal readings
116- temperature = random .uniform (* normal_ranges ["temperature" ])
117- vibration = random .uniform (* normal_ranges ["vibration" ])
118- pressure = random .uniform (* normal_ranges ["pressure" ])
119- status = "AOK"
131+ pressure = random .uniform (critical_thresholds ["pressure_low" ]+ 0.1 , warning_thresholds ["pressure_low" ])
120132
121- # Format timestamp as ISO format
122- timestamp_str = timestamp .strftime ("%Y-%m-%dT%H:%M:%SZ" )
133+ timestamp_str = current_time .strftime ("%Y-%m-%dT%H:%M:%SZ" )
123134
124- # Create data record
125135 record = {
126136 "machine_id" : machine_id ,
127137 "timestamp" : timestamp_str ,
@@ -130,10 +140,12 @@ def generate_sensor_data(num_records=100, machines=None, anomaly_probability=0.1
130140 "pressure" : round (pressure , 1 ),
131141 "status_code" : status
132142 }
133-
134143 data .append (record )
135-
136- # Sort by timestamp
144+
145+ machine_record_counts [machine_id ] += 1
146+ if machine_record_counts [machine_id ] >= num_records_per_machine :
147+ available_machines .remove (machine_id )
148+
137149 data .sort (key = lambda x : x ["timestamp" ])
138150
139151 return data
@@ -150,25 +162,32 @@ def write_csv(data, filename):
150162 writer .writerows (data )
151163
152164 print (f"Generated { len (data )} records and saved to { filename } " )
153-
154165 return filename
155166
156167if __name__ == "__main__" :
157- parser = argparse .ArgumentParser (description = "Generate synthetic sensor data" )
158- parser .add_argument ("--records " , type = int , default = 100 , help = "Number of records to generate (default: 100 )" )
159- parser .add_argument ("--output " , type = str , default = "data/generated_sensor_data.csv " , help = "Output CSV file path " )
168+ parser = argparse .ArgumentParser (description = "Generate synthetic sensor data in batches. " )
169+ parser .add_argument ("--records_per_machine " , type = int , default = 10 , help = "Number of records per machine (default: 50 )" )
170+ parser .add_argument ("--output_dir " , type = str , default = "data" , help = "Output directory for CSV files (default: data) " )
160171 parser .add_argument ("--anomalies" , type = float , default = 0.15 , help = "Probability of anomalies (0-1, default: 0.15)" )
161172
162173 args = parser .parse_args ()
163-
164- # Generate sensor data
165- data = generate_sensor_data (args .records , anomaly_probability = args .anomalies )
166-
167- # Write to CSV
168- output_file = write_csv (data , args .output )
169-
170- print (f"\n Next steps:" )
171- print (f"1. Upload the file to S3 to trigger Snowpipe:" )
172- print (f" python setup/03_upload_file.py --file { output_file } " )
173- print (f"2. Check the pipeline status:" )
174- print (f" python setup/check_pipeline_status.py" )
174+
175+ next_batch_num , last_timestamp = get_latest_batch_info (args .output_dir )
176+
177+ if last_timestamp :
178+ print (f"Last timestamp from batch { next_batch_num - 1 } : { last_timestamp .strftime ('%Y-%m-%dT%H:%M:%SZ' )} " )
179+ start_timestamp_for_new_batch = last_timestamp + datetime .timedelta (seconds = random .randint (10 ,30 ))
180+ else :
181+ print ("No previous batch found or unable to read last timestamp. Starting fresh." )
182+ start_timestamp_for_new_batch = datetime .datetime .now (datetime .timezone .utc ) - datetime .timedelta (hours = 1 )
183+
184+ print (f"Generating data for batch { next_batch_num } , starting after ~{ start_timestamp_for_new_batch .strftime ('%Y-%m-%dT%H:%M:%SZ' )} " )
185+
186+ data = generate_sensor_data (
187+ num_records_per_machine = args .records_per_machine ,
188+ anomaly_probability = args .anomalies ,
189+ start_timestamp = start_timestamp_for_new_batch
190+ )
191+
192+ output_filename = os .path .join (args .output_dir , f"sensor_data_batch_{ next_batch_num } .csv" )
193+ output_file = write_csv (data , output_filename )
0 commit comments