Skip to content

Commit 1c3975a

Browse files
author
maxime.c
committed
add logging to concurrent cursor
1 parent 6d75a92 commit 1c3975a

File tree

1 file changed

+7
-0
lines changed
  • airbyte_cdk/sources/streams/concurrent

1 file changed

+7
-0
lines changed

airbyte_cdk/sources/streams/concurrent/cursor.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import functools
66
import logging
7+
import os
78
from abc import ABC, abstractmethod
89
from typing import (
910
Any,
@@ -237,12 +238,18 @@ def _extract_cursor_value(self, record: Record) -> Any:
237238
return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record))
238239

239240
def close_partition(self, partition: Partition) -> None:
241+
test_env = os.getenv("PYTEST_CURRENT_TEST")
242+
if test_env and "test_concurrent_declarative_source.py" in test_env:
243+
LOGGER.info(f"Closing partition {partition.to_slice()}")
244+
LOGGER.info(f"\tstate before is {self._concurrent_state}")
240245
slice_count_before = len(self._concurrent_state.get("slices", []))
241246
self._add_slice_to_state(partition)
242247
if slice_count_before < len(
243248
self._concurrent_state["slices"]
244249
): # only emit if at least one slice has been processed
245250
self._merge_partitions()
251+
if test_env and "test_concurrent_declarative_source.py" in test_env:
252+
LOGGER.info(f"\tstate after merged partition is {self._concurrent_state}")
246253
self._emit_state_message()
247254
self._has_closed_at_least_one_slice = True
248255

0 commit comments

Comments
 (0)