99import dateutil .parser
1010from column_name import normalise_name
1111from osisoft_plugin_common import reorder_dataframe
12+ from datetime import datetime
1213
1314
1415logger = SafeLogger ("pi-system plugin" , forbiden_keys = ["token" , "password" ])
@@ -36,18 +37,27 @@ def parse_timestamp_and_value(line):
3637 return date , value
3738
3839
39- def get_datetime_from_string ( datetime ):
40+ def get_epoch_from_string ( datetime_string ):
4041 try :
41- _ = dateutil .parser .isoparse (datetime )
42- return datetime
42+ utc_time = datetime .strptime (datetime_string , "%Y-%m-%dT%H:%M:%SZ" )
43+ epoch_time = (utc_time - datetime (1970 , 1 , 1 )).total_seconds ()
44+ except Exception :
45+ return None
46+ return epoch_time
47+
48+
49+ def get_datetime_from_string (datetime_string ):
50+ try :
51+ _ = dateutil .parser .isoparse (datetime_string )
52+ return datetime_string
4353 except Exception :
4454 pass
4555 return None
4656
4757
48- def get_datetime_from_pandas (datetime ):
58+ def get_datetime_from_pandas (datetime_string ):
4959 try :
50- time_stamp = datetime .strftime ('%Y-%m-%dT%H:%M:%SZ' )
60+ time_stamp = datetime_string .strftime ('%Y-%m-%dT%H:%M:%SZ' )
5161 return time_stamp
5262 except Exception :
5363 pass
@@ -63,7 +73,7 @@ def get_datetime_from_row(row, datetime_column):
6373 return formated_datetime
6474
6575
66- def get_latest_values_at_timestamp (file_handles , seek_timestamp ):
76+ def get_values_at_timestamp (file_handles , seek_timestamp , step_attributes ):
6777 attribute_index = 0
6878 values = {}
6979 for attribute_path in file_handles :
@@ -85,19 +95,42 @@ def get_latest_values_at_timestamp(file_handles, seek_timestamp):
8595 next_timestamps_cache [attribute_index ] = attribute_timestamp
8696 next_values_cache [attribute_index ] = attribute_value
8797 next_cached_timestamp = next_timestamps_cache [attribute_index ]
98+ if step_attributes .get (attribute_path ) is True :
99+ calculated_value = interpolate (
100+ current_timestamps_cache [attribute_index ],
101+ current_values_cache [attribute_index ],
102+ next_timestamps_cache [attribute_index ],
103+ next_values_cache [attribute_index ],
104+ seek_timestamp
105+ )
106+ else :
107+ calculated_value = current_values_cache [attribute_index ]
88108 if should_add_timestamps_columns :
89109 values .update ({
90110 "{}{}" .format (attribute_path , OSIsoftConstants .TIMESTAMP_COLUMN_SUFFIX ): current_timestamps_cache [attribute_index ],
91- "{}{}" .format (attribute_path , OSIsoftConstants .VALUE_COLUMN_SUFFIX ): current_values_cache [ attribute_index ]
111+ "{}{}" .format (attribute_path , OSIsoftConstants .VALUE_COLUMN_SUFFIX ): calculated_value
92112 })
93113 else :
94114 values .update ({
95- attribute_path : current_values_cache [ attribute_index ]
115+ attribute_path : calculated_value
96116 })
97117 attribute_index = attribute_index + 1
98118 return values
99119
100120
121+ def interpolate (previous_timestamp , previous_value , next_timestamp , next_value , time_now ):
122+ previous_timestamp = get_epoch_from_string (previous_timestamp )
123+ next_timestamp = get_epoch_from_string (next_timestamp )
124+ time_now = get_epoch_from_string (time_now )
125+ if previous_timestamp is None or next_timestamp is None or time_now is None :
126+ return None
127+ if previous_timestamp == next_timestamp or time_now == previous_timestamp :
128+ return previous_value
129+ rate_of_change = (float (next_value ) - float (previous_value )) / (float (next_timestamp ) - float (previous_timestamp ))
130+ value_now = float (previous_value ) + rate_of_change * (float (time_now ) - float (previous_timestamp ))
131+ return value_now
132+
133+
101134def clean_cache (paths_to_file_handles ):
102135 logger .info ("Polling done, cleaning the cache files" )
103136 # Close and delete all cache files
@@ -162,12 +195,17 @@ def get_column_name_specifications():
162195previous_server_url = ""
163196paths_to_file_handles = {}
164197file_counter = 0
198+ step_attributes = {}
199+
200+ type_of_interpolation = config .get ("type_of_interpolation" , "last_value" )
201+ if type_of_interpolation == "auto" :
202+ step_column_name = config .get ("step_column_name" , "Step" )
165203
166204# Cache each attribute
167205logger .info ("Caching all attributes in {}" .format (temp_location .name ))
168206for index , input_parameters_row in input_parameters_dataframe .iterrows ():
169- datetime = get_datetime_from_row (input_parameters_row , datetime_column )
170- if not datetime :
207+ row_datetime = get_datetime_from_row (input_parameters_row , datetime_column )
208+ if not row_datetime :
171209 continue
172210 attribute_path = input_parameters_row .get (input_paths_column )
173211 if should_make_column_names_db_compatible :
@@ -181,7 +219,14 @@ def get_column_name_specifications():
181219 if attribute_path == reference_attribute_path :
182220 time_reference_file = file_counter
183221 file_counter = file_counter + 1
184- paths_to_file_handles [attribute_path ].writelines ("{}|{}\n " .format (datetime , value ))
222+ paths_to_file_handles [attribute_path ].writelines ("{}|{}\n " .format (row_datetime , value ))
223+
224+ if type_of_interpolation == "auto" :
225+ is_step_attribute = input_parameters_row .get (step_column_name )
226+ if is_step_attribute == "True" or is_step_attribute is True :
227+ step_attributes [attribute_path ] = True
228+ elif type_of_interpolation == "interpolation" :
229+ step_attributes [attribute_path ] = True
185230
186231logger .info ("Cached all {} attributes" .format (file_counter ))
187232
@@ -210,15 +255,15 @@ def get_column_name_specifications():
210255next_timestamps_cache .pop (0 )
211256next_values_cache .pop (0 )
212257
258+ logger .info ("Polling all attributes into final dataset" )
213259# For each timestamp of synchronizer attribute, read the most up to date value of all other attributes
214260# Write all that, one column per attribute
215261first_dataframe = True
216- logger .info ("Polling all attributes into final dataset" )
217262with output_dataset .get_writer () as writer :
218263 for line in reference_values_file :
219264 unnested_items_rows = []
220265 timestamp , value = parse_timestamp_and_value (line )
221- output_columns_dictionary = get_latest_values_at_timestamp (paths_to_file_handles , timestamp )
266+ output_columns_dictionary = get_values_at_timestamp (paths_to_file_handles , timestamp , step_attributes )
222267 output_columns_dictionary .update ({
223268 OSIsoftConstants .TIMESTAMP_COLUMN_NAME : timestamp ,
224269 reference_attribute_path : value
0 commit comments