Skip to content
1 change: 1 addition & 0 deletions airbyte_cdk/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
AirbyteMessage,
AirbyteProtocol,
AirbyteRecordMessage,
AirbyteRecordMessageFileReference,
AirbyteStateBlob,
AirbyteStateMessage,
AirbyteStateStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
stream_name=record.stream_name,
data_or_message=record.data,
is_file_transfer_message=record.is_file_transfer_message,
file_reference=record.file_reference,
)
stream = self._stream_name_to_instance[record.stream_name]

Expand Down
23 changes: 20 additions & 3 deletions airbyte_cdk/sources/declarative/retrievers/file_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
SafeResponse,
)
from airbyte_cdk.models import AirbyteRecordMessageFileReference
from airbyte_cdk.sources.declarative.requesters import Requester
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
from airbyte_cdk.sources.utils.files_directory import get_files_directory


class FileUploader:
Expand All @@ -21,10 +23,10 @@ def __init__(
self._download_target_extractor = download_target_extractor
self._content_extractor = content_extractor

def upload(self, record: Record) -> None:
def upload(self, record: Record) -> AirbyteRecordMessageFileReference:
# TODO validate record shape - is the transformation applied at this point?
mocked_response = SafeResponse()
mocked_response.content = json.dumps(record.data)
mocked_response.content = json.dumps(record.data).encode("utf-8")
download_target = list(self._download_target_extractor.extract_records(mocked_response))[0]
if not isinstance(download_target, str):
raise ValueError(
Expand All @@ -40,5 +42,20 @@ def upload(self, record: Record) -> None:
if self._content_extractor:
raise NotImplementedError("TODO")
else:
with open(str(Path(__file__).parent / record.data["file_name"]), "ab") as f:
files_directory = Path(get_files_directory())
# TODO:: we could either interpolate record data if some relative_path is provided or
# use partition_field value in the slice {"partition_field": some_value_id} to create a path
file_relative_path = Path(record.stream_name) / record.data["file_name"]

full_path = files_directory / file_relative_path
full_path.parent.mkdir(parents=True, exist_ok=True)

with open(str(full_path), "wb") as f:
f.write(response.content)
file_size_bytes = full_path.stat().st_size

return AirbyteRecordMessageFileReference(
file_url=download_target,
file_relative_path=str(file_relative_path),
file_size_bytes=file_size_bytes,
)
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ def read(self) -> Iterable[Record]:
)
)
if self._file_uploader:
self._file_uploader.upload(record)
file_reference = self._file_uploader.upload(record)
record.file_reference = file_reference
yield record
else:
self._message_repository.emit_message(stream_data)
Expand Down
10 changes: 2 additions & 8 deletions airbyte_cdk/sources/file_based/file_types/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,12 @@
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.remote_file import RemoteFile

AIRBYTE_STAGING_DIRECTORY = os.getenv("AIRBYTE_STAGING_DIRECTORY", "/staging/files")
DEFAULT_LOCAL_DIRECTORY = "/tmp/airbyte-file-transfer"
from airbyte_cdk.sources.utils.files_directory import get_files_directory


class FileTransfer:
def __init__(self) -> None:
self._local_directory = (
AIRBYTE_STAGING_DIRECTORY
if os.path.exists(AIRBYTE_STAGING_DIRECTORY)
else DEFAULT_LOCAL_DIRECTORY
)
self._local_directory = get_files_directory()

def get_file(
self,
Expand Down
11 changes: 11 additions & 0 deletions airbyte_cdk/sources/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from typing import Any, ItemsView, Iterator, KeysView, List, Mapping, Optional, ValuesView

from airbyte_cdk.models import AirbyteRecordMessageFileReference
from airbyte_cdk.utils.slice_hasher import SliceHasher

# A FieldPointer designates a path to a field inside a mapping. For example, retrieving ["k1", "k1.2"] in the object {"k1" :{"k1.2":
Expand All @@ -24,11 +25,13 @@ def __init__(
stream_name: str,
associated_slice: Optional[StreamSlice] = None,
is_file_transfer_message: bool = False,
file_reference: Optional[AirbyteRecordMessageFileReference] = None,
):
self._data = data
self._associated_slice = associated_slice
self.stream_name = stream_name
self.is_file_transfer_message = is_file_transfer_message
self._file_reference = file_reference

@property
def data(self) -> Mapping[str, Any]:
Expand All @@ -38,6 +41,14 @@ def data(self) -> Mapping[str, Any]:
def associated_slice(self) -> Optional[StreamSlice]:
return self._associated_slice

@property
def file_reference(self) -> AirbyteRecordMessageFileReference:
return self._file_reference

@file_reference.setter
def file_reference(self, value: AirbyteRecordMessageFileReference):
self._file_reference = value

def __repr__(self) -> str:
return repr(self._data)

Expand Down
15 changes: 15 additions & 0 deletions airbyte_cdk/sources/utils/files_directory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import os

AIRBYTE_STAGING_DIRECTORY = os.getenv("AIRBYTE_STAGING_DIRECTORY", "/staging/files")
DEFAULT_LOCAL_DIRECTORY = "/tmp/airbyte-file-transfer"


def get_files_directory() -> str:
return (
AIRBYTE_STAGING_DIRECTORY
if os.path.exists(AIRBYTE_STAGING_DIRECTORY)
else DEFAULT_LOCAL_DIRECTORY
)
9 changes: 8 additions & 1 deletion airbyte_cdk/sources/utils/record_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
AirbyteMessage,
AirbyteRecordMessage,
AirbyteTraceMessage,
AirbyteRecordMessageFileReference,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage
Expand All @@ -23,6 +24,7 @@ def stream_data_to_airbyte_message(
transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform),
schema: Optional[Mapping[str, Any]] = None,
is_file_transfer_message: bool = False,
file_reference: Optional[AirbyteRecordMessageFileReference] = None,
) -> AirbyteMessage:
if schema is None:
schema = {}
Expand All @@ -41,7 +43,12 @@ def stream_data_to_airbyte_message(
stream=stream_name, file=data, emitted_at=now_millis, data={}
)
else:
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
message = AirbyteRecordMessage(
stream=stream_name,
data=data,
emitted_at=now_millis,
file_reference=file_reference,
)
return AirbyteMessage(type=MessageType.RECORD, record=message)
case AirbyteTraceMessage():
return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message)
Expand Down
Loading
Loading