
Commit 81cc96b

APP-7077: Fixed timestamp-based pagination in the search log results (when from + size > ES window size)
1 parent 0fefc5c commit 81cc96b
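
Why this fix: Elasticsearch rejects offset ("from"/"size") pagination once from + size exceeds the index's result window (index.max_result_window, 10,000 by default). Bulk search-log extraction therefore pages by the creation timestamp of the last record seen, keeping the offset at zero. Below is a minimal sketch contrasting the two approaches; it is illustrative only, not pyatlan code, and run_query, created_after, and the dict keys are hypothetical stand-ins:

# Illustrative sketch only -- not pyatlan code.

ES_WINDOW = 10_000  # Elasticsearch's default index.max_result_window


def fetch_all_by_offset(run_query, size=300):
    """Offset paging: breaks once from_ + size exceeds the ES window."""
    from_ = 0
    while True:
        if from_ + size > ES_WINDOW:
            # Mirrors Elasticsearch's "Result window is too large" rejection.
            raise RuntimeError("from + size exceeds index.max_result_window")
        page = run_query(from_=from_, size=size)
        if not page:
            return
        yield from page
        from_ += size


def fetch_all_by_timestamp(run_query, size=300):
    """Timestamp paging: from_ stays at 0, so the window is never exceeded."""
    last_ts = -1
    while True:
        # Sort ascending by creation time and request only records strictly
        # newer than the last one already seen.
        page = run_query(from_=0, size=size, created_after=last_ts)
        if not page:
            return
        yield from page
        last_ts = page[-1]["createdAt"]

The edge case this commit handles is a page whose records all share one creation timestamp: a strict createdAt filter cannot advance past it, so the changes to _prepare_query_for_timestamp_paging below fall back to offset paging until the timestamps advance.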

2 files changed (+22, −82 lines)

pyatlan/client/search_log.py

Lines changed: 1 addition & 0 deletions
@@ -209,4 +209,5 @@ def search(
             log_entries=log_entries,
             aggregations={},
             bulk=bulk,
+            processed_log_entries_count=len(log_entries),
         )
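
Note: the single added line above seeds the paginator with the size of the first page, so SearchLogResults begins its running processed_log_entries_count from the entries already returned by the initial search rather than from zero.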

pyatlan/model/search_log.py

Lines changed: 21 additions & 82 deletions
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 from datetime import datetime
-from typing import Any, Dict, Generator, Iterable, List, Optional, Set
+from typing import Any, Dict, Generator, Iterable, List, Optional
 
 from pydantic.v1 import Field, ValidationError, parse_obj_as
 
@@ -23,7 +23,6 @@
     Term,
     Terms,
 )
-from pyatlan.utils import deep_get
 
 BY_TIMESTAMP = [SortItem("timestamp", order=SortOrder.ASCENDING)]
 
@@ -377,6 +376,7 @@ def __init__(
         log_entries: List[SearchLogEntry],
         aggregations: Dict[str, Aggregation],
         bulk: bool = False,
+        processed_log_entries_count: int = 0,
     ):
         self._client = client
         self._endpoint = SEARCH_LOG
@@ -390,7 +390,8 @@ def __init__(
         self._bulk = bulk
         self._first_record_creation_time = -2
         self._last_record_creation_time = -2
-        self._processed_log_entries: Set[str] = set()
+        self._duplicate_timestamp_page_count: int = 0
+        self._processed_log_entries_count: int = processed_log_entries_count
 
     @property
     def count(self) -> int:
@@ -404,65 +405,15 @@ def current_page(self) -> List[SearchLogEntry]:
         """
         return self._log_entries
 
-    def _get_sl_unique_key(self, entity: SearchLogEntry) -> Optional[str]:
-        """
-        Returns a unique key for a `SearchLogEntry` by
-        combining `entity_guid` with the timestamp.
-
-        NOTE: This is necessary because the search log API
-        does not provide a unique identifier for logs.
-
-        :param: search log entry
-        :returns: unique key or None if no valid key is found
-        """
-        entity_guid = entity.entity_guids_all[0] if entity.entity_guids_all else None
-
-        # If entity_guid is not present, try to extract it from request_dsl; otherwise, return None
-        if not entity_guid:
-            terms = deep_get(
-                entity.request_dsl, "query.function_score.query.bool.filter.bool.must"
-            )
-            if not terms:
-                return None
-
-            if isinstance(terms, list):
-                for term in terms:
-                    if isinstance(term, dict) and term.get("term", {}).get("__guid"):
-                        entity_guid = term["term"]["__guid"]
-                        break
-            elif isinstance(terms, dict):
-                entity_guid = terms.get("term", {}).get("__guid")
-
-        return (
-            f"{entity_guid}:{entity.timestamp}"
-            if entity_guid and entity_guid != "undefined"
-            else None
-        )
-
     def next_page(self, start=None, size=None) -> bool:
         """
         Indicates whether there is a next page of results.
 
         :returns: True if there is a next page of results, otherwise False
         """
         self._start = start or self._start + self._size
-        is_bulk_search = (
-            self._bulk or self._approximate_count > self._MASS_EXTRACT_THRESHOLD
-        )
         if size:
             self._size = size
-
-        if is_bulk_search:
-            # Used in the "timestamp-based" paging approach
-            # to check if search log with the unique key "_get_sl_unique_key()"
-            # has already been processed in a previous page of results.
-            # If it has, then exclude it from the current results;
-            # otherwise, we may encounter duplicate search log records.
-            self._processed_log_entries.update(
-                key
-                for entity in self._log_entries
-                if (key := self._get_sl_unique_key(entity))
-            )
        return self._get_next_page() if self._log_entries else False
 
     def _get_next_page(self):
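
Note: with deduplication now handled by the paging strategy itself, the synthetic entity_guid:timestamp keys (_get_sl_unique_key), the Set of processed entries, and the per-page bookkeeping in next_page are removed outright; next_page shrinks to pure offset/size handling.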
@@ -501,8 +452,8 @@ def _get_next_page_json(self, is_bulk_search: bool = False):
             return None
         try:
             self._log_entries = parse_obj_as(List[SearchLogEntry], raw_json["logs"])
+            self._processed_log_entries_count += len(self._log_entries)
             if is_bulk_search:
-                self._filter_processed_entities()
                 self._update_first_last_record_creation_times()
             return raw_json
         except ValidationError as err:
@@ -514,6 +465,7 @@ def _prepare_query_for_timestamp_paging(self, query: Query):
         """
         Adjusts the query to include timestamp filters for search log bulk extraction.
         """
+        self._criteria.dsl.from_ = 0
         rewritten_filters = []
         if isinstance(query, Bool):
             for filter_ in query.filter:
@@ -522,6 +474,9 @@ def _prepare_query_for_timestamp_paging(self, query: Query):
                     rewritten_filters.append(filter_)
 
         if self._first_record_creation_time != self._last_record_creation_time:
+            # If the first and last record creation times are different,
+            # reset _duplicate_timestamp_page_count to its initial value
+            self._duplicate_timestamp_page_count = 0
             rewritten_filters.append(
                 self._get_paging_timestamp_query(self._last_record_creation_time)
             )
@@ -539,47 +494,31 @@ def _prepare_query_for_timestamp_paging(self, query: Query):
                 # in the DSL, append it to the Bool `filter`.
                 rewritten_filters.append(query)
                 rewritten_query = Bool(filter=rewritten_filters)
-            self._criteria.dsl.from_ = 0
             self._criteria.dsl.query = rewritten_query
         else:
-            # Ensure that when switching to offset-based paging, if the first and last record timestamps are the same,
-            # we do not include a created timestamp filter (ie: Range(field='__timestamp', gte=VALUE)) in the query.
-            # Instead, ensure the search runs with only SortItem(field='__timestamp', order=<SortOrder.ASCENDING>).
-            # Failing to do so can lead to incomplete results (less than the approximate count) when running the search
-            # with a small page size.
-            if isinstance(query, Bool):
-                for filter_ in query.filter:
-                    if self._is_paging_timestamp_query(filter_):
-                        query.filter.remove(filter_)
-
-            # Always ensure that the offset is set to the length of the processed assets
-            # instead of the default (start + size), as the default may skip some assets
-            # and result in incomplete results (less than the approximate count)
-            self._criteria.dsl.from_ = len(self._processed_log_entries)
+            # If the first and last record creation times are the same,
+            # we need to switch to offset-based pagination instead of timestamp-based pagination
+            # to ensure we get the next set of results without duplicates.
+            # We use a page multiplier to skip already-processed records when encountering
+            # consecutive pages with identical timestamps, preventing duplicate results.
+            self._criteria.dsl.from_ = self._size * (
+                self._duplicate_timestamp_page_count + 1
+            )
+            self._criteria.dsl.size = self._size
+            self._duplicate_timestamp_page_count += 1
 
     @staticmethod
     def _get_paging_timestamp_query(last_timestamp: int) -> Query:
-        return Range(field="createdAt", gte=last_timestamp)
+        return Range(field="createdAt", gt=last_timestamp)
 
     @staticmethod
     def _is_paging_timestamp_query(filter_: Query) -> bool:
         return (
             isinstance(filter_, Range)
             and filter_.field == "createdAt"
-            and filter_.gte is not None
+            and filter_.gt is not None
         )
 
-    def _filter_processed_entities(self):
-        """
-        Remove log entries that have already been processed to avoid duplicates.
-        """
-        self._log_entries = [
-            entity
-            for entity in self._log_entries
-            if entity is not None
-            and self._get_sl_unique_key(entity) not in self._processed_log_entries
-        ]
-
     def _update_first_last_record_creation_times(self):
         self._first_record_creation_time = self._last_record_creation_time = -2
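
The else branch above is the core of the fix. When an entire page begins and ends on the same creation timestamp, the strict createdAt filter (note the change from gte to gt, which stops re-fetching records exactly at the boundary) cannot advance past it, so the paginator falls back to offset paging, skipping the pages already consumed at that timestamp. A minimal sketch of that decision, assuming a hypothetical helper next_page_window that is not part of the SDK:

# Minimal sketch of the paging decision -- hypothetical helper, not SDK code.

def next_page_window(size, first_ts, last_ts, duplicate_pages):
    """Return (from_, created_after, new_duplicate_pages) for the next request.

    duplicate_pages counts consecutive pages whose first and last records
    shared a single creation timestamp.
    """
    if first_ts != last_ts:
        # Normal case: page by timestamp. The offset stays at 0, the counter
        # resets, and the next request filters on createdAt > last_ts.
        return 0, last_ts, 0
    # Degenerate case: the whole page carries one timestamp, so a strict
    # createdAt filter would skip its remaining records. Skip past the
    # records already consumed at this timestamp by offset instead.
    return size * (duplicate_pages + 1), None, duplicate_pages + 1

For example, with size=300, the first identical-timestamp page makes the next request use from_=300, the one after from_=600, and so on, until the timestamps advance and the counter resets to zero.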
