@@ -355,10 +355,21 @@ void S3FileSystem::NotifyUploadsInProgress(S3FileHandle &file_handle) {
355355}
356356
357357void S3FileSystem::UploadBuffer (S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) {
358- auto &s3fs = (S3FileSystem &)file_handle.file_system ;
359-
360358 string query_param = " partNumber=" + to_string (write_buffer->part_no + 1 ) + " &" +
361359 " uploadId=" + S3FileSystem::UrlEncode (file_handle.multipart_upload_id , true );
360+
361+ UploadBufferImplementation (file_handle, write_buffer, query_param, false );
362+
363+ NotifyUploadsInProgress (file_handle);
364+ }
365+
366+ void S3FileSystem::UploadSingleBuffer (S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) {
367+ UploadBufferImplementation (file_handle, write_buffer, " " , true );
368+ }
369+
370+ void S3FileSystem::UploadBufferImplementation (S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer, string query_param, bool single_upload) {
371+ auto &s3fs = (S3FileSystem &)file_handle.file_system ;
372+
362373 unique_ptr<HTTPResponse> res;
363374 string etag;
364375
@@ -376,6 +387,9 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuf
376387 }
377388 etag = res->headers .GetHeaderValue (" ETag" );
378389 } catch (std::exception &ex) {
390+ if (single_upload) {
391+ throw ;
392+ }
379393 ErrorData error (ex);
380394 if (error.Type () != ExceptionType::IO && error.Type () != ExceptionType::HTTP) {
381395 throw ;
@@ -387,6 +401,7 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuf
387401 file_handle.upload_exception = std::current_exception ();
388402 }
389403
404+ D_ASSERT (!single_upload); // If we are here we are in the multi-buffer situation
390405 NotifyUploadsInProgress (file_handle);
391406 return ;
392407 }
@@ -401,8 +416,6 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuf
401416
402417 // Free up space for another thread to acquire an S3WriteBuffer
403418 write_buffer.reset ();
404-
405- NotifyUploadsInProgress (file_handle);
406419}
407420
408421void S3FileSystem::FlushBuffer (S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) {
@@ -465,7 +478,13 @@ void S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) {
465478 file_handle.write_buffers_lock .unlock ();
466479
467480 if (file_handle.initialized_multipart_upload == false ) {
468- file_handle.multipart_upload_id = InitializeMultipartUpload (file_handle);
481+ if (to_flush.size () == 1 ) {
482+ UploadSingleBuffer (file_handle, to_flush[0 ]);
483+ file_handle.upload_finalized = true ;
484+ return ;
485+ } else {
486+ file_handle.multipart_upload_id = InitializeMultipartUpload (file_handle);
487+ }
469488 }
470489 // Flush all buffers that aren't already uploading
471490 for (auto &write_buffer : to_flush) {
@@ -483,6 +502,10 @@ void S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) {
483502
484503void S3FileSystem::FinalizeMultipartUpload (S3FileHandle &file_handle) {
485504 auto &s3fs = (S3FileSystem &)file_handle.file_system ;
505+ if (file_handle.upload_finalized ) {
506+ return ;
507+ }
508+
486509 file_handle.upload_finalized = true ;
487510
488511 std::stringstream ss;
0 commit comments