33#
44
55import logging
6+ import time
67from abc import ABC , abstractmethod
78from datetime import datetime
89from enum import Enum
910from io import IOBase
1011from os import makedirs , path
11- from typing import Any , Callable , Iterable , List , MutableMapping , Optional , Set , Tuple
12+ from typing import Any , Iterable , List , MutableMapping , Optional , Set , Tuple
1213
14+ from airbyte_protocol_dataclasses .models import FailureType
1315from wcmatch .glob import GLOBSTAR , globmatch
1416
1517from airbyte_cdk .models import AirbyteRecordMessageFileReference
1921 preserve_directory_structure ,
2022 use_file_transfer ,
2123)
24+ from airbyte_cdk .sources .file_based .exceptions import FileSizeLimitError
2225from airbyte_cdk .sources .file_based .file_record_data import FileRecordData
23- from airbyte_cdk .sources .file_based .remote_file import RemoteFile
26+ from airbyte_cdk .sources .file_based .remote_file import RemoteFile , UploadableRemoteFile
2427
2528
2629class FileReadMode (Enum ):
@@ -34,6 +37,7 @@ class AbstractFileBasedStreamReader(ABC):
3437 FILE_NAME = "file_name"
3538 LOCAL_FILE_PATH = "local_file_path"
3639 FILE_FOLDER = "file_folder"
40+ FILE_SIZE_LIMIT = 1_500_000_000
3741
3842 def __init__ (self ) -> None :
3943 self ._config = None
@@ -113,16 +117,6 @@ def filter_files_by_globs_and_start_date(
113117 seen .add (file .uri )
114118 yield file
115119
116- @abstractmethod
117- def file_size (self , file : RemoteFile ) -> int :
118- """Utility method to get size of the remote file.
119-
120- This is required for connectors that will support writing to
121- files. If the connector does not support writing files, then the
122- subclass can simply `return 0`.
123- """
124- ...
125-
126120 @staticmethod
127121 def file_matches_globs (file : RemoteFile , globs : List [str ]) -> bool :
128122 # Use the GLOBSTAR flag to enable recursive ** matching
@@ -153,9 +147,8 @@ def include_identities_stream(self) -> bool:
153147 return include_identities_stream (self .config )
154148 return False
155149
156- @abstractmethod
157150 def upload (
158- self , file : RemoteFile , local_directory : str , logger : logging .Logger
151+ self , file : UploadableRemoteFile , local_directory : str , logger : logging .Logger
159152 ) -> Tuple [FileRecordData , AirbyteRecordMessageFileReference ]:
160153 """
161154 This is required for connectors that will support writing to
@@ -173,7 +166,53 @@ def upload(
173166 - file_size_bytes (int): The size of the referenced file in bytes.
174167 - source_file_relative_path (str): The relative path to the referenced file in source.
175168 """
176- ...
169+ if not isinstance (file , UploadableRemoteFile ):
170+ raise TypeError (f"Expected UploadableRemoteFile, got { type (file )} " )
171+
172+ file_size = file .size
173+
174+ if file_size > self .FILE_SIZE_LIMIT :
175+ message = f"File size exceeds the { self .FILE_SIZE_LIMIT / 1e9 } GB limit."
176+ raise FileSizeLimitError (
177+ message = message , internal_message = message , failure_type = FailureType .config_error
178+ )
179+
180+ file_paths = self ._get_file_transfer_paths (
181+ source_file_relative_path = file .source_file_relative_path ,
182+ staging_directory = local_directory ,
183+ )
184+ local_file_path = file_paths [self .LOCAL_FILE_PATH ]
185+ file_relative_path = file_paths [self .FILE_RELATIVE_PATH ]
186+ file_name = file_paths [self .FILE_NAME ]
187+
188+ logger .info (
189+ f"Starting to download the file { file .file_uri_for_logging } with size: { file_size / (1024 * 1024 ):,.2f} MB ({ file_size / (1024 * 1024 * 1024 ):.2f} GB)"
190+ )
191+ start_download_time = time .time ()
192+
193+ file .download_to_local_directory (local_file_path )
194+
195+ write_duration = time .time () - start_download_time
196+ logger .info (
197+ f"Finished downloading the file { file .file_uri_for_logging } and saved to { local_file_path } in { write_duration :,.2f} seconds."
198+ )
199+
200+ file_record_data = FileRecordData (
201+ folder = file_paths [self .FILE_FOLDER ],
202+ file_name = file_name ,
203+ bytes = file_size ,
204+ id = file .id ,
205+ mime_type = file .mime_type ,
206+ created_at = file .created_at ,
207+ updated_at = file .updated_at ,
208+ source_uri = file .uri ,
209+ )
210+ file_reference = AirbyteRecordMessageFileReference (
211+ staging_file_url = local_file_path ,
212+ source_file_relative_path = file_relative_path ,
213+ file_size_bytes = file_size ,
214+ )
215+ return file_record_data , file_reference
177216
178217 def _get_file_transfer_paths (
179218 self , source_file_relative_path : str , staging_directory : str
0 commit comments