Skip to content

Commit ac278b9

Browse files
committed
files-mode-api: initial changes to emit record
1 parent de09ae8 commit ac278b9

File tree

11 files changed

+220
-236
lines changed

11 files changed

+220
-236
lines changed

airbyte_cdk/models/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
AirbyteMessage,
2020
AirbyteProtocol,
2121
AirbyteRecordMessage,
22+
AirbyteRecordMessageFileReference,
2223
AirbyteStateBlob,
2324
AirbyteStateMessage,
2425
AirbyteStateStats,

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
150150
stream_name=record.stream_name,
151151
data_or_message=record.data,
152152
is_file_transfer_message=record.is_file_transfer_message,
153+
file_reference=record.file_reference,
153154
)
154155
stream = self._stream_name_to_instance[record.stream_name]
155156

airbyte_cdk/sources/declarative/retrievers/file_uploader.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
77
SafeResponse,
88
)
9+
from airbyte_cdk.models import AirbyteRecordMessageFileReference
910
from airbyte_cdk.sources.declarative.requesters import Requester
1011
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
12+
from airbyte_cdk.sources.utils.files_directory import get_files_directory
1113

1214

1315
class FileUploader:
@@ -21,10 +23,10 @@ def __init__(
2123
self._download_target_extractor = download_target_extractor
2224
self._content_extractor = content_extractor
2325

24-
def upload(self, record: Record) -> None:
26+
def upload(self, record: Record) -> AirbyteRecordMessageFileReference:
2527
# TODO validate record shape - is the transformation applied at this point?
2628
mocked_response = SafeResponse()
27-
mocked_response.content = json.dumps(record.data)
29+
mocked_response.content = json.dumps(record.data).encode("utf-8")
2830
download_target = list(self._download_target_extractor.extract_records(mocked_response))[0]
2931
if not isinstance(download_target, str):
3032
raise ValueError(
@@ -40,5 +42,20 @@ def upload(self, record: Record) -> None:
4042
if self._content_extractor:
4143
raise NotImplementedError("TODO")
4244
else:
43-
with open(str(Path(__file__).parent / record.data["file_name"]), "ab") as f:
45+
files_directory = Path(get_files_directory())
46+
# TODO:: we could either interpolate record data if some relative_path is provided or
47+
# use partition_field value in the slice {"partition_field": some_value_id} to create a path
48+
file_relative_path = Path(record.stream_name) / record.data["file_name"]
49+
50+
full_path = files_directory / file_relative_path
51+
full_path.parent.mkdir(parents=True, exist_ok=True)
52+
53+
with open(str(full_path), "wb") as f:
4454
f.write(response.content)
55+
file_size_bytes = full_path.stat().st_size
56+
57+
return AirbyteRecordMessageFileReference(
58+
file_url=download_target,
59+
file_relative_path=str(file_relative_path),
60+
file_size_bytes=file_size_bytes,
61+
)

airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ def read(self) -> Iterable[Record]:
7474
)
7575
)
7676
if self._file_uploader:
77-
self._file_uploader.upload(record)
77+
file_reference = self._file_uploader.upload(record)
78+
record.file_reference = file_reference
7879
yield record
7980
else:
8081
self._message_repository.emit_message(stream_data)

airbyte_cdk/sources/file_based/file_types/file_transfer.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,12 @@
88
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
99
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
1010
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
11-
12-
AIRBYTE_STAGING_DIRECTORY = os.getenv("AIRBYTE_STAGING_DIRECTORY", "/staging/files")
13-
DEFAULT_LOCAL_DIRECTORY = "/tmp/airbyte-file-transfer"
11+
from airbyte_cdk.sources.utils.files_directory import get_files_directory
1412

1513

1614
class FileTransfer:
1715
def __init__(self) -> None:
18-
self._local_directory = (
19-
AIRBYTE_STAGING_DIRECTORY
20-
if os.path.exists(AIRBYTE_STAGING_DIRECTORY)
21-
else DEFAULT_LOCAL_DIRECTORY
22-
)
16+
self._local_directory = get_files_directory()
2317

2418
def get_file(
2519
self,

airbyte_cdk/sources/types.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from typing import Any, ItemsView, Iterator, KeysView, List, Mapping, Optional, ValuesView
88

9+
from airbyte_cdk.models import AirbyteRecordMessageFileReference
910
from airbyte_cdk.utils.slice_hasher import SliceHasher
1011

1112
# A FieldPointer designates a path to a field inside a mapping. For example, retrieving ["k1", "k1.2"] in the object {"k1" :{"k1.2":
@@ -24,11 +25,13 @@ def __init__(
2425
stream_name: str,
2526
associated_slice: Optional[StreamSlice] = None,
2627
is_file_transfer_message: bool = False,
28+
file_reference: Optional[AirbyteRecordMessageFileReference] = None,
2729
):
2830
self._data = data
2931
self._associated_slice = associated_slice
3032
self.stream_name = stream_name
3133
self.is_file_transfer_message = is_file_transfer_message
34+
self._file_reference = file_reference
3235

3336
@property
3437
def data(self) -> Mapping[str, Any]:
@@ -38,6 +41,14 @@ def data(self) -> Mapping[str, Any]:
3841
def associated_slice(self) -> Optional[StreamSlice]:
3942
return self._associated_slice
4043

44+
@property
45+
def file_reference(self) -> AirbyteRecordMessageFileReference:
46+
return self._file_reference
47+
48+
@file_reference.setter
49+
def file_reference(self, value: AirbyteRecordMessageFileReference):
50+
self._file_reference = value
51+
4152
def __repr__(self) -> str:
4253
return repr(self._data)
4354

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
import os
5+
6+
AIRBYTE_STAGING_DIRECTORY = os.getenv("AIRBYTE_STAGING_DIRECTORY", "/staging/files")
7+
DEFAULT_LOCAL_DIRECTORY = "/tmp/airbyte-file-transfer"
8+
9+
10+
def get_files_directory() -> str:
11+
return (
12+
AIRBYTE_STAGING_DIRECTORY
13+
if os.path.exists(AIRBYTE_STAGING_DIRECTORY)
14+
else DEFAULT_LOCAL_DIRECTORY
15+
)

airbyte_cdk/sources/utils/record_helper.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
AirbyteMessage,
1111
AirbyteRecordMessage,
1212
AirbyteTraceMessage,
13+
AirbyteRecordMessageFileReference,
1314
)
1415
from airbyte_cdk.models import Type as MessageType
1516
from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage
@@ -23,6 +24,7 @@ def stream_data_to_airbyte_message(
2324
transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform),
2425
schema: Optional[Mapping[str, Any]] = None,
2526
is_file_transfer_message: bool = False,
27+
file_reference: Optional[AirbyteRecordMessageFileReference] = None,
2628
) -> AirbyteMessage:
2729
if schema is None:
2830
schema = {}
@@ -41,7 +43,12 @@ def stream_data_to_airbyte_message(
4143
stream=stream_name, file=data, emitted_at=now_millis, data={}
4244
)
4345
else:
44-
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
46+
message = AirbyteRecordMessage(
47+
stream=stream_name,
48+
data=data,
49+
emitted_at=now_millis,
50+
file_reference=file_reference,
51+
)
4552
return AirbyteMessage(type=MessageType.RECORD, record=message)
4653
case AirbyteTraceMessage():
4754
return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message)

0 commit comments

Comments
 (0)