2525 FileUploadCompletionBody ,
2626 FileUploadData ,
2727 UploadedPart ,
28- ClientFileInProgramJob ,
28+ ClientFileToProgramJob ,
29+ UserFile ,
2930)
3031from urllib .parse import urljoin
3132import aiofiles
@@ -116,23 +117,31 @@ async def download_file_async(
116117 ) # aiofiles doesnt seem to have an async variant of this
117118 return dest_file
118119
119- def upload_file (
120+ def upload_file_to_program_job (
120121 self ,
121122 file : Union [str , Path ],
123+ program_key : str ,
124+ program_version : str ,
125+ job_id : str ,
126+ workspace_path : str ,
122127 timeout_seconds : int = DEFAULT_TIMEOUT_SECONDS ,
123128 max_concurrent_uploads : int = _MAX_CONCURRENT_UPLOADS ,
124129 ** kwargs ,
125130 ):
126131 return asyncio .run (
127- self .upload_file_async (
132+ self .upload_file_to_program_job_async (
128133 file = file ,
134+ program_key = program_key ,
135+ program_version = program_version ,
136+ job_id = job_id ,
137+ workspace_path = workspace_path ,
129138 timeout_seconds = timeout_seconds ,
130139 max_concurrent_uploads = max_concurrent_uploads ,
131140 ** kwargs ,
132141 )
133142 )
134143
135- async def upload_file_async (
144+ async def upload_file_to_program_job_async (
136145 self ,
137146 file : Union [str , Path ],
138147 program_key : str ,
@@ -155,13 +164,8 @@ async def upload_file_async(
155164 # if a file has the same sha256 checksum
156165 # and name they are considered equal
157166 return file_result
158- # ClientFileInProgramJob(
159- # filename=file.name,
160- # filesize=file.stat().st_size,
161- # sha256_checksum=checksum,
162- # )
163- client_file = ClientFile (
164- ClientFileInProgramJob (
167+ user_file = UserFile (
168+ ClientFileToProgramJob (
165169 filename = file .name ,
166170 filesize = file .stat ().st_size ,
167171 sha256_checksum = checksum ,
@@ -171,23 +175,80 @@ async def upload_file_async(
171175 workspace_path = workspace_path ,
172176 )
173177 )
178+ return await self ._upload_user_file (
179+ user_file , file , timeout_seconds , max_concurrent_uploads , ** kwargs
180+ )
181+
182+ def upload_file (
183+ self ,
184+ file : Union [str , Path ],
185+ timeout_seconds : int = DEFAULT_TIMEOUT_SECONDS ,
186+ max_concurrent_uploads : int = _MAX_CONCURRENT_UPLOADS ,
187+ ** kwargs ,
188+ ) -> File :
189+ return asyncio .run (
190+ self .upload_file_async (
191+ file = file ,
192+ timeout_seconds = timeout_seconds ,
193+ max_concurrent_uploads = max_concurrent_uploads ,
194+ ** kwargs ,
195+ )
196+ )
197+
198+ async def upload_file_async (
199+ self ,
200+ file : Union [str , Path ],
201+ timeout_seconds : int = DEFAULT_TIMEOUT_SECONDS ,
202+ max_concurrent_uploads : int = _MAX_CONCURRENT_UPLOADS ,
203+ ** kwargs ,
204+ ) -> File :
205+ if isinstance (file , str ):
206+ file = Path (file )
207+ if not file .is_file ():
208+ raise RuntimeError (f"{ file } is not a file" )
209+ checksum : str = await compute_sha256 (file )
210+ for file_result in self ._search_files (
211+ sha256_checksum = checksum , timeout_seconds = timeout_seconds
212+ ):
213+ if file_result .filename == file .name :
214+ # if a file has the same sha256 checksum
215+ # and name they are considered equal
216+ return file_result
217+ user_file = UserFile (
218+ ClientFile (
219+ filename = file .name ,
220+ filesize = file .stat ().st_size ,
221+ sha256_checksum = checksum ,
222+ )
223+ )
224+ return await self ._upload_user_file (
225+ user_file , file , timeout_seconds , max_concurrent_uploads , ** kwargs
226+ )
227+
228+ async def _upload_user_file (
229+ self ,
230+ user_file : UserFile ,
231+ file : Path ,
232+ timeout_seconds : int = DEFAULT_TIMEOUT_SECONDS ,
233+ max_concurrent_uploads : int = _MAX_CONCURRENT_UPLOADS ,
234+ ** kwargs ,
235+ ) -> File :
236+ assert file .is_file () # nosec
174237 client_upload_schema : ClientFileUploadData = super ().get_upload_links (
175- client_file = client_file , _request_timeout = timeout_seconds , ** kwargs
238+ user_file = user_file , _request_timeout = timeout_seconds , ** kwargs
176239 )
177240 chunk_size : int = client_upload_schema .upload_schema .chunk_size
178241 links : FileUploadData = client_upload_schema .upload_schema .links
179242 url_iter : Iterator [Tuple [int , str ]] = enumerate (
180243 iter (client_upload_schema .upload_schema .urls ), start = 1
181244 )
182245 n_urls : int = len (client_upload_schema .upload_schema .urls )
183- if n_urls < math .ceil (file . stat (). st_size / chunk_size ):
246+ if n_urls < math .ceil (user_file . actual_instance . filesize / chunk_size ):
184247 raise RuntimeError (
185248 "Did not receive sufficient number of upload URLs from the server."
186249 )
187250
188- abort_body = BodyAbortMultipartUploadV0FilesFileIdAbortPost (
189- client_file = client_file
190- )
251+ abort_body = BodyAbortMultipartUploadV0FilesFileIdAbortPost (user_file = user_file )
191252 upload_tasks : Set [asyncio .Task ] = set ()
192253 uploaded_parts : List [UploadedPart ] = []
193254
@@ -240,7 +301,7 @@ async def upload_file_async(
240301 server_file : File = await self ._complete_multipart_upload (
241302 api_server_session ,
242303 links .complete_upload , # type: ignore
243- client_file ,
304+ user_file ,
244305 uploaded_parts ,
245306 )
246307 _logger .debug ("File upload complete: %s" , file .name )
@@ -250,11 +311,11 @@ async def _complete_multipart_upload(
250311 self ,
251312 http_client : AsyncHttpClient ,
252313 complete_link : str ,
253- client_file : ClientFile ,
314+ user_file : UserFile ,
254315 uploaded_parts : List [UploadedPart ],
255316 ) -> File :
256317 complete_payload = BodyCompleteMultipartUploadV0FilesFileIdCompletePost (
257- client_file = client_file ,
318+ user_file = user_file ,
258319 uploaded_parts = FileUploadCompletionBody (parts = uploaded_parts ),
259320 )
260321 response : Response = await http_client .post (
0 commit comments