@@ -336,6 +336,8 @@ string S3FileSystem::InitializeMultipartUpload(S3FileHandle &file_handle) {
336336
337337 open_tag_pos += 10 ; // Skip open tag
338338
339+ file_handle.initialized_multipart_upload = true ;
340+
339341 return result.substr (open_tag_pos, close_tag_pos - open_tag_pos);
340342}
341343
@@ -353,10 +355,21 @@ void S3FileSystem::NotifyUploadsInProgress(S3FileHandle &file_handle) {
353355}
354356
355357void S3FileSystem::UploadBuffer (S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) {
356- auto &s3fs = (S3FileSystem &)file_handle.file_system ;
357-
358358 string query_param = " partNumber=" + to_string (write_buffer->part_no + 1 ) + " &" +
359359 " uploadId=" + S3FileSystem::UrlEncode (file_handle.multipart_upload_id , true );
360+
361+ UploadBufferImplementation (file_handle, write_buffer, query_param, false );
362+
363+ NotifyUploadsInProgress (file_handle);
364+ }
365+
366+ void S3FileSystem::UploadSingleBuffer (S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) {
367+ UploadBufferImplementation (file_handle, write_buffer, " " , true );
368+ }
369+
370+ void S3FileSystem::UploadBufferImplementation (S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer, string query_param, bool single_upload) {
371+ auto &s3fs = (S3FileSystem &)file_handle.file_system ;
372+
360373 unique_ptr<HTTPResponse> res;
361374 string etag;
362375
@@ -374,6 +387,9 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuf
374387 }
375388 etag = res->headers .GetHeaderValue (" ETag" );
376389 } catch (std::exception &ex) {
390+ if (single_upload) {
391+ throw ;
392+ }
377393 ErrorData error (ex);
378394 if (error.Type () != ExceptionType::IO && error.Type () != ExceptionType::HTTP) {
379395 throw ;
@@ -385,6 +401,7 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuf
385401 file_handle.upload_exception = std::current_exception ();
386402 }
387403
404+ D_ASSERT (!single_upload); // If we are here we are in the multi-buffer situation
388405 NotifyUploadsInProgress (file_handle);
389406 return ;
390407 }
@@ -399,8 +416,6 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuf
399416
400417 // Free up space for another thread to acquire an S3WriteBuffer
401418 write_buffer.reset ();
402-
403- NotifyUploadsInProgress (file_handle);
404419}
405420
406421void S3FileSystem::FlushBuffer (S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) {
@@ -437,6 +452,9 @@ void S3FileSystem::FlushBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuff
437452#endif
438453 file_handle.uploads_in_progress ++;
439454 }
455+ if (file_handle.initialized_multipart_upload == false ) {
456+ file_handle.multipart_upload_id = InitializeMultipartUpload (file_handle);
457+ }
440458
441459#ifdef SAME_THREAD_UPLOAD
442460 UploadBuffer (file_handle, write_buffer);
@@ -459,6 +477,16 @@ void S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) {
459477 }
460478 file_handle.write_buffers_lock .unlock ();
461479
480+ if (file_handle.initialized_multipart_upload == false ) {
481+ // TODO (carlo): unclear how to handle kms_key_id, but given currently they are custom, leave the multiupload codepath in that case
482+ if (to_flush.size () == 1 && file_handle.auth_params .kms_key_id .empty ()) {
483+ UploadSingleBuffer (file_handle, to_flush[0 ]);
484+ file_handle.upload_finalized = true ;
485+ return ;
486+ } else {
487+ file_handle.multipart_upload_id = InitializeMultipartUpload (file_handle);
488+ }
489+ }
462490 // Flush all buffers that aren't already uploading
463491 for (auto &write_buffer : to_flush) {
464492 if (!write_buffer->uploading ) {
@@ -475,6 +503,10 @@ void S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) {
475503
476504void S3FileSystem::FinalizeMultipartUpload (S3FileHandle &file_handle) {
477505 auto &s3fs = (S3FileSystem &)file_handle.file_system ;
506+ if (file_handle.upload_finalized ) {
507+ return ;
508+ }
509+
478510 file_handle.upload_finalized = true ;
479511
480512 std::stringstream ss;
@@ -889,8 +921,6 @@ void S3FileHandle::Initialize(optional_ptr<FileOpener> opener) {
889921 part_size = ((minimum_part_size + Storage::DEFAULT_BLOCK_SIZE - 1 ) / Storage::DEFAULT_BLOCK_SIZE) *
890922 Storage::DEFAULT_BLOCK_SIZE;
891923 D_ASSERT (part_size * max_part_count >= config_params.max_file_size );
892-
893- multipart_upload_id = s3fs.InitializeMultipartUpload (*this );
894924 }
895925}
896926
0 commit comments