Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,13 @@
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
from airbyte_cdk.sources.declarative.retrievers.file_uploader import (
BaseFileUploader,
ConnectorBuilderFileUploader,
FileUploader,
FileWriter,
NoopFileWriter,
)
from airbyte_cdk.sources.declarative.schema import (
ComplexFieldType,
DefaultSchemaLoader,
Expand Down Expand Up @@ -3592,7 +3598,7 @@ def create_fixed_window_call_rate_policy(

def create_file_uploader(
self, model: FileUploaderModel, config: Config, **kwargs: Any
) -> FileUploader:
) -> BaseFileUploader:
name = "File Uploader"
requester = self._create_component_from_model(
model=model.requester,
Expand All @@ -3606,14 +3612,22 @@ def create_file_uploader(
name=name,
**kwargs,
)
return FileUploader(
emit_connector_builder_messages = self._emit_connector_builder_messages
file_uploader = FileUploader(
requester=requester,
download_target_extractor=download_target_extractor,
config=config,
file_writer=NoopFileWriter() if emit_connector_builder_messages else FileWriter(),
parameters=model.parameters or {},
filename_extractor=model.filename_extractor if model.filename_extractor else None,
)

return (
ConnectorBuilderFileUploader(file_uploader)
if emit_connector_builder_messages
else file_uploader
)

def create_moving_window_call_rate_policy(
self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any
) -> MovingWindowCallRatePolicy:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from .base_file_uploader import BaseFileUploader
from .base_file_writer import BaseFileWriter
from .connector_builder_file_uploader import ConnectorBuilderFileUploader
from .file_uploader import FileUploader
from .file_writer import FileWriter
from .noop_file_writer import NoopFileWriter

# Names exported by a star-import of this package; each one is imported from a
# sibling module at the top of this file.
__all__ = [
    "FileUploader",
    "FileWriter",
    "NoopFileWriter",
    "ConnectorBuilderFileUploader",
    "BaseFileUploader",
    "BaseFileWriter",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from dataclasses import dataclass

from airbyte_cdk.sources.declarative.types import Record


@dataclass
class BaseFileUploader(ABC):
    """
    Abstract interface for file uploaders.

    Subclasses must implement `upload`, which takes a single record and returns nothing.
    """

    @abstractmethod
    def upload(self, record: Record) -> None:
        """
        Uploads the file to the specified location

        :param record: the record handed to the uploader
        """
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from pathlib import Path


class BaseFileWriter(ABC):
    """
    Abstract interface for file writers.

    Subclasses must implement `write`, which receives a target path and the bytes to store
    and returns an integer.
    """

    @abstractmethod
    def write(self, file_path: Path, content: bytes) -> int:
        """
        Writes the file to the specified location

        :param file_path: path the content is destined for
        :param content: raw bytes of the file
        :return: an integer size, as defined by the concrete writer
        """
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass

from airbyte_cdk.sources.declarative.types import Record

from .base_file_uploader import BaseFileUploader
from .file_uploader import FileUploader


@dataclass
class ConnectorBuilderFileUploader(BaseFileUploader):
    """
    Connector builder file uploader

    Wraps a FileUploader instance: the upload itself is delegated to the wrapped uploader, then
    every attribute of record.file_reference whose name does not start with an underscore is
    copied into record.data under the same key.
    """

    file_uploader: FileUploader

    def upload(self, record: Record) -> None:
        """
        Delegate the upload, then mirror the file reference attributes into the record data.

        :param record: the record to upload a file for; its data is modified in place
        """
        self.file_uploader.upload(record=record)

        reference_attributes = record.file_reference.__dict__
        for attribute_name in reference_attributes:
            # Underscore-prefixed attributes are treated as private and are not exposed.
            if attribute_name.startswith("_"):
                continue
            record.data[attribute_name] = reference_attributes[attribute_name]  # type: ignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,24 @@
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.utils.files_directory import get_files_directory

from .base_file_uploader import BaseFileUploader
from .base_file_writer import BaseFileWriter

logger = logging.getLogger("airbyte")


@dataclass
class FileUploader:
class FileUploader(BaseFileUploader):
"""
File uploader class
Handles the upload logic: fetching the download target, making the request via its requester, determining the file path, and calling self.file_writer.write()
Different types of file_writer:BaseFileWriter can be injected to handle different file writing strategies.
"""

requester: Requester
download_target_extractor: RecordExtractor
config: Config
file_writer: BaseFileWriter
parameters: InitVar[Mapping[str, Any]]

filename_extractor: Optional[Union[InterpolatedString, str]] = None
Expand Down Expand Up @@ -77,9 +87,7 @@ def upload(self, record: Record) -> None:
full_path = files_directory / file_relative_path
full_path.parent.mkdir(parents=True, exist_ok=True)

with open(str(full_path), "wb") as f:
f.write(response.content)
file_size_bytes = full_path.stat().st_size
file_size_bytes = self.file_writer.write(full_path, content=response.content)

logger.info("File uploaded successfully")
logger.info(f"File url: {str(full_path)}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from pathlib import Path

from .base_file_writer import BaseFileWriter


class FileWriter(BaseFileWriter):
    """
    File writer that persists the content on the local filesystem.
    """

    def write(self, file_path: Path, content: bytes) -> int:
        """
        Writes the file to the specified location

        :param file_path: destination of the file; an existing file is overwritten and the
            parent directory must already exist
        :param content: raw bytes to store
        :return: the number of bytes written
        """
        # Path.write_bytes opens the file in binary mode, writes, closes it and returns the
        # byte count, so no separate stat() call is needed to obtain the size.
        return file_path.write_bytes(content)
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from pathlib import Path

# Import the base class from the module that defines it rather than through
# file_uploader's transitive import, so this writer does not depend on the uploader.
from .base_file_writer import BaseFileWriter


class NoopFileWriter(BaseFileWriter):
    """
    File writer that discards the content instead of persisting it.
    """

    def write(self, file_path: Path, content: bytes) -> int:
        """
        Noop file writer

        :param file_path: ignored
        :param content: ignored
        :return: always 0, since nothing is written
        """
        return 0
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/yaml_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(
catalog: Optional[ConfiguredAirbyteCatalog] = None,
config: Optional[Mapping[str, Any]] = None,
state: Optional[List[AirbyteStateMessage]] = None,
emit_connector_builder_messages: Optional[bool] = False,
) -> None:
"""
:param path_to_yaml: Path to the yaml file describing the source
Expand All @@ -36,6 +37,7 @@ def __init__(
config=config or {},
state=state or [],
source_config=source_config,
emit_connector_builder_messages=emit_connector_builder_messages or False,
)

def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
Expand Down
55 changes: 53 additions & 2 deletions unit_tests/sources/declarative/file/test_file_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest import TestCase
from unittest.mock import Mock
from unittest.mock import Mock, patch

from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, Status
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
Expand Down Expand Up @@ -34,6 +34,7 @@ def _source(
config: Dict[str, Any],
state: Optional[List[AirbyteStateMessage]] = None,
yaml_file: Optional[str] = None,
emit_connector_builder_messages: Optional[bool] = False,
) -> YamlDeclarativeSource:
if not yaml_file:
yaml_file = "file_stream_manifest.yaml"
Expand All @@ -42,6 +43,7 @@ def _source(
catalog=catalog,
config=config,
state=state,
emit_connector_builder_messages=emit_connector_builder_messages,
)


Expand All @@ -51,11 +53,16 @@ def read(
state_builder: Optional[StateBuilder] = None,
expecting_exception: bool = False,
yaml_file: Optional[str] = None,
emit_connector_builder_messages: Optional[bool] = False,
) -> EntrypointOutput:
config = config_builder.build()
state = state_builder.build() if state_builder else StateBuilder().build()
return entrypoint_read(
_source(catalog, config, state, yaml_file), config, catalog, state, expecting_exception
_source(catalog, config, state, yaml_file, emit_connector_builder_messages),
config,
catalog,
state,
expecting_exception,
)


Expand Down Expand Up @@ -190,6 +197,50 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
)
assert file_reference.file_size_bytes

def test_get_article_attachments_messages_for_connector_builder(self) -> None:
    """
    Read the article_attachments stream with connector builder messages enabled.

    Expects a single record whose file reference reports a size of 0 and whose file
    reference fields are also present in the record data.
    """
    with HttpMocker() as http_mocker:
        mocked_endpoints = (
            (STREAM_URL, json.dumps(find_template("file_api/articles", __file__))),
            (
                STREAM_ATTACHMENTS_URL,
                json.dumps(find_template("file_api/article_attachments", __file__)),
            ),
            (
                STREAM_ATTACHMENT_CONTENT_URL,
                find_binary_response("file_api/article_attachment_content.png", __file__),
            ),
        )
        for endpoint_url, response_body in mocked_endpoints:
            http_mocker.get(HttpRequest(url=endpoint_url), HttpResponse(response_body, 200))

        attachments_stream = ConfiguredAirbyteStreamBuilder().with_name("article_attachments")
        output = read(
            self._config(),
            CatalogBuilder().with_stream(attachments_stream).build(),
            yaml_file="test_file_stream_with_filename_extractor.yaml",
            emit_connector_builder_messages=True,
        )

        assert len(output.records) == 1
        record = output.records[0].record
        file_reference = record.file_reference
        assert file_reference
        assert file_reference.staging_file_url
        assert file_reference.source_file_relative_path
        # because we didn't write the file, the size is 0
        assert file_reference.file_size_bytes == 0

        # Assert file reference fields are copied to record data
        record_data = record.data
        for field_name in ("staging_file_url", "source_file_relative_path", "file_size_bytes"):
            assert record_data[field_name] == getattr(file_reference, field_name)

def test_discover_article_attachments(self) -> None:
output = discover(self._config())

Expand Down
Loading