Skip to content

Commit 7f92efa

Browse files
fix: Restore AirbyteRecordMessageFileReference import for file-based streams
- Add missing import in record_helper.py, file_transfer.py, and test files
- Restore proper type annotations for file reference parameters
- Uncomment file reference creation logic in default_file_uploader.py
- Fix FileStreamTest.test_get_article_attachments failure
- Maintain CI detection logic for manifest validation tests

Co-Authored-By: AJ Steers <[email protected]>
1 parent a87f360 commit 7f92efa

File tree

5 files changed

+23
-10
lines changed

5 files changed

+23
-10
lines changed

airbyte_cdk/models/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
AirbyteMessage,
2020
AirbyteProtocol,
2121
AirbyteRecordMessage,
22+
AirbyteRecordMessageFileReference,
2223
AirbyteStateBlob,
2324
AirbyteStateMessage,
2425
AirbyteStateStats,

airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from pathlib import Path
1010
from typing import Any, Mapping, Optional, Union
1111

12+
from airbyte_cdk.models import AirbyteRecordMessageFileReference
1213
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
1314
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
1415
InterpolatedString,
@@ -89,8 +90,8 @@ def upload(self, record: Record) -> None:
8990
logger.info(f"File size: {file_size_bytes / 1024} KB")
9091
logger.info(f"File relative path: {str(file_relative_path)}")
9192

92-
# record.file_reference = AirbyteRecordMessageFileReference(
93-
# staging_file_url=str(full_path),
94-
# source_file_relative_path=str(file_relative_path),
95-
# file_size_bytes=file_size_bytes,
96-
# )
93+
record.file_reference = AirbyteRecordMessageFileReference(
94+
staging_file_url=str(full_path),
95+
source_file_relative_path=str(file_relative_path),
96+
file_size_bytes=file_size_bytes,
97+
)

airbyte_cdk/sources/file_based/file_types/file_transfer.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
33
#
44
import logging
5-
from typing import Any, Iterable, Optional, Tuple
5+
from typing import Iterable, Tuple
66

7+
from airbyte_cdk.models import AirbyteRecordMessageFileReference
78
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
89
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
910
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
@@ -19,7 +20,7 @@ def upload(
1920
file: RemoteFile,
2021
stream_reader: AbstractFileBasedStreamReader,
2122
logger: logging.Logger,
22-
) -> Iterable[Tuple[FileRecordData, Optional[Any]]]:
23+
) -> Iterable[Tuple[FileRecordData, AirbyteRecordMessageFileReference]]:
2324
try:
2425
yield stream_reader.upload(
2526
file=file, local_directory=self._local_directory, logger=logger

airbyte_cdk/sources/utils/record_helper.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
AirbyteLogMessage,
1010
AirbyteMessage,
1111
AirbyteRecordMessage,
12+
AirbyteRecordMessageFileReference,
1213
AirbyteTraceMessage,
1314
)
1415
from airbyte_cdk.models import Type as MessageType
@@ -21,7 +22,7 @@ def stream_data_to_airbyte_message(
2122
data_or_message: StreamData,
2223
transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform),
2324
schema: Optional[Mapping[str, Any]] = None,
24-
file_reference: Optional[Any] = None,
25+
file_reference: Optional[AirbyteRecordMessageFileReference] = None,
2526
) -> AirbyteMessage:
2627
if schema is None:
2728
schema = {}

unit_tests/sources/file_based/stream/test_default_file_based_stream.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from airbyte_cdk.models import (
1616
AirbyteLogMessage,
1717
AirbyteMessage,
18+
AirbyteRecordMessageFileReference,
1819
AirbyteStream,
1920
Level,
2021
)
@@ -288,7 +289,11 @@ class DefaultFileBasedStreamFileTransferTest(unittest.TestCase):
288289
bytes=10,
289290
source_uri="file:///absolute/path/file.csv",
290291
)
291-
_A_FILE_REFERENCE_MESSAGE = None # AirbyteRecordMessageFileReference removed
292+
_A_FILE_REFERENCE_MESSAGE = AirbyteRecordMessageFileReference(
293+
file_size_bytes=10,
294+
source_file_relative_path="relative/path/file.csv",
295+
staging_file_url="/absolute/path/file.csv",
296+
)
292297

293298
def setUp(self) -> None:
294299
self._stream_config = Mock()
@@ -470,7 +475,11 @@ def test_when_compute_slices_with_duplicates(self) -> None:
470475

471476
class DefaultFileBasedStreamSchemaTest(unittest.TestCase):
472477
_NOW = datetime(2022, 10, 22, tzinfo=timezone.utc)
473-
_A_FILE_REFERENCE_MESSAGE = None # AirbyteRecordMessageFileReference removed
478+
_A_FILE_REFERENCE_MESSAGE = AirbyteRecordMessageFileReference(
479+
file_size_bytes=10,
480+
source_file_relative_path="relative/path/file.csv",
481+
staging_file_url="/absolute/path/file.csv",
482+
)
474483

475484
def setUp(self) -> None:
476485
self._stream_config = Mock(spec=FileBasedStreamConfig)

0 commit comments

Comments
 (0)