@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 from datetime import datetime
-from typing import Any, Dict, Generator, Iterable, List, Optional, Set
+from typing import Any, Dict, Generator, Iterable, List, Optional
 
 from pydantic.v1 import Field, ValidationError, parse_obj_as
 
@@ -23,6 +23,5 @@
     Term,
     Terms,
 )
-from pyatlan.utils import deep_get
 
 BY_TIMESTAMP = [SortItem("timestamp", order=SortOrder.ASCENDING)]
@@ -377,6 +376,7 @@ def __init__(
         log_entries: List[SearchLogEntry],
         aggregations: Dict[str, Aggregation],
         bulk: bool = False,
+        processed_log_entries_count: int = 0,
     ):
         self._client = client
         self._endpoint = SEARCH_LOG
@@ -390,7 +390,8 @@ def __init__(
         self._bulk = bulk
         self._first_record_creation_time = -2
         self._last_record_creation_time = -2
-        self._processed_log_entries: Set[str] = set()
+        self._duplicate_timestamp_page_count: int = 0
+        self._processed_log_entries_count: int = processed_log_entries_count
 
     @property
     def count(self) -> int:
@@ -404,65 +405,15 @@ def current_page(self) -> List[SearchLogEntry]:
         """
         return self._log_entries
 
-    def _get_sl_unique_key(self, entity: SearchLogEntry) -> Optional[str]:
-        """
-        Returns a unique key for a `SearchLogEntry` by
-        combining `entity_guid` with the timestamp.
-
-        NOTE: This is necessary because the search log API
-        does not provide a unique identifier for logs.
-
-        :param: search log entry
-        :returns: unique key or None if no valid key is found
-        """
-        entity_guid = entity.entity_guids_all[0] if entity.entity_guids_all else None
-
-        # If entity_guid is not present, try to extract it from request_dsl; otherwise, return None
-        if not entity_guid:
-            terms = deep_get(
-                entity.request_dsl, "query.function_score.query.bool.filter.bool.must"
-            )
-            if not terms:
-                return None
-
-            if isinstance(terms, list):
-                for term in terms:
-                    if isinstance(term, dict) and term.get("term", {}).get("__guid"):
-                        entity_guid = term["term"]["__guid"]
-                        break
-            elif isinstance(terms, dict):
-                entity_guid = terms.get("term", {}).get("__guid")
-
-        return (
-            f"{entity_guid}:{entity.timestamp}"
-            if entity_guid and entity_guid != "undefined"
-            else None
-        )
-
     def next_page(self, start=None, size=None) -> bool:
         """
         Indicates whether there is a next page of results.
 
         :returns: True if there is a next page of results, otherwise False
         """
         self._start = start or self._start + self._size
-        is_bulk_search = (
-            self._bulk or self._approximate_count > self._MASS_EXTRACT_THRESHOLD
-        )
         if size:
             self._size = size
-
-        if is_bulk_search:
-            # Used in the "timestamp-based" paging approach
-            # to check if search log with the unique key "_get_sl_unique_key()"
-            # has already been processed in a previous page of results.
-            # If it has, then exclude it from the current results;
-            # otherwise, we may encounter duplicate search log records.
-            self._processed_log_entries.update(
-                key
-                for entity in self._log_entries
-                if (key := self._get_sl_unique_key(entity))
-            )
         return self._get_next_page() if self._log_entries else False
 
     def _get_next_page(self):
@@ -501,8 +452,8 @@ def _get_next_page_json(self, is_bulk_search: bool = False):
             return None
         try:
             self._log_entries = parse_obj_as(List[SearchLogEntry], raw_json["logs"])
+            self._processed_log_entries_count += len(self._log_entries)
             if is_bulk_search:
-                self._filter_processed_entities()
                 self._update_first_last_record_creation_times()
             return raw_json
         except ValidationError as err:
@@ -514,6 +465,7 @@ def _prepare_query_for_timestamp_paging(self, query: Query):
         """
         Adjusts the query to include timestamp filters for search log bulk extraction.
         """
+        self._criteria.dsl.from_ = 0
         rewritten_filters = []
         if isinstance(query, Bool):
             for filter_ in query.filter:
@@ -522,6 +474,9 @@ def _prepare_query_for_timestamp_paging(self, query: Query):
                 rewritten_filters.append(filter_)
 
         if self._first_record_creation_time != self._last_record_creation_time:
+            # If the first and last record creation times are different,
+            # reset _duplicate_timestamp_page_count to its initial value
+            self._duplicate_timestamp_page_count = 0
             rewritten_filters.append(
                 self._get_paging_timestamp_query(self._last_record_creation_time)
             )
@@ -539,47 +494,31 @@ def _prepare_query_for_timestamp_paging(self, query: Query):
                 # in the DSL, append it to the Bool `filter`.
                 rewritten_filters.append(query)
                 rewritten_query = Bool(filter=rewritten_filters)
-            self._criteria.dsl.from_ = 0
             self._criteria.dsl.query = rewritten_query
         else:
-            # Ensure that when switching to offset-based paging, if the first and last record timestamps are the same,
-            # we do not include a created timestamp filter (ie: Range(field='__timestamp', gte=VALUE)) in the query.
-            # Instead, ensure the search runs with only SortItem(field='__timestamp', order=<SortOrder.ASCENDING>).
-            # Failing to do so can lead to incomplete results (less than the approximate count) when running the search
-            # with a small page size.
-            if isinstance(query, Bool):
-                for filter_ in query.filter:
-                    if self._is_paging_timestamp_query(filter_):
-                        query.filter.remove(filter_)
-
-            # Always ensure that the offset is set to the length of the processed assets
-            # instead of the default (start + size), as the default may skip some assets
-            # and result in incomplete results (less than the approximate count)
-            self._criteria.dsl.from_ = len(self._processed_log_entries)
+            # If the first and last record creation times are the same,
+            # we need to switch to offset-based pagination instead of timestamp-based pagination
+            # to ensure we get the next set of results without duplicates.
+            # We use a page multiplier to skip already-processed records when encountering
+            # consecutive pages with identical timestamps, preventing duplicate results.
+            self._criteria.dsl.from_ = self._size * (
+                self._duplicate_timestamp_page_count + 1
+            )
+            self._criteria.dsl.size = self._size
+            self._duplicate_timestamp_page_count += 1
 
     @staticmethod
     def _get_paging_timestamp_query(last_timestamp: int) -> Query:
-        return Range(field="createdAt", gte=last_timestamp)
+        return Range(field="createdAt", gt=last_timestamp)
 
     @staticmethod
     def _is_paging_timestamp_query(filter_: Query) -> bool:
         return (
             isinstance(filter_, Range)
             and filter_.field == "createdAt"
-            and filter_.gte is not None
+            and filter_.gt is not None
         )
 
-    def _filter_processed_entities(self):
-        """
-        Remove log entries that have already been processed to avoid duplicates.
-        """
-        self._log_entries = [
-            entity
-            for entity in self._log_entries
-            if entity is not None
-            and self._get_sl_unique_key(entity) not in self._processed_log_entries
-        ]
-
     def _update_first_last_record_creation_times(self):
         self._first_record_creation_time = self._last_record_creation_time = -2
 
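For orientation, here is a minimal standalone sketch of the paging decision this change implements; it is not part of the diff, and the function name and return shape are illustrative only. The idea: keep timestamp-based paging with a createdAt greater-than filter while creation times advance within a page, and fall back to a growing offset once an entire page shares a single creation timestamp.

from typing import List, Optional, Tuple

def plan_next_page(
    creation_times: List[int],
    page_size: int,
    duplicate_timestamp_pages: int,
) -> Tuple[int, Optional[int], int]:
    """Return (offset, createdAt greater-than filter or None, updated duplicate-page count)."""
    first, last = creation_times[0], creation_times[-1]
    if first != last:
        # Creation times advanced within the page: restart at offset 0,
        # filter on createdAt > last, and reset the duplicate-page counter.
        return 0, last, 0
    # Every record on the page shares one creation time: a greater-than filter
    # would skip or repeat records, so page by offset instead, skipping the
    # pages already consumed for this timestamp.
    return page_size * (duplicate_timestamp_pages + 1), None, duplicate_timestamp_pages + 1

# With a page size of 2, two consecutive pages stuck on timestamp 1700000000
# are requested at offsets 2 and 4; once creation times differ again,
# paging resets to offset 0 with createdAt > 1700000005.
print(plan_next_page([1700000000, 1700000000], 2, 0))  # (2, None, 1)
print(plan_next_page([1700000000, 1700000000], 2, 1))  # (4, None, 2)
print(plan_next_page([1700000001, 1700000005], 2, 2))  # (0, 1700000005, 0)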