diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 7a0063b5e..657ab2df4 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -339,6 +339,7 @@ def _group_streams( else None, logger=self.logger, cursor=cursor, + supports_file_transfer=bool(file_uploader), ) ) elif ( @@ -371,6 +372,7 @@ def _group_streams( cursor_field=None, logger=self.logger, cursor=final_state_cursor, + supports_file_transfer=bool(file_uploader), ) ) elif ( @@ -425,6 +427,7 @@ def _group_streams( cursor_field=perpartition_cursor.cursor_field.cursor_field_key, logger=self.logger, cursor=perpartition_cursor, + supports_file_transfer=bool(file_uploader), ) ) else: diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py index 83850881d..348aa81c7 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py @@ -2,11 +2,11 @@ from pathlib import Path from typing import Optional +from airbyte_cdk.models import AirbyteRecordMessageFileReference from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( SafeResponse, ) -from airbyte_cdk.models import AirbyteRecordMessageFileReference from airbyte_cdk.sources.declarative.requesters import Requester from airbyte_cdk.sources.declarative.types import Record, StreamSlice from airbyte_cdk.sources.utils.files_directory import get_files_directory diff --git a/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte_cdk/sources/streams/concurrent/default_stream.py index 7679a1eb6..54600d635 100644 --- a/airbyte_cdk/sources/streams/concurrent/default_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/default_stream.py @@ -29,6 +29,7 @@ def __init__( logger: Logger, cursor: Cursor, namespace: Optional[str] = None, + supports_file_transfer: bool = False, ) -> None: self._stream_partition_generator = partition_generator self._name = name @@ -39,6 +40,7 @@ def __init__( self._logger = logger self._cursor = cursor self._namespace = namespace + self._supports_file_transfer = supports_file_transfer def generate_partitions(self) -> Iterable[Partition]: yield from self._stream_partition_generator.generate() @@ -68,6 +70,7 @@ def as_airbyte_stream(self) -> AirbyteStream: json_schema=dict(self._json_schema), supported_sync_modes=[SyncMode.full_refresh], is_resumable=False, + is_file_based=self._supports_file_transfer, ) if self._namespace: diff --git a/airbyte_cdk/sources/utils/record_helper.py b/airbyte_cdk/sources/utils/record_helper.py index a82078d10..d41907cf1 100644 --- a/airbyte_cdk/sources/utils/record_helper.py +++ b/airbyte_cdk/sources/utils/record_helper.py @@ -9,8 +9,8 @@ AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, - AirbyteTraceMessage, AirbyteRecordMessageFileReference, + AirbyteTraceMessage, ) from airbyte_cdk.models import Type as MessageType from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index a90a2705c..1c0547830 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -7,6 +7,7 @@ from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput +from airbyte_cdk.test.entrypoint_wrapper import discover as entrypoint_discover from airbyte_cdk.test.entrypoint_wrapper import read as entrypoint_read from airbyte_cdk.test.state_builder import StateBuilder @@ -50,6 +51,13 @@ def read( ) +def discover(config_builder: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + config = config_builder.build() + return entrypoint_discover( + _source(CatalogBuilder().build(), config), config, expecting_exception + ) + + class FileStreamTest(TestCase): def _config(self) -> ConfigBuilder: return ConfigBuilder() @@ -90,3 +98,13 @@ def test_get_article_attachments(self) -> None: assert file_reference.file_url assert file_reference.file_relative_path assert file_reference.file_size_bytes + + def test_discover_article_attachments(self) -> None: + output = discover(self._config()) + + article_attachments_stream = next( + filter( + lambda stream: stream.name == "article_attachments", output.catalog.catalog.streams + ) + ) + assert article_attachments_stream.is_file_based