66from elasticsearch import Elasticsearch
77from elasticsearch .connection import create_ssl_context
88from elasticsearch_dsl import Search
9- from datetime import datetime , timezone
9+ from datetime import datetime , timezone , timedelta
1010
11- from benchmark_runner .common .elasticsearch .elasticsearch_exceptions import ElasticSearchDataNotUploaded
11+ from benchmark_runner .common .elasticsearch .elasticsearch_exceptions import ElasticSearchDataNotUploaded , ElasticSearchDataNotFound
1212from benchmark_runner .common .logger .logger_time_stamp import logger_time_stamp , logger
1313
1414
@@ -154,11 +154,12 @@ def verify_elasticsearch_data_uploaded(self, index: str, uuid: str = '', workloa
154154 raise ElasticSearchDataNotUploaded
155155
156156 @typechecked ()
157- def upload_to_elasticsearch (self , index : str , data : dict , doc_type : str = '_doc' , es_add_items : dict = None , ** kwargs ):
157+ def upload_to_elasticsearch (self , index : str , data : dict , doc_type : str = '_doc' , timestamp : datetime = None , es_add_items : dict = None , ** kwargs ):
158158 """
159159 This method uploads json data into elasticsearch
160160 :param index: index name to be stored in elasticsearch
161161 :param data: data must be in dictionary i.e. {'key': 'value'}
162+ :param timestamp:
162163 :param doc_type:
163164 :param es_add_items:
164165 :return:
@@ -169,7 +170,10 @@ def upload_to_elasticsearch(self, index: str, data: dict, doc_type: str = '_doc'
169170 data .update (es_add_items )
170171
171172 # utcnow - solve timestamp issue
172- data ['timestamp' ] = datetime .now (timezone .utc ) # datetime.utcnow() or datetime.now()
173+ if timestamp :
174+ data ['timestamp' ] = timestamp
175+ else :
176+ data ['timestamp' ] = datetime .now (timezone .utc ) # datetime.utcnow() or datetime.now()
173177
174178 # Uploads data to elasticsearch server
175179 try :
@@ -229,32 +233,48 @@ def delete_elasticsearch_index_by_id(self, index: str, id: str):
229233
230234 @typechecked ()
231235 @logger_time_stamp
232- def get_query_data_between_dates (self , start_datetime : datetime , end_datetime : datetime ) :
236+ def get_query_data_between_dates (self , start_datetime : datetime , end_datetime : datetime , sort_order : str = "desc" ) -> dict :
233237 """
234- This method returns the query for fetching data between dates
235- @param start_datetime:
236- @param end_datetime:
237- @return:
238+ Returns an Elasticsearch query for fetching data between UTC-aware datetime ranges.
239+
240+ :param start_datetime: datetime, start of the range (UTC-aware)
241+ :param end_datetime: datetime, end of the range (UTC-aware)
242+ :param sort_order: desc or asc
243+ :return: dict, Elasticsearch query
238244 """
239- if start_datetime and end_datetime :
240- if end_datetime < start_datetime :
241- start_datetime = end_datetime
242- query = {
243- "bool" : {
244- "filter" : {
245- "range" : {
246- "timestamp" : {
247- "format" : "yyyy-MM-dd HH:mm:ss"
248- }
245+ if not start_datetime :
246+ raise ValueError ('Empty parameters: start_datetime' )
247+ if not end_datetime :
248+ raise ValueError ('Empty parameters: end_datetime' )
249+
250+ # Ensure start_datetime <= end_datetime
251+ if end_datetime < start_datetime :
252+ start_datetime , end_datetime = end_datetime , start_datetime
253+
254+ # Convert to ISO 8601 with Z
255+ start_iso = start_datetime .replace (microsecond = 0 ).isoformat () + "Z"
256+ end_iso = end_datetime .replace (microsecond = 0 ).isoformat () + "Z"
257+
258+ # Build Elasticsearch query
259+ query = {
260+ "query" : {
261+ "bool" : {
262+ "filter" : {
263+ "range" : {
264+ "timestamp" : {
265+ "gte" : start_iso ,
266+ "lte" : end_iso
249267 }
250268 }
251269 }
252- }
253- query ['bool' ]['filter' ]['range' ]['timestamp' ]['lte' ] = str (end_datetime .replace (microsecond = 0 ))
254- query ['bool' ]['filter' ]['range' ]['timestamp' ]['gte' ] = str (start_datetime .replace (microsecond = 0 ))
255- return query
256- else :
257- raise Exception ('Empty parameters: start_datetime/ end_datetime' )
270+ }
271+ },
272+ "sort" : [
273+ {"timestamp" : {"order" : sort_order }}
274+ ]
275+ }
276+
277+ return query
258278
259279 @typechecked ()
260280 @logger_time_stamp
@@ -271,7 +291,7 @@ def get_index_data_between_dates(self, index: str, start_datetime: datetime = No
271291 if index and start_datetime and end_datetime :
272292 es_data = []
273293 query = self .get_query_data_between_dates (start_datetime , end_datetime )
274- response = self .__es .search (index = index , query = query , size = number_of_documents , scroll = scroll_duration )
294+ response = self .__es .search (index = index , body = query , size = number_of_documents , scroll = scroll_duration )
275295 scroll_id = response .get ('_scroll_id' )
276296 if response .get ('hits' ).get ('hits' ):
277297 es_data .extend (response .get ('hits' ).get ('hits' ))
@@ -285,6 +305,63 @@ def get_index_data_between_dates(self, index: str, start_datetime: datetime = No
285305 else :
286306 raise Exception ('Empty parameters: index/ start_datetime/ end_datetime' )
287307
308+ @typechecked ()
309+ @logger_time_stamp
310+ def get_latest_resource_with_key (
311+ self ,
312+ index : str ,
313+ key : str ,
314+ start_datetime : datetime = None ,
315+ end_datetime : datetime = None
316+ ) -> dict :
317+ """
318+ Returns the _source of the most recent document (latest first)
319+ that contains the given key.
320+ If none found, returns an empty dict.
321+ """
322+ if not end_datetime :
323+ end_datetime = datetime .now (timezone .utc )
324+ if not start_datetime :
325+ start_datetime = end_datetime - timedelta (minutes = 60 )
326+
327+ docs = self .get_index_data_between_dates (
328+ index ,
329+ start_datetime = start_datetime ,
330+ end_datetime = end_datetime
331+ )
332+
333+ if not docs :
334+ return {}
335+
336+ # Latest → oldest
337+ for doc in docs :
338+ source = doc .get ('_source' , {})
339+ if key in source :
340+ return source
341+
342+ return {}
343+
344+ @typechecked ()
345+ @logger_time_stamp
346+ def get_uuid_for_ids (self , index : str , ids : list [str ]) -> dict :
347+ """
348+ Return a mapping of doc_id -> uuid (or None if missing)
349+ """
350+ if not ids :
351+ # Nothing to fetch, avoid _mget with empty list
352+ return {}
353+ docs = self .__es .mget (
354+ index = index ,
355+ body = {"ids" : ids }
356+ )
357+ result = {}
358+ for doc in docs ["docs" ]:
359+ if doc .get ("found" ):
360+ result [doc ["_id" ]] = doc ["_source" ].get ("uuid" )
361+ else :
362+ result [doc ["_id" ]] = None
363+ return result
364+
288365 @typechecked ()
289366 @logger_time_stamp
290367 def get_index_ids_between_dates (self , index : str , start_datetime : datetime = None , end_datetime : datetime = None ):
0 commit comments