Skip to content

Commit b8c9a2c

Browse files
added UploadableRemoteFile
1 parent 1be0b73 commit b8c9a2c

File tree

4 files changed

+82
-145
lines changed

4 files changed

+82
-145
lines changed

airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py

Lines changed: 0 additions & 64 deletions
This file was deleted.

airbyte_cdk/sources/file_based/file_based_stream_reader.py

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from enum import Enum
1010
from io import IOBase
1111
from os import makedirs, path
12-
from typing import Any, Callable, Iterable, List, MutableMapping, Optional, Set, Tuple, Type
12+
from typing import Any, Iterable, List, MutableMapping, Optional, Set, Tuple
1313

1414
from airbyte_protocol_dataclasses.models import FailureType
1515
from wcmatch.glob import GLOBSTAR, globmatch
@@ -22,11 +22,8 @@
2222
use_file_transfer,
2323
)
2424
from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError
25-
from airbyte_cdk.sources.file_based.file_based_file_transfer_reader import (
26-
AbstractFileBasedFileTransferReader,
27-
)
2825
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
29-
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
26+
from airbyte_cdk.sources.file_based.remote_file import RemoteFile, UploadableRemoteFile
3027

3128

3229
class FileReadMode(Enum):
@@ -40,16 +37,10 @@ class AbstractFileBasedStreamReader(ABC):
4037
FILE_NAME = "file_name"
4138
LOCAL_FILE_PATH = "local_file_path"
4239
FILE_FOLDER = "file_folder"
40+
FILE_SIZE_LIMIT = 1_500_000_000
4341

4442
def __init__(self) -> None:
4543
self._config = None
46-
if (
47-
self.file_transfer_reader_class is None
48-
and type(self).upload is AbstractFileBasedStreamReader.upload
49-
):
50-
raise NotImplementedError(
51-
"One of file_transfer_reader_class or upload method must be defined to support file transfer."
52-
)
5344

5445
@property
5546
def config(self) -> Optional[AbstractFileBasedSpec]:
@@ -156,12 +147,8 @@ def include_identities_stream(self) -> bool:
156147
return include_identities_stream(self.config)
157148
return False
158149

159-
@property
160-
def file_transfer_reader_class(self) -> Type[AbstractFileBasedFileTransferReader] | None:
161-
return None
162-
163150
def upload(
164-
self, file: RemoteFile, local_directory: str, logger: logging.Logger
151+
self, file: UploadableRemoteFile, local_directory: str, logger: logging.Logger
165152
) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
166153
"""
167154
This is required for connectors that will support writing to
@@ -179,48 +166,45 @@ def upload(
179166
- file_size_bytes (int): The size of the referenced file in bytes.
180167
- source_file_relative_path (str): The relative path to the referenced file in source.
181168
"""
182-
if self.file_transfer_reader_class is None:
183-
raise NotImplementedError(
184-
"file_transfer_reader_class must be defined to support default file transfer upload method."
185-
)
169+
if not isinstance(file, UploadableRemoteFile):
170+
raise TypeError(f"Expected UploadableRemoteFile, got {type(file)}")
186171

187-
file_transfer = self.file_transfer_reader_class(file)
188-
file_size = file_transfer.file_size
172+
file_size = file.size
189173

190-
if file_size > file_transfer.FILE_SIZE_LIMIT:
191-
message = f"File size exceeds the {file_transfer.FILE_SIZE_LIMIT / 1e9} GB limit."
174+
if file_size > self.FILE_SIZE_LIMIT:
175+
message = f"File size exceeds the {self.FILE_SIZE_LIMIT / 1e9} GB limit."
192176
raise FileSizeLimitError(
193177
message=message, internal_message=message, failure_type=FailureType.config_error
194178
)
195179

196180
file_paths = self._get_file_transfer_paths(
197-
source_file_relative_path=file_transfer.source_file_relative_path,
181+
source_file_relative_path=file.source_file_relative_path,
198182
staging_directory=local_directory,
199183
)
200184
local_file_path = file_paths[self.LOCAL_FILE_PATH]
201185
file_relative_path = file_paths[self.FILE_RELATIVE_PATH]
202186
file_name = file_paths[self.FILE_NAME]
203187

204188
logger.info(
205-
f"Starting to download the file {file_transfer.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)"
189+
f"Starting to download the file {file.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)"
206190
)
207191
start_download_time = time.time()
208192

209-
file_transfer.download_to_local_directory(local_file_path)
193+
file.download_to_local_directory(local_file_path)
210194

211195
write_duration = time.time() - start_download_time
212196
logger.info(
213-
f"Finished downloading the file {file_transfer.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds."
197+
f"Finished downloading the file {file.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds."
214198
)
215199

216200
file_record_data = FileRecordData(
217201
folder=file_paths[self.FILE_FOLDER],
218202
file_name=file_name,
219203
bytes=file_size,
220-
id=file_transfer.file_id,
204+
id=file.id,
221205
mime_type=file.mime_type,
222-
created_at=file_transfer.file_created_at,
223-
updated_at=file_transfer.file_updated_at,
206+
created_at=file.created_at,
207+
updated_at=file.updated_at,
224208
source_uri=file.uri,
225209
)
226210
file_reference = AirbyteRecordMessageFileReference(

airbyte_cdk/sources/file_based/remote_file.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
4+
from abc import abstractmethod, ABC
55
from datetime import datetime
66
from typing import Optional
77

@@ -16,3 +16,42 @@ class RemoteFile(BaseModel):
1616
uri: str
1717
last_modified: datetime
1818
mime_type: Optional[str] = None
19+
20+
21+
class UploadableRemoteFile(RemoteFile, ABC):
22+
"""
23+
A file in a file-based stream that supports uploading (file transfer).
24+
"""
25+
26+
id: Optional[str] = None
27+
created_at: Optional[datetime] = None
28+
updated_at: Optional[datetime] = None
29+
30+
@property
31+
@abstractmethod
32+
def size(self) -> int:
33+
"""
34+
Returns the file size in bytes.
35+
"""
36+
...
37+
38+
@abstractmethod
39+
def download_to_local_directory(self, local_file_path: str) -> None:
40+
"""
41+
Download the file from remote source to local storage.
42+
"""
43+
...
44+
45+
@property
46+
def source_file_relative_path(self) -> str:
47+
"""
48+
Returns the relative path of the source file.
49+
"""
50+
return self.uri
51+
52+
@property
53+
def file_uri_for_logging(self) -> str:
54+
"""
55+
Returns the URI for the file being logged.
56+
"""
57+
return self.uri

unit_tests/sources/file_based/test_file_based_stream_reader.py

Lines changed: 26 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,8 @@
1414

1515
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
1616
from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError
17-
from airbyte_cdk.sources.file_based.file_based_file_transfer_reader import (
18-
AbstractFileBasedFileTransferReader,
19-
)
2017
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
21-
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
18+
from airbyte_cdk.sources.file_based.remote_file import RemoteFile, UploadableRemoteFile
2219
from airbyte_cdk.sources.utils.files_directory import get_files_directory
2320
from unit_tests.sources.file_based.helpers import make_remote_files
2421

@@ -69,40 +66,9 @@
6966
}
7067

7168

72-
class TestFileBasedFileTransferReader(AbstractFileBasedFileTransferReader):
73-
@property
74-
def file_id(self) -> str:
75-
return "test_file_id"
76-
77-
@property
78-
def file_created_at(self) -> str:
79-
return "2025-05-05"
80-
81-
@property
82-
def file_updated_at(self) -> str:
83-
return "2025-05-06"
84-
85-
@property
86-
def file_size(self) -> int:
87-
return self.remote_file.size
88-
89-
def download_to_local_directory(self, local_file_path: str) -> None:
90-
pass
91-
92-
@property
93-
def source_file_relative_path(self) -> str:
94-
return "source/path"
95-
96-
@property
97-
def file_uri_for_logging(self) -> str:
98-
return "logging/url"
99-
100-
101-
class TestStreamReaderWithFileTransferClass(AbstractFileBasedStreamReader):
69+
class TestStreamReaderWithDefaultUpload(AbstractFileBasedStreamReader):
10270
__test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name
10371

104-
file_transfer_reader_class = TestFileBasedFileTransferReader
105-
10672
@property
10773
def config(self) -> Optional[AbstractFileBasedSpec]:
10874
return self._config
@@ -529,22 +495,34 @@ def test_preserve_sub_directories_scenarios(
529495

530496

531497
def test_upload_with_file_transfer_reader():
532-
stream_reader = TestStreamReaderWithFileTransferClass()
498+
stream_reader = TestStreamReaderWithDefaultUpload()
499+
500+
class TestUploadableRemoteFile(UploadableRemoteFile):
501+
blob: Any
502+
503+
@property
504+
def size(self) -> int:
505+
return self.blob.size
506+
507+
def download_to_local_directory(self, local_file_path: str) -> None:
508+
pass
509+
510+
blob = MagicMock()
511+
blob.size = 200
512+
uploadable_remote_file = TestUploadableRemoteFile(
513+
uri="test/uri", last_modified=datetime.now(), blob=blob
514+
)
515+
533516
logger = logging.getLogger("airbyte")
534517

535-
remote_file = MagicMock()
536-
remote_file.size = 200
537-
remote_file.uri = "test_url"
538-
remote_file.mime_type = "test_mime_type"
539-
file_record_data, file_reference = stream_reader.upload(remote_file, "test_directory", logger)
518+
file_record_data, file_reference = stream_reader.upload(
519+
uploadable_remote_file, "test_directory", logger
520+
)
540521
assert file_record_data
541522
assert file_reference
542523

543-
remote_file = MagicMock()
544-
remote_file.size = 2_500_000_000
545-
remote_file.uri = "test_url"
546-
remote_file.mime_type = "test_mime_type"
524+
blob.size = 2_500_000_000
547525
with pytest.raises(FileSizeLimitError):
548-
stream_reader.upload(remote_file, "test_directory", logger)
526+
stream_reader.upload(uploadable_remote_file, "test_directory", logger)
549527
with pytest.raises(FileSizeLimitError):
550-
stream_reader.upload(remote_file, "test_directory", logger)
528+
stream_reader.upload(uploadable_remote_file, "test_directory", logger)

0 commit comments

Comments
 (0)