Skip to content

Commit 7a02ec8

Browse files
maxi297octavia-squidington-iii
andauthored
Add file_based information in discovered catalog (#446)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent ac278b9 commit 7a02ec8

File tree

5 files changed

+26
-2
lines changed

5 files changed

+26
-2
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ def _group_streams(
339339
else None,
340340
logger=self.logger,
341341
cursor=cursor,
342+
supports_file_transfer=bool(file_uploader),
342343
)
343344
)
344345
elif (
@@ -371,6 +372,7 @@ def _group_streams(
371372
cursor_field=None,
372373
logger=self.logger,
373374
cursor=final_state_cursor,
375+
supports_file_transfer=bool(file_uploader),
374376
)
375377
)
376378
elif (
@@ -425,6 +427,7 @@ def _group_streams(
425427
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
426428
logger=self.logger,
427429
cursor=perpartition_cursor,
430+
supports_file_transfer=bool(file_uploader),
428431
)
429432
)
430433
else:

airbyte_cdk/sources/declarative/retrievers/file_uploader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
from pathlib import Path
33
from typing import Optional
44

5+
from airbyte_cdk.models import AirbyteRecordMessageFileReference
56
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
67
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
78
SafeResponse,
89
)
9-
from airbyte_cdk.models import AirbyteRecordMessageFileReference
1010
from airbyte_cdk.sources.declarative.requesters import Requester
1111
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
1212
from airbyte_cdk.sources.utils.files_directory import get_files_directory

airbyte_cdk/sources/streams/concurrent/default_stream.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ def __init__(
2929
logger: Logger,
3030
cursor: Cursor,
3131
namespace: Optional[str] = None,
32+
supports_file_transfer: bool = False,
3233
) -> None:
3334
self._stream_partition_generator = partition_generator
3435
self._name = name
@@ -39,6 +40,7 @@ def __init__(
3940
self._logger = logger
4041
self._cursor = cursor
4142
self._namespace = namespace
43+
self._supports_file_transfer = supports_file_transfer
4244

4345
def generate_partitions(self) -> Iterable[Partition]:
4446
yield from self._stream_partition_generator.generate()
@@ -68,6 +70,7 @@ def as_airbyte_stream(self) -> AirbyteStream:
6870
json_schema=dict(self._json_schema),
6971
supported_sync_modes=[SyncMode.full_refresh],
7072
is_resumable=False,
73+
is_file_based=self._supports_file_transfer,
7174
)
7275

7376
if self._namespace:

airbyte_cdk/sources/utils/record_helper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
AirbyteLogMessage,
1010
AirbyteMessage,
1111
AirbyteRecordMessage,
12-
AirbyteTraceMessage,
1312
AirbyteRecordMessageFileReference,
13+
AirbyteTraceMessage,
1414
)
1515
from airbyte_cdk.models import Type as MessageType
1616
from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage

unit_tests/sources/declarative/file/test_file_stream.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
88
from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder
99
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
10+
from airbyte_cdk.test.entrypoint_wrapper import discover as entrypoint_discover
1011
from airbyte_cdk.test.entrypoint_wrapper import read as entrypoint_read
1112
from airbyte_cdk.test.state_builder import StateBuilder
1213

@@ -50,6 +51,13 @@ def read(
5051
)
5152

5253

54+
def discover(config_builder: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput:
55+
config = config_builder.build()
56+
return entrypoint_discover(
57+
_source(CatalogBuilder().build(), config), config, expecting_exception
58+
)
59+
60+
5361
class FileStreamTest(TestCase):
5462
def _config(self) -> ConfigBuilder:
5563
return ConfigBuilder()
@@ -90,3 +98,13 @@ def test_get_article_attachments(self) -> None:
9098
assert file_reference.file_url
9199
assert file_reference.file_relative_path
92100
assert file_reference.file_size_bytes
101+
102+
def test_discover_article_attachments(self) -> None:
103+
output = discover(self._config())
104+
105+
article_attachments_stream = next(
106+
filter(
107+
lambda stream: stream.name == "article_attachments", output.catalog.catalog.streams
108+
)
109+
)
110+
assert article_attachments_stream.is_file_based

0 commit comments

Comments
 (0)