Commit 78259c8 (parent: d4fdd4f)

feat(file-based): Add logging for observable checkpoints during check and discovery

Co-Authored-By: Aaron <AJ> Steers <[email protected]>

5 files changed (+28 -6 lines)

airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py

Lines changed: 8 additions & 1 deletion

@@ -96,8 +96,14 @@ def _check_list_files(self, stream: AbstractFileBasedStream) -> RemoteFile:
 
         Returns the first file if successful, otherwise raises a CheckAvailabilityError.
         """
+        stream.logger.info(f"Starting to list files for stream: {stream.name}")
         try:
-            file = next(iter(stream.get_files()))
+            files = list(stream.get_files())
+            file_count = len(files)
+            stream.logger.info(f"Found {file_count} files for stream: {stream.name}")
+            if file_count == 0:
+                raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name)
+            file = files[0]
         except StopIteration:
             raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name)
         except CustomFileBasedException as exc:
@@ -107,6 +113,7 @@ def _check_list_files(self, stream: AbstractFileBasedStream) -> RemoteFile:
                 FileBasedSourceError.ERROR_LISTING_FILES, stream=stream.name
             ) from exc
 
+        stream.logger.info(f"Successfully verified file access for stream: {stream.name}")
         return file
 
     def _check_parse_record(
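
The hunk above trades the lazy next(iter(...)) for an eager list(...) so the count of discovered files can be logged before any file is parsed. A minimal standalone sketch of the same checkpoint pattern, using only the standard library; EmptyStreamError and list_first_file are hypothetical stand-ins for CheckAvailabilityError and _check_list_files, not CDK names:

import logging
from typing import Callable, Iterable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("availability_check")

class EmptyStreamError(Exception):
    """Hypothetical stand-in for CheckAvailabilityError."""

def list_first_file(stream_name: str, get_files: Callable[[], Iterable[str]]) -> str:
    # Checkpoint 1: emitted before the (potentially slow) remote listing,
    # so a hang here is attributable to the list call.
    logger.info(f"Starting to list files for stream: {stream_name}")
    files = list(get_files())
    # Checkpoint 2: emitted once the listing returns, with the file count.
    logger.info(f"Found {len(files)} files for stream: {stream_name}")
    if not files:
        raise EmptyStreamError(f"No files found for stream: {stream_name}")
    # Checkpoint 3: emitted only when the check succeeded end to end.
    logger.info(f"Successfully verified file access for stream: {stream_name}")
    return files[0]

first = list_first_file("my_stream", lambda: ["a.csv", "b.csv"])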

airbyte_cdk/sources/file_based/file_based_source.py

Lines changed: 4 additions & 1 deletion

@@ -154,6 +154,7 @@ def check_connection(
 
         Otherwise, the "error" object should describe what went wrong.
         """
+        logger.info("Starting check connection for file-based source")
         try:
            streams = self.streams(config)
        except Exception as config_exception:
@@ -222,7 +223,9 @@ def check_connection(
                 failure_type=FailureType.config_error,
             )
 
-        return not bool(errors), (errors or None)
+        success = not bool(errors)
+        logger.info(f"Completed check connection for file-based source. Result: {success}")
+        return success, (errors or None)
 
     def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         """

airbyte_cdk/sources/file_based/file_based_stream_reader.py

Lines changed: 4 additions & 0 deletions

@@ -85,6 +85,10 @@ def get_matching_files(
 
         Utility method `self.filter_files_by_globs` and `self.get_prefixes_from_globs`
         are available, which may be helpful when implementing this method.
+
+        Implementation should include logging:
+        - At the beginning: logger.info(f"Starting to match files with glob patterns: {globs}")
+        - After filtering files: logger.info(f"Completed matching files with glob patterns: {globs}")
         """
         ...
 
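A sketch of a concrete implementation following this docstring's logging guidance, assuming an in-memory file listing and stdlib fnmatch for glob matching; the InMemoryReader class is hypothetical, and real readers would list a remote store and might use self.filter_files_by_globs instead:

import fnmatch
import logging
from typing import Iterable, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("stream_reader")

class InMemoryReader:
    """Hypothetical reader over a fixed list of paths."""

    def __init__(self, paths: List[str]) -> None:
        self._paths = paths

    def get_matching_files(self, globs: List[str]) -> Iterable[str]:
        # Checkpoint before matching starts, as the docstring prescribes.
        logger.info(f"Starting to match files with glob patterns: {globs}")
        matched = [p for p in self._paths if any(fnmatch.fnmatch(p, g) for g in globs)]
        # Checkpoint after filtering completes.
        logger.info(f"Completed matching files with glob patterns: {globs}")
        return matched

reader = InMemoryReader(["data/a.csv", "data/b.jsonl"])
print(list(reader.get_matching_files(["*.csv"])))  # ['data/a.csv']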

airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py

Lines changed: 4 additions & 3 deletions

@@ -113,15 +113,16 @@ def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool:
     def get_files_to_sync(
         self, all_files: Iterable[RemoteFile], logger: logging.Logger
     ) -> Iterable[RemoteFile]:
+        logger.info("Starting to determine files to sync based on cursor")
         if self._is_history_full():
             logger.warning(
                 f"The state history is full. "
                 f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
                 f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
             )
-        for f in all_files:
-            if self._should_sync_file(f, logger):
-                yield f
+        files_to_sync = [f for f in all_files if self._should_sync_file(f, logger)]
+        logger.info(f"Determined {len(files_to_sync)} files to sync out of {len(list(all_files))} total files")
+        return files_to_sync
 
     def get_start_time(self) -> datetime:
         return self._start_time
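
One behavioral detail in the new summary line: if all_files is a one-shot generator rather than a list, the comprehension consumes it, so the subsequent len(list(all_files)) evaluates against an exhausted iterator and reports 0. A minimal standalone illustration of that general Python pitfall (not CDK code):

def gen():
    yield from ["a.csv", "b.csv", "c.csv"]

files = gen()
kept = [f for f in files if f != "b.csv"]  # consumes the generator
print(len(kept))         # 2
print(len(list(files)))  # 0: the generator is already exhausted

# Counting both sides reliably requires materializing once, up front:
files = list(gen())
kept = [f for f in files if f != "b.csv"]
print(len(kept), "of", len(files))  # 2 of 3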

airbyte_cdk/sources/file_based/stream/default_file_based_stream.py

Lines changed: 8 additions & 1 deletion

@@ -342,10 +342,13 @@ def get_files(self) -> Iterable[RemoteFile]:
         )
 
     def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
+        self.logger.info(f"Starting schema inference for stream {self.name} with {len(files)} files")
         loop = asyncio.get_event_loop()
         schema = loop.run_until_complete(self._infer_schema(files))
         # as infer schema returns a Mapping that is assumed to be immutable, we need to create a deepcopy to avoid modifying the reference
-        return self._fill_nulls(deepcopy(schema))
+        result = self._fill_nulls(deepcopy(schema))
+        self.logger.info(f"Completed schema inference for stream {self.name}")
+        return result
 
     @staticmethod
     def _fill_nulls(schema: Mapping[str, Any]) -> Mapping[str, Any]:
@@ -374,6 +377,7 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
         Each file type has a corresponding `infer_schema` handler.
         Dispatch on file type.
         """
+        self.logger.info(f"Starting concurrent schema inference for {len(files)} files with {self._discovery_policy.n_concurrent_requests} concurrent requests")
         base_schema: SchemaType = {}
         pending_tasks: Set[asyncio.tasks.Task[SchemaType]] = set()
 
@@ -383,6 +387,7 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
         while len(pending_tasks) <= self._discovery_policy.n_concurrent_requests and (
             file := next(files_iterator, None)
         ):
+            self.logger.debug(f"Starting schema inference for file: {file.uri}")
             pending_tasks.add(asyncio.create_task(self._infer_file_schema(file)))
             n_started += 1
         # Return when the first task is completed so that we can enqueue a new task as soon as the
@@ -392,6 +397,7 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
         )
         for task in done:
             try:
+                self.logger.debug(f"Completed schema inference for a file, {len(pending_tasks)} files remaining")
                 base_schema = merge_schemas(base_schema, task.result())
             except AirbyteTracedException as ate:
                 raise ate
@@ -401,6 +407,7 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
                 exc_info=exc,
             )
 
+        self.logger.info(f"Completed concurrent schema inference for stream {self.name}")
         return base_schema
 
     async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
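
The hunks above instrument a bounded-concurrency loop: keep at most n tasks in flight, wait with FIRST_COMPLETED, merge each finished result, then refill. A self-contained sketch of that pattern with plain asyncio; infer_file_schema and infer_all here are hypothetical, and the CDK dispatches to per-format schema inference and merges with merge_schemas:

import asyncio
import logging
from typing import Dict, List, Set

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("bounded_inference")

async def infer_file_schema(uri: str) -> Dict[str, str]:
    # Hypothetical per-file work; the CDK calls a format-specific parser here.
    await asyncio.sleep(0.01)
    return {uri: "string"}

async def infer_all(uris: List[str], n_concurrent: int = 2) -> Dict[str, str]:
    merged: Dict[str, str] = {}
    pending: Set[asyncio.Task] = set()
    files_iterator = iter(uris)
    while True:
        # Refill: start tasks until the in-flight cap is reached or files run out.
        while len(pending) < n_concurrent and (uri := next(files_iterator, None)):
            logger.debug(f"Starting schema inference for file: {uri}")
            pending.add(asyncio.create_task(infer_file_schema(uri)))
        if not pending:
            break
        # Wake as soon as any one task finishes so a replacement can be enqueued.
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            merged.update(task.result())  # stands in for merge_schemas
    logger.info(f"Inferred schema from {len(uris)} files")
    return merged

print(asyncio.run(infer_all([f"f{i}.csv" for i in range(5)])))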
