|
| 1 | +from typing import List, Optional |
| 2 | + |
1 | 3 | from tenacity import (
|
2 | 4 | retry,
|
3 | 5 | retry_if_exception_type,
|
| 6 | + retry_if_result, |
4 | 7 | stop_after_attempt,
|
| 8 | + wait_exponential, |
5 | 9 | wait_random_exponential,
|
6 | 10 | )
|
7 | 11 |
|
8 | 12 | from pyatlan.client.atlan import AtlanClient
|
9 |
| -from pyatlan.errors import AtlanError |
| 13 | +from pyatlan.errors import AtlanError, NotFoundError |
| 14 | +from pyatlan.model.assets import Persona, Purpose |
| 15 | +from pyatlan.model.fluent_search import FluentSearch |
| 16 | +from pyatlan.model.search import IndexSearchRequest |
10 | 17 | from pyatlan.model.typedef import AtlanTagDef, CustomMetadataDef, EnumDef
|
11 | 18 |
|
12 | 19 |
|
@@ -35,3 +42,141 @@ def wait_for_successful_custometadatadef_purge(name: str, client: AtlanClient):
|
35 | 42 | )
|
36 | 43 | def wait_for_successful_enumadef_purge(name: str, client: AtlanClient):
|
37 | 44 | client.typedef.purge(name, typedef_type=EnumDef)
|
| 45 | + |
| 46 | + |
| 47 | +# ============================= |
| 48 | +# Search Retry Utilities for Eventual Consistency |
| 49 | +# ============================= |
| 50 | + |
| 51 | + |
| 52 | +def find_personas_by_name_with_retry( |
| 53 | + client: AtlanClient, name: str, attributes: Optional[List[str]] = None |
| 54 | +) -> List[Persona]: |
| 55 | + """ |
| 56 | + Find personas by name with automatic retry for search index eventual consistency. |
| 57 | +
|
| 58 | + :param client: AtlanClient instance |
| 59 | + :param name: name of the persona to find |
| 60 | + :param attributes: optional attributes to retrieve |
| 61 | + :returns: list of personas found |
| 62 | + :raises NotFoundError: if no personas found after all retries |
| 63 | + """ |
| 64 | + |
| 65 | + @retry( |
| 66 | + reraise=True, |
| 67 | + retry=retry_if_exception_type(NotFoundError), |
| 68 | + stop=stop_after_attempt(10), |
| 69 | + wait=wait_exponential(multiplier=1, min=2, max=10), |
| 70 | + ) |
| 71 | + def _retry_find_personas(): |
| 72 | + return client.asset.find_personas_by_name(name=name, attributes=attributes) |
| 73 | + |
| 74 | + return _retry_find_personas() |
| 75 | + |
| 76 | + |
| 77 | +def find_purposes_by_name_with_retry( |
| 78 | + client: AtlanClient, name: str, attributes: Optional[List[str]] = None |
| 79 | +) -> List[Purpose]: |
| 80 | + """ |
| 81 | + Find purposes by name with automatic retry for search index eventual consistency. |
| 82 | +
|
| 83 | + :param client: AtlanClient instance |
| 84 | + :param name: name of the purpose to find |
| 85 | + :param attributes: optional attributes to retrieve |
| 86 | + :returns: list of purposes found |
| 87 | + :raises NotFoundError: if no purposes found after all retries |
| 88 | + """ |
| 89 | + |
| 90 | + @retry( |
| 91 | + reraise=True, |
| 92 | + retry=retry_if_exception_type(NotFoundError), |
| 93 | + stop=stop_after_attempt(10), |
| 94 | + wait=wait_exponential(multiplier=1, min=2, max=10), |
| 95 | + ) |
| 96 | + def _retry_find_purposes(): |
| 97 | + return client.asset.find_purposes_by_name(name=name, attributes=attributes) |
| 98 | + |
| 99 | + return _retry_find_purposes() |
| 100 | + |
| 101 | + |
| 102 | +def fluent_search_count_with_retry( |
| 103 | + fluent_search: FluentSearch, client: AtlanClient, expected_count: int |
| 104 | +) -> int: |
| 105 | + """ |
| 106 | + Count FluentSearch results with automatic retry for search index eventual consistency. |
| 107 | +
|
| 108 | + :param fluent_search: FluentSearch instance to count |
| 109 | + :param client: AtlanClient instance |
| 110 | + :param expected_count: expected minimum count to wait for |
| 111 | + :returns: actual count after retry logic |
| 112 | + """ |
| 113 | + |
| 114 | + @retry( |
| 115 | + reraise=True, |
| 116 | + retry=retry_if_result(lambda count: count < expected_count), |
| 117 | + stop=stop_after_attempt(10), |
| 118 | + wait=wait_exponential(multiplier=1, min=2, max=10), |
| 119 | + ) |
| 120 | + def _retry_count(): |
| 121 | + return fluent_search.count(client) |
| 122 | + |
| 123 | + return _retry_count() |
| 124 | + |
| 125 | + |
| 126 | +def search_request_count_with_retry( |
| 127 | + client: AtlanClient, request: IndexSearchRequest, expected_count: int |
| 128 | +) -> int: |
| 129 | + """ |
| 130 | + Count search request results with automatic retry for search index eventual consistency. |
| 131 | +
|
| 132 | + :param client: AtlanClient instance |
| 133 | + :param request: IndexSearchRequest to execute |
| 134 | + :param expected_count: expected minimum count to wait for |
| 135 | + :returns: actual count after retry logic |
| 136 | + """ |
| 137 | + |
| 138 | + @retry( |
| 139 | + reraise=True, |
| 140 | + retry=retry_if_result(lambda count: count < expected_count), |
| 141 | + stop=stop_after_attempt(10), |
| 142 | + wait=wait_exponential(multiplier=1, min=2, max=10), |
| 143 | + ) |
| 144 | + def _retry_search(): |
| 145 | + response = client.asset.search(request) |
| 146 | + return response.count |
| 147 | + |
| 148 | + return _retry_search() |
| 149 | + |
| 150 | + |
| 151 | +def assert_search_count_with_retry( |
| 152 | + client: AtlanClient, request: IndexSearchRequest, expected_count: int |
| 153 | +) -> None: |
| 154 | + """ |
| 155 | + Assert search count with retry - convenience method for test assertions. |
| 156 | +
|
| 157 | + :param client: AtlanClient instance |
| 158 | + :param request: IndexSearchRequest to execute |
| 159 | + :param expected_count: expected count to assert |
| 160 | + :raises AssertionError: if count doesn't match after retries |
| 161 | + """ |
| 162 | + actual_count = search_request_count_with_retry(client, request, expected_count) |
| 163 | + assert actual_count == expected_count, ( |
| 164 | + f"Expected {expected_count} results, got {actual_count}" |
| 165 | + ) |
| 166 | + |
| 167 | + |
| 168 | +def assert_fluent_search_count_with_retry( |
| 169 | + fluent_search: FluentSearch, client: AtlanClient, expected_count: int |
| 170 | +) -> None: |
| 171 | + """ |
| 172 | + Assert FluentSearch count with retry - convenience method for test assertions. |
| 173 | +
|
| 174 | + :param fluent_search: FluentSearch instance to count |
| 175 | + :param client: AtlanClient instance |
| 176 | + :param expected_count: expected count to assert |
| 177 | + :raises AssertionError: if count doesn't match after retries |
| 178 | + """ |
| 179 | + actual_count = fluent_search_count_with_retry(fluent_search, client, expected_count) |
| 180 | + assert actual_count == expected_count, ( |
| 181 | + f"Expected {expected_count} results, got {actual_count}" |
| 182 | + ) |
0 commit comments