Skip to content

Commit a6658e6

Browse files
committed
connector builder: initial changes to pass file reference info to data
1 parent 2f29eff commit a6658e6

File tree

4 files changed

+114
-9
lines changed

4 files changed

+114
-9
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,8 @@
481481
SimpleRetriever,
482482
SimpleRetrieverTestReadDecorator,
483483
)
484-
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
484+
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader, FileWriter, NoopFileWriter, \
485+
ConnectorBuilderFileUploader, BaseFileUploader
485486
from airbyte_cdk.sources.declarative.schema import (
486487
ComplexFieldType,
487488
DefaultSchemaLoader,
@@ -3592,7 +3593,7 @@ def create_fixed_window_call_rate_policy(
35923593

35933594
def create_file_uploader(
35943595
self, model: FileUploaderModel, config: Config, **kwargs: Any
3595-
) -> FileUploader:
3596+
) -> BaseFileUploader:
35963597
name = "File Uploader"
35973598
requester = self._create_component_from_model(
35983599
model=model.requester,
@@ -3606,14 +3607,18 @@ def create_file_uploader(
36063607
name=name,
36073608
**kwargs,
36083609
)
3609-
return FileUploader(
3610+
emit_connector_builder_messages = self._emit_connector_builder_messages
3611+
file_uploader = FileUploader(
36103612
requester=requester,
36113613
download_target_extractor=download_target_extractor,
36123614
config=config,
3615+
file_writer=NoopFileWriter() if emit_connector_builder_messages else FileWriter(),
36133616
parameters=model.parameters or {},
36143617
filename_extractor=model.filename_extractor if model.filename_extractor else None,
36153618
)
36163619

3620+
return ConnectorBuilderFileUploader(file_uploader) if emit_connector_builder_messages else file_uploader
3621+
36173622
def create_moving_window_call_rate_policy(
36183623
self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any
36193624
) -> MovingWindowCallRatePolicy:

airbyte_cdk/sources/declarative/retrievers/file_uploader.py

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from pathlib import Path
1010
from typing import Any, Mapping, Optional, Union
1111

12+
from abc import ABC, abstractmethod
1213
from airbyte_cdk.models import AirbyteRecordMessageFileReference
1314
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
1415
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
@@ -24,12 +25,56 @@
2425

2526
logger = logging.getLogger("airbyte")
2627

28+
@dataclass
class BaseFileUploader(ABC):
    """Abstract interface shared by all file uploaders.

    Subclasses must provide a concrete :meth:`upload`.
    """

    @abstractmethod
    def upload(self, record: Record) -> None:
        """Upload the file for the given record.

        :param record: the record the upload is performed for.
        """
40+
41+
class BaseFileWriter(ABC):
    """Abstract interface for objects that write file content to a path."""

    @abstractmethod
    def write(self, file_path: Path, content: bytes) -> int:
        """Write ``content`` to ``file_path``.

        :param file_path: location the content should be written to.
        :param content: raw bytes of the file.
        :return: an integer reported by the writer (used by callers as the file size in bytes).
        """
52+
53+
class FileWriter(BaseFileWriter):
    """File writer that stores the content on the local filesystem."""

    def write(self, file_path: Path, content: bytes) -> int:
        """Write ``content`` to ``file_path`` and report the resulting file size.

        :param file_path: destination of the file; opened in binary write mode.
        :param content: raw bytes to store.
        :return: size of the file on disk, in bytes, as reported by ``stat``.
        """
        destination = str(file_path)
        with open(destination, "wb") as output:
            output.write(content)

        # The size is read back from the filesystem rather than derived from ``content``.
        size_in_bytes = file_path.stat().st_size
        return size_in_bytes
63+
64+
class NoopFileWriter(BaseFileWriter):
    """File writer that intentionally writes nothing."""

    def write(self, file_path: Path, content: bytes) -> int:
        """Ignore both arguments and report a size of zero.

        :param file_path: unused.
        :param content: unused.
        :return: always ``0`` because no file is written.
        """
        bytes_written = 0
        return bytes_written
2771

2872
@dataclass
29-
class FileUploader:
73+
class FileUploader(BaseFileUploader):
3074
requester: Requester
3175
download_target_extractor: RecordExtractor
3276
config: Config
77+
file_writer: BaseFileWriter
3378
parameters: InitVar[Mapping[str, Any]]
3479

3580
filename_extractor: Optional[Union[InterpolatedString, str]] = None
@@ -77,9 +122,7 @@ def upload(self, record: Record) -> None:
77122
full_path = files_directory / file_relative_path
78123
full_path.parent.mkdir(parents=True, exist_ok=True)
79124

80-
with open(str(full_path), "wb") as f:
81-
f.write(response.content)
82-
file_size_bytes = full_path.stat().st_size
125+
file_size_bytes = self.file_writer.write(full_path, content=response.content)
83126

84127
logger.info("File uploaded successfully")
85128
logger.info(f"File url: {str(full_path)}")
@@ -91,3 +134,13 @@ def upload(self, record: Record) -> None:
91134
source_file_relative_path=str(file_relative_path),
92135
file_size_bytes=file_size_bytes,
93136
)
137+
138+
139+
@dataclass
class ConnectorBuilderFileUploader(BaseFileUploader):
    """Uploader wrapper that mirrors the file reference fields into the record data.

    The upload itself is delegated to the wrapped ``file_uploader``. Afterwards every
    attribute of ``record.file_reference`` whose name does not start with an underscore
    is copied into ``record.data`` under the same name.
    """

    # Uploader that performs the actual upload before the fields are copied.
    file_uploader: FileUploader

    def upload(self, record: Record) -> None:
        """Upload via the wrapped uploader, then copy file reference fields to the data.

        :param record: record to upload the file for; its ``data`` is mutated in place.
        """
        self.file_uploader.upload(record=record)

        file_reference = record.file_reference
        if file_reference is None:
            # No reference was attached to the record, so there is nothing to mirror.
            return

        for attribute_name in vars(file_reference):
            # Underscore-prefixed attributes are treated as private and skipped.
            if attribute_name.startswith("_"):
                continue
            record.data[attribute_name] = getattr(file_reference, attribute_name)

airbyte_cdk/sources/declarative/yaml_declarative_source.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def __init__(
2424
catalog: Optional[ConfiguredAirbyteCatalog] = None,
2525
config: Optional[Mapping[str, Any]] = None,
2626
state: Optional[List[AirbyteStateMessage]] = None,
27+
emit_connector_builder_messages: Optional[bool] = False
2728
) -> None:
2829
"""
2930
:param path_to_yaml: Path to the yaml file describing the source
@@ -36,6 +37,7 @@ def __init__(
3637
config=config or {},
3738
state=state or [],
3839
source_config=source_config,
40+
emit_connector_builder_messages=emit_connector_builder_messages
3941
)
4042

4143
def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:

unit_tests/sources/declarative/file/test_file_stream.py

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from pathlib import Path
44
from typing import Any, Dict, List, Optional
55
from unittest import TestCase
6-
from unittest.mock import Mock
6+
from unittest.mock import Mock, patch
77

88
from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, Status
99
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
@@ -34,6 +34,7 @@ def _source(
3434
config: Dict[str, Any],
3535
state: Optional[List[AirbyteStateMessage]] = None,
3636
yaml_file: Optional[str] = None,
37+
emit_connector_builder_messages: Optional[bool] = False
3738
) -> YamlDeclarativeSource:
3839
if not yaml_file:
3940
yaml_file = "file_stream_manifest.yaml"
@@ -42,6 +43,7 @@ def _source(
4243
catalog=catalog,
4344
config=config,
4445
state=state,
46+
emit_connector_builder_messages=emit_connector_builder_messages
4547
)
4648

4749

@@ -51,11 +53,12 @@ def read(
5153
state_builder: Optional[StateBuilder] = None,
5254
expecting_exception: bool = False,
5355
yaml_file: Optional[str] = None,
56+
emit_connector_builder_messages: Optional[bool] = False
5457
) -> EntrypointOutput:
5558
config = config_builder.build()
5659
state = state_builder.build() if state_builder else StateBuilder().build()
5760
return entrypoint_read(
58-
_source(catalog, config, state, yaml_file), config, catalog, state, expecting_exception
61+
_source(catalog, config, state, yaml_file, emit_connector_builder_messages), config, catalog, state, expecting_exception
5962
)
6063

6164

@@ -190,6 +193,48 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
190193
)
191194
assert file_reference.file_size_bytes
192195

196+
def test_get_article_attachments_messages_for_connector_builder(self) -> None:
    """Read attachments with connector builder messages enabled.

    Checks that a file reference is still produced, that its size is 0, and that
    the file reference fields are also present in the record data.
    """
    with HttpMocker() as http_mocker:
        articles_request = HttpRequest(url=STREAM_URL)
        articles_response = HttpResponse(
            json.dumps(find_template("file_api/articles", __file__)), 200
        )
        http_mocker.get(articles_request, articles_response)

        attachments_request = HttpRequest(url=STREAM_ATTACHMENTS_URL)
        attachments_response = HttpResponse(
            json.dumps(find_template("file_api/article_attachments", __file__)), 200
        )
        http_mocker.get(attachments_request, attachments_response)

        content_request = HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL)
        content_response = HttpResponse(
            find_binary_response("file_api/article_attachment_content.png", __file__), 200
        )
        http_mocker.get(content_request, content_response)

        config_builder = self._config()
        attachments_stream = ConfiguredAirbyteStreamBuilder().with_name("article_attachments")
        catalog = CatalogBuilder().with_stream(attachments_stream).build()
        output = read(
            config_builder,
            catalog,
            yaml_file="test_file_stream_with_filename_extractor.yaml",
            emit_connector_builder_messages=True,
        )

        assert len(output.records) == 1
        file_reference = output.records[0].record.file_reference
        assert file_reference
        assert file_reference.staging_file_url
        assert file_reference.source_file_relative_path
        # because we didn't write the file, the size is 0
        assert file_reference.file_size_bytes == 0

        # Assert file reference fields are copied to record data
        record_data = output.records[0].record.data
        for field_name in ("staging_file_url", "source_file_relative_path", "file_size_bytes"):
            assert record_data[field_name] == getattr(file_reference, field_name)
237+
193238
def test_discover_article_attachments(self) -> None:
194239
output = discover(self._config())
195240

0 commit comments

Comments
 (0)