Skip to content

Commit 0a2b4d6

Browse files
committed
file-based: ensure reader is available
1 parent 03dc23e commit 0a2b4d6

File tree

1 file changed

+14
-10
lines changed

1 file changed

+14
-10
lines changed

airbyte_cdk/sources/file_based/file_based_source.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -342,16 +342,23 @@ def _make_default_stream(
342342
preserve_directory_structure=preserve_directory_structure(parsed_config),
343343
)
344344

345-
def _make_permissions_stream(
346-
self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
347-
) -> AbstractFileBasedStream:
345+
def _ensure_permissions_reader_available(self) -> None:
348346
"""
349-
Creates a stream that reads permissions from files.
347+
Validates that a stream permissions reader is available.
348+
Raises a ValueError if the reader is not provided.
350349
"""
351350
if not self.stream_permissions_reader:
352351
raise ValueError(
353352
"Stream permissions reader is required for streams that use permissions transfer mode."
354353
)
354+
355+
def _make_permissions_stream(
356+
self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
357+
) -> AbstractFileBasedStream:
358+
"""
359+
Creates a stream that reads permissions from files.
360+
"""
361+
self._ensure_permissions_reader_available()
355362
return PermissionsFileBasedStream(
356363
config=stream_config,
357364
catalog_schema=self.stream_schemas.get(stream_config.name),
@@ -362,7 +369,7 @@ def _make_permissions_stream(
362369
validation_policy=self._validate_and_get_validation_policy(stream_config),
363370
errors_collector=self.errors_collector,
364371
cursor=cursor,
365-
stream_permissions_reader=self.stream_permissions_reader,
372+
stream_permissions_reader=self.stream_permissions_reader, # type: ignore
366373
)
367374

368375
def _make_file_based_stream(
@@ -383,13 +390,10 @@ def _make_file_based_stream(
383390
def _make_identities_stream(
384391
self,
385392
) -> Stream:
386-
if not self.stream_permissions_reader:
387-
raise ValueError(
388-
"Stream permissions reader is required for streams that use permissions transfer mode."
389-
)
393+
self._ensure_permissions_reader_available()
390394
return FileIdentitiesStream(
391395
catalog_schema=self.stream_schemas.get(FileIdentitiesStream.IDENTITIES_STREAM_NAME),
392-
stream_permissions_reader=self.stream_permissions_reader,
396+
stream_permissions_reader=self.stream_permissions_reader, # type: ignore
393397
discovery_policy=self.discovery_policy,
394398
errors_collector=self.errors_collector,
395399
)

0 commit comments

Comments
 (0)