@@ -56,6 +56,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
     airbyte_columns = [ab_last_mod_col, ab_file_name_col]
     use_file_transfer = False
     preserve_directory_structure = True
+    _file_transfer = FileTransfer()
 
     def __init__(self, **kwargs: Any):
         if self.FILE_TRANSFER_KW in kwargs:
@@ -93,14 +94,6 @@ def primary_key(self) -> PrimaryKeyType:
             self.config
         )
 
-    def _filter_schema_invalid_properties(
-        self, configured_catalog_json_schema: Dict[str, Any]
-    ) -> Dict[str, Any]:
-        if self.use_file_transfer:
-            return file_transfer_schema
-        else:
-            return super()._filter_schema_invalid_properties(configured_catalog_json_schema)
-
     def _duplicated_files_names(
         self, slices: List[dict[str, List[RemoteFile]]]
     ) -> List[dict[str, List[str]]]:
@@ -151,15 +144,14 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte
             raise MissingSchemaError(FileBasedSourceError.MISSING_SCHEMA, stream=self.name)
         # The stream only supports a single file type, so we can use the same parser for all files
         parser = self.get_parser()
-        file_transfer = FileTransfer()
         for file in stream_slice["files"]:
             # only serialize the datetime once
             file_datetime_string = file.last_modified.strftime(self.DATE_TIME_FORMAT)
             n_skipped = line_no = 0
 
             try:
                 if self.use_file_transfer:
-                    for file_record_data, file_reference in file_transfer.upload(
+                    for file_record_data, file_reference in self._file_transfer.upload(
                         file=file, stream_reader=self.stream_reader, logger=self.logger
                     ):
                         yield stream_data_to_airbyte_message(
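The substance of this diff is hoisting the FileTransfer helper from a local created inside read_records_from_slice to a class-level _file_transfer attribute (the _filter_schema_invalid_properties override is dropped separately). Below is a minimal sketch of that before/after pattern, assuming simplified, illustrative names (FileTransfer.upload here takes only a file argument; StreamBefore and StreamAfter are not Airbyte CDK classes):

from typing import Iterable, Iterator


class FileTransfer:
    """Stand-in for the real helper; upload() is simplified for illustration."""

    def upload(self, file: str) -> Iterator[str]:
        yield f"uploaded:{file}"


class StreamBefore:
    """Previous shape: a fresh FileTransfer is built on every read call."""

    def read(self, files: Iterable[str]) -> Iterator[str]:
        file_transfer = FileTransfer()  # constructed per call
        for file in files:
            yield from file_transfer.upload(file)


class StreamAfter:
    """New shape: one FileTransfer shared as a class attribute, as in the diff."""

    _file_transfer = FileTransfer()

    def read(self, files: Iterable[str]) -> Iterator[str]:
        for file in files:
            yield from self._file_transfer.upload(file)


if __name__ == "__main__":
    # Both variants yield the same records; only where the helper lives changes.
    print(list(StreamAfter().read(["a.csv", "b.csv"])))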