diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index 6cf0aa392..17a7ee957 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -48,6 +48,9 @@ FileBasedErrorsCollector, FileBasedSourceError, ) +from airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import ( + AbstractFileBasedStreamPermissionsReader, +) from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader from airbyte_cdk.sources.file_based.file_types import default_parsers from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser @@ -100,8 +103,10 @@ def __init__( cursor_cls: Type[ Union[AbstractConcurrentFileBasedCursor, AbstractFileBasedCursor] ] = FileBasedConcurrentCursor, + stream_permissions_reader: Optional[AbstractFileBasedStreamPermissionsReader] = None, ): self.stream_reader = stream_reader + self.stream_permissions_reader = stream_permissions_reader self.spec_class = spec_class self.config = config self.catalog = catalog @@ -234,6 +239,8 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: try: parsed_config = self._get_parsed_config(config) self.stream_reader.config = parsed_config + if self.stream_permissions_reader: + self.stream_permissions_reader.config = parsed_config streams: List[Stream] = [] for stream_config in parsed_config.streams: # Like state_manager, `catalog_stream` may be None during `check` @@ -337,9 +344,23 @@ def _make_default_stream( preserve_directory_structure=preserve_directory_structure(parsed_config), ) + def _ensure_permissions_reader_available(self) -> None: + """ + Validates that a stream permissions reader is available. + Raises a ValueError if the reader is not provided. + """ + if not self.stream_permissions_reader: + raise ValueError( + "Stream permissions reader is required for streams that use permissions transfer mode." 
+ ) + def _make_permissions_stream( self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor] ) -> AbstractFileBasedStream: + """ + Creates a stream that reads permissions from files. + """ + self._ensure_permissions_reader_available() return PermissionsFileBasedStream( config=stream_config, catalog_schema=self.stream_schemas.get(stream_config.name), @@ -350,6 +371,7 @@ def _make_permissions_stream( validation_policy=self._validate_and_get_validation_policy(stream_config), errors_collector=self.errors_collector, cursor=cursor, + stream_permissions_reader=self.stream_permissions_reader, # type: ignore ) def _make_file_based_stream( @@ -370,9 +392,10 @@ def _make_file_based_stream( def _make_identities_stream( self, ) -> Stream: + self._ensure_permissions_reader_available() return FileIdentitiesStream( catalog_schema=self.stream_schemas.get(FileIdentitiesStream.IDENTITIES_STREAM_NAME), - stream_reader=self.stream_reader, + stream_permissions_reader=self.stream_permissions_reader, # type: ignore discovery_policy=self.discovery_policy, errors_collector=self.errors_collector, ) diff --git a/airbyte_cdk/sources/file_based/file_based_stream_permissions_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_permissions_reader.py new file mode 100644 index 000000000..aff5c652c --- /dev/null +++ b/airbyte_cdk/sources/file_based/file_based_stream_permissions_reader.py @@ -0,0 +1,123 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +import logging +from abc import ABC, abstractmethod +from typing import Any, Dict, Iterable, Optional + +from airbyte_cdk.sources.file_based import AbstractFileBasedSpec +from airbyte_cdk.sources.file_based.remote_file import RemoteFile + + +class AbstractFileBasedStreamPermissionsReader(ABC): + """ + This class is responsible for reading file permissions and Identities from a source. 
+ """ + + def __init__(self) -> None: + self._config = None + + @property + def config(self) -> Optional[AbstractFileBasedSpec]: + return self._config + + @config.setter + @abstractmethod + def config(self, value: AbstractFileBasedSpec) -> None: + """ + FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamPermissionsReader. + + Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamPermissionsReader + will require keys that (for example) allow it to authenticate with the 3rd party. + + Therefore, concrete implementations of AbstractFileBasedStreamPermissionsReader's config setter should assert that `value` is of the correct + config type for that type of StreamPermissionsReader. + """ + ... + + @abstractmethod + def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]: + """ + This function should return the allow list for a given file, i.e. the list of all identities and their permission levels associated with it + + e.g. + def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger): + api_conn = some_api.conn(credentials=SOME_CREDENTIALS) + result = api_conn.get_file_permissions_info(file.id) + return MyPermissionsModel( + id=result["id"], + access_control_list = result["access_control_list"], + is_public = result["is_public"], + ).dict() + """ + ... + + @abstractmethod + def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]: + """ + This function should return the Identities in a determined "space" or "domain" where the file metadata (ACLs) are fetched and ACL items (Identities) exist. + + e.g.
+ def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]: + api_conn = some_api.conn(credentials=SOME_CREDENTIALS) + users_api = api_conn.users() + groups_api = api_conn.groups() + members_api = api_conn.members() + for user in users_api.list(): + yield my_identity_model(id=user.id, name=user.name, email_address=user.email, type="user").dict() + for group in groups_api.list(): + group_obj = my_identity_model(id=group.id, name=group.name, email_address=group.email, type="group") + for member in members_api.list(group=group): + group_obj.member_email_addresses = group_obj.member_email_addresses or [] + group_obj.member_email_addresses.append(member.email) + yield group_obj.dict() + """ + ... + + @property + @abstractmethod + def file_permissions_schema(self) -> Dict[str, Any]: + """ + This function should return the permissions schema for the file permissions stream. + + e.g. + def file_permissions_schema(self) -> Dict[str, Any]: + # you can also follow the pattern we have for python connectors and have a json file and read from there e.g. schemas/identities.json + return { + "type": "object", + "properties": { + "id": { "type": "string" }, + "file_path": { "type": "string" }, + "access_control_list": { + "type": "array", + "items": { "type": "string" } + }, + "publicly_accessible": { "type": "boolean" } + } + } + """ + ... + + @property + @abstractmethod + def identities_schema(self) -> Dict[str, Any]: + """ + This function should return the identities schema for the file identities stream. + + e.g. + def identities_schema(self) -> Dict[str, Any]: + # you can also follow the pattern we have for python connectors and have a json file and read from there e.g.
schemas/identities.json + return { + "type": "object", + "properties": { + "id": { "type": "string" }, + "remote_id": { "type": "string" }, + "name": { "type": ["null", "string"] }, + "email_address": { "type": ["null", "string"] }, + "member_email_addresses": { "type": ["null", "array"] }, + "type": { "type": "string" }, + } + } + """ + ... diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index d5cd759bf..cbf3d119b 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -184,97 +184,3 @@ def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> Li makedirs(path.dirname(local_file_path), exist_ok=True) absolute_file_path = path.abspath(local_file_path) return [file_relative_path, local_file_path, absolute_file_path] - - @abstractmethod - def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]: - """ - This function should return the allow list for a given file, i.e. the list of all identities and their permission levels associated with it - - e.g. - def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger): - api_conn = some_api.conn(credentials=SOME_CREDENTIALS) - result = api_conn.get_file_permissions_info(file.id) - return MyPermissionsModel( - id=result["id"], - access_control_list = result["access_control_list"], - is_public = result["is_public"], - ).dict() - """ - raise NotImplementedError( - f"{self.__class__.__name__} does not implement get_file_acl_permissions(). To support ACL permissions, implement this method and update file_permissions_schema." 
- ) - - @abstractmethod - def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]: - """ - This function should return the Identities in a determined "space" or "domain" where the file metadata (ACLs) are fetched and ACLs items (Identities) exists. - - e.g. - def load_identity_groups(self, logger: logging.Logger) -> Dict[str, Any]: - api_conn = some_api.conn(credentials=SOME_CREDENTIALS) - users_api = api_conn.users() - groups_api = api_conn.groups() - members_api = self.google_directory_service.members() - for user in users_api.list(): - yield my_identity_model(id=user.id, name=user.name, email_address=user.email, type="user").dict() - for group in groups_api.list(): - group_obj = my_identity_model(id=group.id, name=groups.name, email_address=user.email, type="group").dict() - for member in members_api.list(group=group): - group_obj.member_email_addresses = group_obj.member_email_addresses or [] - group_obj.member_email_addresses.append(member.email) - yield group_obj.dict() - """ - raise NotImplementedError( - f"{self.__class__.__name__} does not implement load_identity_groups(). To support identities, implement this method and update identities_schema." - ) - - @property - @abstractmethod - def file_permissions_schema(self) -> Dict[str, Any]: - """ - This function should return the permissions schema for file permissions stream. - - e.g. - def file_permissions_schema(self) -> Dict[str, Any]: - # you can also follow the patter we have for python connectors and have a json file and read from there e.g. schemas/identities.json - return { - "type": "object", - "properties": { - "id": { "type": "string" }, - "file_path": { "type": "string" }, - "access_control_list": { - "type": "array", - "items": { "type": "string" } - }, - "publicly_accessible": { "type": "boolean" } - } - } - """ - raise NotImplementedError( - f"{self.__class__.__name__} does not implement file_permissions_schema, please return json schema for your permissions streams." 
- ) - - @property - @abstractmethod - def identities_schema(self) -> Dict[str, Any]: - """ - This function should return the identities schema for file identity stream. - - e.g. - def identities_schema(self) -> Dict[str, Any]: - # you can also follow the patter we have for python connectors and have a json file and read from there e.g. schemas/identities.json - return { - "type": "object", - "properties": { - "id": { "type": "string" }, - "remote_id": { "type": "string" }, - "name": { "type": ["null", "string"] }, - "email_address": { "type": ["null", "string"] }, - "member_email_addresses": { "type": ["null", "array"] }, - "type": { "type": "string" }, - } - } - """ - raise NotImplementedError( - f"{self.__class__.__name__} does not implement identities_schema, please return json schema for your identities stream." - ) diff --git a/airbyte_cdk/sources/file_based/stream/identities_stream.py b/airbyte_cdk/sources/file_based/stream/identities_stream.py index d0c33baa1..2c582d0b2 100644 --- a/airbyte_cdk/sources/file_based/stream/identities_stream.py +++ b/airbyte_cdk/sources/file_based/stream/identities_stream.py @@ -8,7 +8,9 @@ from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy from airbyte_cdk.sources.file_based.exceptions import FileBasedErrorsCollector -from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader +from airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import ( + AbstractFileBasedStreamPermissionsReader, +) from airbyte_cdk.sources.streams.core import JsonSchema from airbyte_cdk.sources.streams.permissions.identities_stream import IdentitiesStream @@ -24,13 +26,13 @@ class FileIdentitiesStream(IdentitiesStream): def __init__( self, catalog_schema: Optional[Mapping[str, Any]], - stream_reader: AbstractFileBasedStreamReader, + stream_permissions_reader: 
AbstractFileBasedStreamPermissionsReader, discovery_policy: AbstractDiscoveryPolicy, errors_collector: FileBasedErrorsCollector, ) -> None: super().__init__() self.catalog_schema = catalog_schema - self.stream_reader = stream_reader + self.stream_permissions_reader = stream_permissions_reader self._discovery_policy = discovery_policy self.errors_collector = errors_collector self._cursor: MutableMapping[str, Any] = {} @@ -40,8 +42,8 @@ def primary_key(self) -> PrimaryKeyType: return None def load_identity_groups(self) -> Iterable[Dict[str, Any]]: - return self.stream_reader.load_identity_groups(logger=self.logger) + return self.stream_permissions_reader.load_identity_groups(logger=self.logger) @cache def get_json_schema(self) -> JsonSchema: - return self.stream_reader.identities_schema + return self.stream_permissions_reader.identities_schema diff --git a/airbyte_cdk/sources/file_based/stream/permissions_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/permissions_file_based_stream.py index 75e201101..52003c7ae 100644 --- a/airbyte_cdk/sources/file_based/stream/permissions_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/permissions_file_based_stream.py @@ -7,6 +7,9 @@ from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level from airbyte_cdk.models import Type as MessageType +from airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import ( + AbstractFileBasedStreamPermissionsReader, +) from airbyte_cdk.sources.file_based.stream import DefaultFileBasedStream from airbyte_cdk.sources.file_based.types import StreamSlice from airbyte_cdk.sources.streams.core import JsonSchema @@ -26,10 +29,16 @@ class PermissionsFileBasedStream(DefaultFileBasedStream): and schema definition, while this class handles the streaming interface. 
""" + def __init__( + self, stream_permissions_reader: AbstractFileBasedStreamPermissionsReader, **kwargs: Any + ): + super().__init__(**kwargs) + self.stream_permissions_reader = stream_permissions_reader + def _filter_schema_invalid_properties( self, configured_catalog_json_schema: Dict[str, Any] ) -> Dict[str, Any]: - return self.stream_reader.file_permissions_schema + return self.stream_permissions_reader.file_permissions_schema def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]: """ @@ -40,7 +49,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte no_permissions = False file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT) try: - permissions_record = self.stream_reader.get_file_acl_permissions( + permissions_record = self.stream_permissions_reader.get_file_acl_permissions( file, logger=self.logger ) if not permissions_record: @@ -82,4 +91,4 @@ def _get_raw_json_schema(self) -> JsonSchema: Returns: The file permissions schema that defines the structure of permission records """ - return self.stream_reader.file_permissions_schema + return self.stream_permissions_reader.file_permissions_schema diff --git a/unit_tests/sources/file_based/stream/test_file_identities_stream.py b/unit_tests/sources/file_based/stream/test_file_identities_stream.py index 59ba53166..9004899c3 100644 --- a/unit_tests/sources/file_based/stream/test_file_identities_stream.py +++ b/unit_tests/sources/file_based/stream/test_file_identities_stream.py @@ -12,7 +12,9 @@ from airbyte_cdk.sources.file_based.exceptions import ( FileBasedErrorsCollector, ) -from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader +from airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import ( + AbstractFileBasedStreamPermissionsReader, +) from airbyte_cdk.sources.file_based.stream import FileIdentitiesStream @@ -59,20 +61,23 @@ class 
IdentitiesFileBasedStreamTest(unittest.TestCase): def setUp(self) -> None: self._catalog_schema = Mock() - self._stream_reader = Mock(spec=AbstractFileBasedStreamReader) + self._stream_permissions_reader = Mock(spec=AbstractFileBasedStreamPermissionsReader) self._discovery_policy = Mock(spec=AbstractDiscoveryPolicy) - self._stream_reader.identities_schema = self._IDENTITIES_SCHEMA + self._stream_permissions_reader.identities_schema = self._IDENTITIES_SCHEMA self._stream = FileIdentitiesStream( catalog_schema=self._catalog_schema, - stream_reader=self._stream_reader, + stream_permissions_reader=self._stream_permissions_reader, discovery_policy=self._discovery_policy, errors_collector=FileBasedErrorsCollector(), ) def test_when_read_records_then_return_records(self) -> None: - self._stream_reader.load_identity_groups.return_value = [self._A_RECORD, self._GROUP_RECORD] + self._stream_permissions_reader.load_identity_groups.return_value = [ + self._A_RECORD, + self._GROUP_RECORD, + ] messages = list(self._stream.read_records(SyncMode.full_refresh)) assert list(map(lambda message: message.record.data, messages)) == [ self._A_RECORD, @@ -84,7 +89,7 @@ def test_when_getting_schema(self): assert returned_schema == self._IDENTITIES_SCHEMA def test_when_read_records_and_raise_exception(self) -> None: - self._stream_reader.load_identity_groups.side_effect = Exception( + self._stream_permissions_reader.load_identity_groups.side_effect = Exception( "Identities retrieval failed" ) diff --git a/unit_tests/sources/file_based/stream/test_permissions_file_based_stream.py b/unit_tests/sources/file_based/stream/test_permissions_file_based_stream.py index efc929c7e..532009585 100644 --- a/unit_tests/sources/file_based/stream/test_permissions_file_based_stream.py +++ b/unit_tests/sources/file_based/stream/test_permissions_file_based_stream.py @@ -14,6 +14,9 @@ from airbyte_cdk.sources.file_based.exceptions import ( FileBasedErrorsCollector, ) +from 
airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import ( + AbstractFileBasedStreamPermissionsReader, +) from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser from airbyte_cdk.sources.file_based.remote_file import RemoteFile @@ -53,6 +56,7 @@ def setUp(self) -> None: self._stream_config.name = "a stream name" self._catalog_schema = Mock() self._stream_reader = Mock(spec=AbstractFileBasedStreamReader) + self._stream_permissions_reader = Mock(spec=AbstractFileBasedStreamPermissionsReader) self._availability_strategy = Mock(spec=AbstractFileBasedAvailabilityStrategy) self._discovery_policy = Mock(spec=AbstractDiscoveryPolicy) self._parser = Mock(spec=FileTypeParser) @@ -60,7 +64,7 @@ def setUp(self) -> None: self._validation_policy.name = "validation policy name" self._cursor = Mock(spec=AbstractFileBasedCursor) - self._stream_reader.file_permissions_schema = self._A_PERMISSIONS_SCHEMA + self._stream_permissions_reader.file_permissions_schema = self._A_PERMISSIONS_SCHEMA self._stream = PermissionsFileBasedStream( config=self._stream_config, @@ -72,10 +76,11 @@ def setUp(self) -> None: validation_policy=self._validation_policy, cursor=self._cursor, errors_collector=FileBasedErrorsCollector(), + stream_permissions_reader=self._stream_permissions_reader, ) def test_when_read_records_from_slice_then_return_records(self) -> None: - self._stream_reader.get_file_acl_permissions.return_value = self._A_RECORD + self._stream_permissions_reader.get_file_acl_permissions.return_value = self._A_RECORD messages = list( self._stream.read_records_from_slice( {"files": [RemoteFile(uri="uri", last_modified=self._NOW)]} @@ -102,7 +107,7 @@ def test_when_getting_schema(self): assert returned_schema == expected_schema def test_when_read_records_from_slice_and_raise_exception(self) -> None: - 
self._stream_reader.get_file_acl_permissions.side_effect = Exception( + self._stream_permissions_reader.get_file_acl_permissions.side_effect = Exception( "ACL permissions retrieval failed" ) @@ -117,7 +122,7 @@ def test_when_read_records_from_slice_and_raise_exception(self) -> None: ) def test_when_read_records_from_slice_with_empty_permissions_then_return_empty(self) -> None: - self._stream_reader.get_file_acl_permissions.return_value = {} + self._stream_permissions_reader.get_file_acl_permissions.return_value = {} messages = list( self._stream.read_records_from_slice( {"files": [RemoteFile(uri="uri", last_modified=self._NOW)]}