|
4 | 4 |
|
5 | 5 | import functools |
6 | 6 | import logging |
| 7 | +import os |
7 | 8 | from abc import ABC, abstractmethod |
8 | 9 | from typing import ( |
9 | 10 | Any, |
|
17 | 18 | Union, |
18 | 19 | ) |
19 | 20 |
|
| 21 | +from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, Type |
20 | 22 | from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager |
21 | 23 | from airbyte_cdk.sources.message import MessageRepository |
22 | 24 | from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY |
@@ -237,12 +239,41 @@ def _extract_cursor_value(self, record: Record) -> Any: |
237 | 239 | return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record)) |
238 | 240 |
|
239 | 241 | def close_partition(self, partition: Partition) -> None: |
| 242 | + test_env = os.getenv("PYTEST_CURRENT_TEST") |
| 243 | + if test_env and "test_concurrent_declarative_source.py" in test_env: |
| 244 | + self._message_repository.emit_message( |
| 245 | + AirbyteMessage( |
| 246 | + type=Type.LOG, |
| 247 | + log=AirbyteLogMessage( |
| 248 | + level=Level.INFO, message=f"Closing partition {partition.to_slice()}" |
| 249 | + ), |
| 250 | + ) |
| 251 | + ) |
| 252 | + self._message_repository.emit_message( |
| 253 | + AirbyteMessage( |
| 254 | + type=Type.LOG, |
| 255 | + log=AirbyteLogMessage( |
| 256 | + level=Level.INFO, message=f"\tstate before is {self._concurrent_state}" |
| 257 | + ), |
| 258 | + ) |
| 259 | + ) |
| 260 | + |
240 | 261 | slice_count_before = len(self._concurrent_state.get("slices", [])) |
241 | 262 | self._add_slice_to_state(partition) |
242 | 263 | if slice_count_before < len( |
243 | 264 | self._concurrent_state["slices"] |
244 | 265 | ): # only emit if at least one slice has been processed |
245 | 266 | self._merge_partitions() |
| 267 | + if test_env and "test_concurrent_declarative_source.py" in test_env: |
| 268 | + self._message_repository.emit_message( |
| 269 | + AirbyteMessage( |
| 270 | + type=Type.LOG, |
| 271 | + log=AirbyteLogMessage( |
| 272 | + level=Level.INFO, |
| 273 | + message=f"\tstate after merged partition is {self._concurrent_state}", |
| 274 | + ), |
| 275 | + ) |
| 276 | + ) |
246 | 277 | self._emit_state_message() |
247 | 278 | self._has_closed_at_least_one_slice = True |
248 | 279 |
|
|
0 commit comments