Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ def _group_streams(
else None,
logger=self.logger,
cursor=cursor,
supports_file_transfer=bool(file_uploader),
)
)
elif (
Expand Down Expand Up @@ -371,6 +372,7 @@ def _group_streams(
cursor_field=None,
logger=self.logger,
cursor=final_state_cursor,
supports_file_transfer=bool(file_uploader),
)
)
elif (
Expand Down Expand Up @@ -425,6 +427,7 @@ def _group_streams(
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
logger=self.logger,
cursor=perpartition_cursor,
supports_file_transfer=bool(file_uploader),
)
)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
from pathlib import Path
from typing import Optional

from airbyte_cdk.models import AirbyteRecordMessageFileReference
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
SafeResponse,
)
from airbyte_cdk.models import AirbyteRecordMessageFileReference
from airbyte_cdk.sources.declarative.requesters import Requester
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
from airbyte_cdk.sources.utils.files_directory import get_files_directory
Expand Down
3 changes: 3 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/default_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
logger: Logger,
cursor: Cursor,
namespace: Optional[str] = None,
supports_file_transfer: bool = False,
) -> None:
self._stream_partition_generator = partition_generator
self._name = name
Expand All @@ -39,6 +40,7 @@ def __init__(
self._logger = logger
self._cursor = cursor
self._namespace = namespace
self._supports_file_transfer = supports_file_transfer

def generate_partitions(self) -> Iterable[Partition]:
yield from self._stream_partition_generator.generate()
Expand Down Expand Up @@ -68,6 +70,7 @@ def as_airbyte_stream(self) -> AirbyteStream:
json_schema=dict(self._json_schema),
supported_sync_modes=[SyncMode.full_refresh],
is_resumable=False,
is_file_based=self._supports_file_transfer,
)

if self._namespace:
Expand Down
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/utils/record_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
AirbyteLogMessage,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteTraceMessage,
AirbyteRecordMessageFileReference,
AirbyteTraceMessage,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage
Expand Down
18 changes: 18 additions & 0 deletions unit_tests/sources/declarative/file/test_file_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.entrypoint_wrapper import discover as entrypoint_discover
from airbyte_cdk.test.entrypoint_wrapper import read as entrypoint_read
from airbyte_cdk.test.state_builder import StateBuilder

Expand Down Expand Up @@ -50,6 +51,13 @@ def read(
)


def discover(config_builder: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput:
config = config_builder.build()
return entrypoint_discover(
_source(CatalogBuilder().build(), config), config, expecting_exception
)


class FileStreamTest(TestCase):
def _config(self) -> ConfigBuilder:
return ConfigBuilder()
Expand Down Expand Up @@ -90,3 +98,13 @@ def test_get_article_attachments(self) -> None:
assert file_reference.file_url
assert file_reference.file_relative_path
assert file_reference.file_size_bytes

def test_discover_article_attachments(self) -> None:
output = discover(self._config())

article_attachments_stream = next(
filter(
lambda stream: stream.name == "article_attachments", output.catalog.catalog.streams
)
)
assert article_attachments_stream.is_file_based
Loading