Skip to content

Commit d5941c3

Browse files
add AbstractFileBasedFileTransferReader
1 parent 25132b6 commit d5941c3

File tree

2 files changed

+130
-2
lines changed

2 files changed

+130
-2
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
from abc import ABC, abstractmethod
2+
3+
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
4+
5+
6+
class AbstractFileBasedFileTransferReader(ABC):
7+
FILE_SIZE_LIMIT = 1_500_000_000
8+
9+
def __init__(self, remote_file: RemoteFile) -> None:
10+
self.remote_file = remote_file
11+
12+
@property
13+
@abstractmethod
14+
def file_id(self) -> str:
15+
"""
16+
Unique identifier for the file being transferred.
17+
"""
18+
...
19+
20+
@property
21+
@abstractmethod
22+
def file_created_at(self) -> str:
23+
"""
24+
Date time when the file was created.
25+
"""
26+
...
27+
28+
@property
29+
@abstractmethod
30+
def file_updated_at(self) -> str:
31+
"""
32+
Date time when the file was last updated.
33+
"""
34+
...
35+
36+
@property
37+
@abstractmethod
38+
def file_size(self) -> int:
39+
"""
40+
Returns the file size in bytes.
41+
"""
42+
...
43+
44+
@abstractmethod
45+
def download_to_local_directory(self, local_file_path: str) -> None:
46+
"""
47+
Download the file from remote source to local storage.
48+
"""
49+
...
50+
51+
@property
52+
@abstractmethod
53+
def source_file_relative_path(self) -> str:
54+
"""
55+
Returns the relative path of the source file.
56+
"""
57+
...
58+
59+
@property
60+
def file_uri_for_logging(self):
61+
"""
62+
Returns the URI for the file being logged.
63+
"""
64+
return self.remote_file.uri

airbyte_cdk/sources/file_based/file_based_stream_reader.py

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44

55
import logging
6+
import time
67
from abc import ABC, abstractmethod
78
from datetime import datetime
89
from enum import Enum
@@ -19,8 +20,13 @@
1920
preserve_directory_structure,
2021
use_file_transfer,
2122
)
23+
from airbyte_cdk.sources.file_based.file_based_file_transfer_reader import (
24+
AbstractFileBasedFileTransferReader,
25+
)
2226
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
2327
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
28+
from airbyte_protocol_dataclasses.models import FailureType
29+
from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError
2430

2531

2632
class FileReadMode(Enum):
@@ -38,6 +44,14 @@ class AbstractFileBasedStreamReader(ABC):
3844
def __init__(self) -> None:
3945
self._config = None
4046

47+
if (
48+
self.file_transfer_reader_class is None
49+
and self.upload.__func__ == AbstractFileBasedStreamReader.upload
50+
):
51+
raise NotImplementedError(
52+
"One of file_transfer_reader_class or upload method must be defined to support file transfer."
53+
)
54+
4155
@property
4256
def config(self) -> Optional[AbstractFileBasedSpec]:
4357
return self._config
@@ -153,7 +167,10 @@ def include_identities_stream(self) -> bool:
153167
return include_identities_stream(self.config)
154168
return False
155169

156-
@abstractmethod
170+
@property
171+
def file_transfer_reader_class(self) -> AbstractFileBasedFileTransferReader | None:
172+
return None
173+
157174
def upload(
158175
self, file: RemoteFile, local_directory: str, logger: logging.Logger
159176
) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
@@ -173,7 +190,54 @@ def upload(
173190
- file_size_bytes (int): The size of the referenced file in bytes.
174191
- source_file_relative_path (str): The relative path to the referenced file in source.
175192
"""
176-
...
193+
# if self.file_transfer_reader_class is None and self.upload.__func__ == AbstractFileBasedStreamReader.upload:
194+
# raise NotImplementedError("One of file_transfer_reader_class or upload method must be defined to support file transfer.")
195+
196+
file_transfer = self.file_transfer_reader_class(file)
197+
file_size = file_transfer.file_size
198+
199+
if file_size > file_transfer.FILE_SIZE_LIMIT:
200+
message = "File size exceeds the 1 GB limit."
201+
raise FileSizeLimitError(
202+
message=message, internal_message=message, failure_type=FailureType.config_error
203+
)
204+
205+
file_paths = self._get_file_transfer_paths(
206+
source_file_relative_path=file_transfer.source_file_relative_path,
207+
staging_directory=local_directory,
208+
)
209+
local_file_path = file_paths[self.LOCAL_FILE_PATH]
210+
file_relative_path = file_paths[self.FILE_RELATIVE_PATH]
211+
file_name = file_paths[self.FILE_NAME]
212+
213+
logger.info(
214+
f"Starting to download the file {file_transfer.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)"
215+
)
216+
start_download_time = time.time()
217+
218+
file_transfer.download_to_local_directory(local_file_path)
219+
220+
write_duration = time.time() - start_download_time
221+
logger.info(
222+
f"Finished downloading the file {file_transfer.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds."
223+
)
224+
225+
file_record_data = FileRecordData(
226+
folder=file_paths[self.FILE_FOLDER],
227+
file_name=file_name,
228+
bytes=file_size,
229+
id=file_transfer.file_id,
230+
mime_type=file.mime_type,
231+
created_at=file_transfer.file_created_at,
232+
updated_at=file_transfer.file_updated_at,
233+
source_uri=file.uri,
234+
)
235+
file_reference = AirbyteRecordMessageFileReference(
236+
staging_file_url=local_file_path,
237+
source_file_relative_path=file_relative_path,
238+
file_size_bytes=file_size,
239+
)
240+
return file_record_data, file_reference
177241

178242
def _get_file_transfer_paths(
179243
self, source_file_relative_path: str, staging_directory: str

0 commit comments

Comments
 (0)