Skip to content

Commit 5df84a3

Browse files
committed
file-mode-api: initial changes to handle include_files selection from configured catalog.
1 parent c8e29d1 commit 5df84a3

File tree

5 files changed

+160
-16
lines changed

5 files changed

+160
-16
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ def read(
139139
catalog: ConfiguredAirbyteCatalog,
140140
state: Optional[List[AirbyteStateMessage]] = None,
141141
) -> Iterator[AirbyteMessage]:
142-
concurrent_streams, _ = self._group_streams(config=config)
142+
concurrent_streams, _ = self._group_streams(config=config, catalog=catalog)
143143

144144
# ConcurrentReadProcessor pops streams that are finished being read so before syncing, the names of
145145
# the concurrent streams must be saved so that they can be removed from the catalog before starting
@@ -180,7 +180,7 @@ def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> Airbyte
180180
]
181181
)
182182

183-
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
183+
def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog | None = None) -> List[Stream]:
184184
"""
185185
The `streams` method is used as part of the AbstractSource in the following cases:
186186
* ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
@@ -189,10 +189,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
189189
190190
In both case, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of if it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the mentioned code paths above.
191191
"""
192-
return super().streams(config)
192+
return super().streams(config, catalog=catalog)
193193

194194
def _group_streams(
195-
self, config: Mapping[str, Any]
195+
self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog | None = None
196196
) -> Tuple[List[AbstractStream], List[Stream]]:
197197
concurrent_streams: List[AbstractStream] = []
198198
synchronous_streams: List[Stream] = []
@@ -205,7 +205,7 @@ def _group_streams(
205205

206206
name_to_stream_mapping = {stream["name"]: stream for stream in streams}
207207

208-
for declarative_stream in self.streams(config=config):
208+
for declarative_stream in self.streams(config=config, catalog=catalog):
209209
# Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
210210
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
211211
# so we need to treat them as synchronous

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
AirbyteMessage,
2121
AirbyteStateMessage,
2222
ConfiguredAirbyteCatalog,
23+
ConfiguredAirbyteStream,
2324
ConnectorSpecification,
2425
FailureType,
2526
)
@@ -141,7 +142,7 @@ def connection_checker(self) -> ConnectionChecker:
141142
f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
142143
)
143144

144-
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
145+
def streams(self, config: Mapping[str, Any], catalog: Optional[ConfiguredAirbyteCatalog] = None) -> List[Stream]:
145146
self._emit_manifest_debug_message(
146147
extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
147148
)
@@ -154,6 +155,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
154155
if api_budget_model:
155156
self._constructor.set_api_budget(api_budget_model, config)
156157

158+
catalog_with_streams_name = self._catalog_with_streams_name(catalog)
157159
source_streams = [
158160
self._constructor.create_component(
159161
StateDelegatingStreamModel
@@ -162,12 +164,35 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
162164
stream_config,
163165
config,
164166
emit_connector_builder_messages=self._emit_connector_builder_messages,
167+
include_files=self._get_include_files(stream_config=stream_config, catalog_with_streams_name=catalog_with_streams_name),
165168
)
166169
for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
167170
]
168171

169172
return source_streams
170173

174+
@staticmethod
175+
def _get_include_files(
176+
stream_config: Dict[str, Any], catalog_with_streams_name: Mapping[str, ConfiguredAirbyteStream] | None
177+
) -> Optional[bool]:
178+
"""
179+
Returns the include_files for the stream if it exists in the catalog.
180+
"""
181+
if catalog_with_streams_name:
182+
stream_name = stream_config.get("name")
183+
configured_catalog_stream = catalog_with_streams_name.get(stream_name)
184+
return configured_catalog_stream and configured_catalog_stream.include_files
185+
return False
186+
187+
@staticmethod
188+
def _catalog_with_streams_name(catalog: ConfiguredAirbyteCatalog | None) -> Mapping[str, ConfiguredAirbyteStream] | None:
189+
"""
190+
Returns a dict mapping stream names to their corresponding ConfiguredAirbyteStream objects.
191+
"""
192+
if catalog:
193+
return { configured_stream.stream.name: configured_stream for configured_stream in catalog.streams}
194+
return None
195+
171196
@staticmethod
172197
def _initialize_cache_for_parent_streams(
173198
stream_configs: List[Dict[str, Any]],

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1852,8 +1852,9 @@ def create_declarative_stream(
18521852
)
18531853
file_uploader = None
18541854
if model.file_uploader:
1855+
include_files = kwargs.pop("include_files", False)
18551856
file_uploader = self._create_component_from_model(
1856-
model=model.file_uploader, config=config
1857+
model=model.file_uploader, config=config, include_files=include_files
18571858
)
18581859

18591860
retriever = self._create_component_from_model(
@@ -3597,7 +3598,7 @@ def create_fixed_window_call_rate_policy(
35973598
)
35983599

35993600
def create_file_uploader(
3600-
self, model: FileUploaderModel, config: Config, **kwargs: Any
3601+
self, model: FileUploaderModel, config: Config, include_files: bool, **kwargs: Any
36013602
) -> FileUploader:
36023603
name = "File Uploader"
36033604
requester = self._create_component_from_model(
@@ -3618,7 +3619,7 @@ def create_file_uploader(
36183619
download_target_extractor=download_target_extractor,
36193620
config=config,
36203621
file_writer=NoopFileWriter()
3621-
if emit_connector_builder_messages
3622+
if emit_connector_builder_messages or not include_files
36223623
else LocalFileSystemFileWriter(),
36233624
parameters=model.parameters or {},
36243625
filename_extractor=model.filename_extractor if model.filename_extractor else None,

airbyte_cdk/test/catalog_builder.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ def with_json_schema(self, json_schema: Dict[str, Any]) -> "ConfiguredAirbyteStr
4141
self._stream["stream"]["json_schema"] = json_schema
4242
return self
4343

44+
def with_include_files(self, include_files: bool) -> "ConfiguredAirbyteStreamBuilder":
45+
"""
46+
Set whether the stream should include files in the sync.
47+
:param include_files: True if files should be included, False otherwise.
48+
"""
49+
self._stream["include_files"] = include_files
50+
return self
51+
4452
def build(self) -> ConfiguredAirbyteStream:
4553
return ConfiguredAirbyteStreamSerializer.load(self._stream)
4654

unit_tests/sources/declarative/file/test_file_stream.py

Lines changed: 117 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,9 @@ def test_get_articles(self) -> None:
120120
assert output.records
121121

122122
def test_get_article_attachments(self) -> None:
123-
with HttpMocker() as http_mocker:
123+
with HttpMocker() as http_mocker, patch(
124+
'airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer.NoopFileWriter.write') as mock_noop_write, patch(
125+
'airbyte_cdk.sources.declarative.retrievers.file_uploader.local_file_system_file_writer.LocalFileSystemFileWriter.write') as mock_file_system_write:
124126
http_mocker.get(
125127
HttpRequest(url=STREAM_URL),
126128
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
@@ -138,14 +140,21 @@ def test_get_article_attachments(self) -> None:
138140
),
139141
)
140142

143+
file_size = 12345
144+
mock_file_system_write.return_value = file_size # Simulate a file size
145+
141146
output = read(
142147
self._config(),
143148
CatalogBuilder()
144-
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
149+
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments").with_include_files(True))
145150
.build(),
146151
)
147152

148153
assert output.records
154+
# Ensure that FileSystemFileWriter is called.
155+
mock_file_system_write.assert_called()
156+
# Ensure that NoopFileWriter is not called.
157+
mock_noop_write.assert_not_called()
149158
file_reference = output.records[0].record.file_reference
150159
assert file_reference
151160
assert file_reference.staging_file_url
@@ -158,7 +167,8 @@ def test_get_article_attachments(self) -> None:
158167
)
159168
assert file_reference.file_size_bytes
160169

161-
def test_get_article_attachments_with_filename_extractor(self) -> None:
170+
def test_get_article_attachments_and_file_is_uploaded(self) -> None:
171+
"""Test that article attachments can be read and the file is uploaded to the staging directory"""
162172
with HttpMocker() as http_mocker:
163173
http_mocker.get(
164174
HttpRequest(url=STREAM_URL),
@@ -180,12 +190,52 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
180190
output = read(
181191
self._config(),
182192
CatalogBuilder()
183-
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
193+
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments").with_include_files(True))
194+
.build(),
195+
yaml_file="test_file_stream_with_filename_extractor.yaml",
196+
)
197+
file_reference = output.records[0].record.file_reference
198+
assert file_reference.file_size_bytes
199+
assert Path(file_reference.staging_file_url).exists(), "File should be uploaded to the staging directory"
200+
201+
def test_get_article_attachments_with_filename_extractor(self) -> None:
202+
"""Test that article attachments can be read with filename extractor and file system writer is called"""
203+
with HttpMocker() as http_mocker, patch(
204+
'airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer.NoopFileWriter.write') as mock_noop_write, patch(
205+
'airbyte_cdk.sources.declarative.retrievers.file_uploader.local_file_system_file_writer.LocalFileSystemFileWriter.write') as mock_file_system_write:
206+
http_mocker.get(
207+
HttpRequest(url=STREAM_URL),
208+
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
209+
)
210+
http_mocker.get(
211+
HttpRequest(url=STREAM_ATTACHMENTS_URL),
212+
HttpResponse(
213+
json.dumps(find_template("file_api/article_attachments", __file__)), 200
214+
),
215+
)
216+
http_mocker.get(
217+
HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL),
218+
HttpResponse(
219+
find_binary_response("file_api/article_attachment_content.png", __file__), 200
220+
),
221+
)
222+
223+
file_size = 12345
224+
mock_file_system_write.return_value = file_size # Simulate a file size
225+
226+
output = read(
227+
self._config(),
228+
CatalogBuilder()
229+
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments").with_include_files(True))
184230
.build(),
185231
yaml_file="test_file_stream_with_filename_extractor.yaml",
186232
)
187233

188234
assert len(output.records) == 1
235+
# Ensure that FileSystemFileWriter is called.
236+
mock_file_system_write.assert_called()
237+
# Ensure that NoopFileWriter is not called.
238+
mock_noop_write.assert_not_called()
189239
file_reference = output.records[0].record.file_reference
190240
assert file_reference
191241
assert (
@@ -196,10 +246,63 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
196246
assert not re.match(
197247
r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.source_file_relative_path
198248
)
199-
assert file_reference.file_size_bytes
249+
assert file_reference.file_size_bytes == file_size
250+
251+
def test_get_article_attachments_without_include_files(self) -> None:
252+
"""Test that article attachments can be read without including files, it can be opt-out by configured catalog"""
253+
include_files = False
254+
with HttpMocker() as http_mocker, patch(
255+
'airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer.NoopFileWriter.write') as mock_noop_write, patch(
256+
'airbyte_cdk.sources.declarative.retrievers.file_uploader.local_file_system_file_writer.LocalFileSystemFileWriter.write') as mock_file_system_write:
257+
http_mocker.get(
258+
HttpRequest(url=STREAM_URL),
259+
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
260+
)
261+
http_mocker.get(
262+
HttpRequest(url=STREAM_ATTACHMENTS_URL),
263+
HttpResponse(
264+
json.dumps(find_template("file_api/article_attachments", __file__)), 200
265+
),
266+
)
267+
http_mocker.get(
268+
HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL),
269+
HttpResponse(
270+
find_binary_response("file_api/article_attachment_content.png", __file__), 200
271+
),
272+
)
273+
274+
mock_noop_write.return_value = NoopFileWriter.NOOP_FILE_SIZE
275+
276+
output = read(
277+
self._config(),
278+
CatalogBuilder()
279+
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments").with_include_files(include_files))
280+
.build(),
281+
yaml_file="test_file_stream_with_filename_extractor.yaml",
282+
)
283+
284+
assert len(output.records) == 1
285+
# Ensure that LocalFileSystemFileWriter is not called when include_files is False
286+
mock_file_system_write.assert_not_called()
287+
# Ensure that NoopFileWriter is called to simulate file writing
288+
mock_noop_write.assert_called()
289+
file_reference = output.records[0].record.file_reference
290+
assert file_reference
291+
assert (
292+
file_reference.staging_file_url
293+
== "/tmp/airbyte-file-transfer/article_attachments/12138758717583/some_image_name.png"
294+
)
295+
296+
assert file_reference.source_file_relative_path
297+
assert not re.match(
298+
r"^article_attachments/[0-9a-fA-F-]{36}$", file_reference.source_file_relative_path
299+
)
300+
assert file_reference.file_size_bytes == NoopFileWriter.NOOP_FILE_SIZE
200301

201302
def test_get_article_attachments_messages_for_connector_builder(self) -> None:
202-
with HttpMocker() as http_mocker:
303+
with HttpMocker() as http_mocker, patch(
304+
'airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer.NoopFileWriter.write') as mock_noop_write, patch(
305+
'airbyte_cdk.sources.declarative.retrievers.file_uploader.local_file_system_file_writer.LocalFileSystemFileWriter.write') as mock_file_system_write:
203306
http_mocker.get(
204307
HttpRequest(url=STREAM_URL),
205308
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
@@ -217,6 +320,9 @@ def test_get_article_attachments_messages_for_connector_builder(self) -> None:
217320
),
218321
)
219322

323+
file_size = NoopFileWriter.NOOP_FILE_SIZE
324+
mock_noop_write.return_value = file_size # Simulate a file size
325+
220326
# Define a mock factory that forces emit_connector_builder_messages=True
221327
class MockModelToComponentFactory(OriginalModelToComponentFactory):
222328
def __init__(self, *args, **kwargs):
@@ -231,12 +337,16 @@ def __init__(self, *args, **kwargs):
231337
output = read(
232338
self._config(),
233339
CatalogBuilder()
234-
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
340+
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments").with_include_files(True))
235341
.build(),
236342
yaml_file="test_file_stream_with_filename_extractor.yaml",
237343
)
238344

239345
assert len(output.records) == 1
346+
# Ensure that NoopFileWriter is called.
347+
mock_noop_write.assert_called()
348+
# Ensure that LocalFileSystemFileWriter is not called.
349+
mock_file_system_write.assert_not_called()
240350
file_reference = output.records[0].record.file_reference
241351
assert file_reference
242352
assert file_reference.staging_file_url

0 commit comments

Comments
 (0)