Skip to content

Commit 1f137f7

Browse files
authored
Merge pull request #128 from carlopi/one_request_upload
Single PUT when uploading single buffer-files to S3 (version 2)
2 parents b273f4d + b30a6c5 commit 1f137f7

File tree

4 files changed

+47
-9
lines changed

4 files changed

+47
-9
lines changed

extension/httpfs/include/s3fs.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ class S3FileHandle : public HTTPFileHandle {
127127

128128
S3AuthParams auth_params;
129129
const S3ConfigParams config_params;
130+
bool initialized_multipart_upload {false};
130131

131132
public:
132133
void Close() override;
@@ -214,6 +215,8 @@ class S3FileSystem : public HTTPFileSystem {
214215
// Uploads the contents of write_buffer to S3.
215216
// Note: the caller is responsible for not calling this method twice on the same buffer
216217
static void UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer);
218+
static void UploadSingleBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer);
219+
static void UploadBufferImplementation(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer, string query_param, bool single_upload);
217220

218221
vector<OpenFileInfo> Glob(const string &glob_pattern, FileOpener *opener = nullptr) override;
219222
bool ListFiles(const string &directory, const std::function<void(const string &, bool)> &callback,

extension/httpfs/s3fs.cpp

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,8 @@ string S3FileSystem::InitializeMultipartUpload(S3FileHandle &file_handle) {
336336

337337
open_tag_pos += 10; // Skip open tag
338338

339+
file_handle.initialized_multipart_upload = true;
340+
339341
return result.substr(open_tag_pos, close_tag_pos - open_tag_pos);
340342
}
341343

@@ -353,10 +355,21 @@ void S3FileSystem::NotifyUploadsInProgress(S3FileHandle &file_handle) {
353355
}
354356

355357
void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) {
356-
auto &s3fs = (S3FileSystem &)file_handle.file_system;
357-
358358
string query_param = "partNumber=" + to_string(write_buffer->part_no + 1) + "&" +
359359
"uploadId=" + S3FileSystem::UrlEncode(file_handle.multipart_upload_id, true);
360+
361+
UploadBufferImplementation(file_handle, write_buffer, query_param, false);
362+
363+
NotifyUploadsInProgress(file_handle);
364+
}
365+
366+
void S3FileSystem::UploadSingleBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) {
367+
UploadBufferImplementation(file_handle, write_buffer, "", true);
368+
}
369+
370+
void S3FileSystem::UploadBufferImplementation(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer, string query_param, bool single_upload) {
371+
auto &s3fs = (S3FileSystem &)file_handle.file_system;
372+
360373
unique_ptr<HTTPResponse> res;
361374
string etag;
362375

@@ -374,6 +387,9 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuf
374387
}
375388
etag = res->headers.GetHeaderValue("ETag");
376389
} catch (std::exception &ex) {
390+
if (single_upload) {
391+
throw;
392+
}
377393
ErrorData error(ex);
378394
if (error.Type() != ExceptionType::IO && error.Type() != ExceptionType::HTTP) {
379395
throw;
@@ -385,6 +401,7 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuf
385401
file_handle.upload_exception = std::current_exception();
386402
}
387403

404+
D_ASSERT(!single_upload); // If we are here we are in the multi-buffer situation
388405
NotifyUploadsInProgress(file_handle);
389406
return;
390407
}
@@ -399,8 +416,6 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuf
399416

400417
// Free up space for another thread to acquire an S3WriteBuffer
401418
write_buffer.reset();
402-
403-
NotifyUploadsInProgress(file_handle);
404419
}
405420

406421
void S3FileSystem::FlushBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) {
@@ -437,6 +452,9 @@ void S3FileSystem::FlushBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuff
437452
#endif
438453
file_handle.uploads_in_progress++;
439454
}
455+
if (file_handle.initialized_multipart_upload == false) {
456+
file_handle.multipart_upload_id = InitializeMultipartUpload(file_handle);
457+
}
440458

441459
#ifdef SAME_THREAD_UPLOAD
442460
UploadBuffer(file_handle, write_buffer);
@@ -459,6 +477,16 @@ void S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) {
459477
}
460478
file_handle.write_buffers_lock.unlock();
461479

480+
if (file_handle.initialized_multipart_upload == false) {
481+
// TODO (carlo): unclear how to handle kms_key_id; since kms keys are currently custom, keep the multipart-upload codepath in that case
482+
if (to_flush.size() == 1 && file_handle.auth_params.kms_key_id.empty()) {
483+
UploadSingleBuffer(file_handle, to_flush[0]);
484+
file_handle.upload_finalized = true;
485+
return;
486+
} else {
487+
file_handle.multipart_upload_id = InitializeMultipartUpload(file_handle);
488+
}
489+
}
462490
// Flush all buffers that aren't already uploading
463491
for (auto &write_buffer : to_flush) {
464492
if (!write_buffer->uploading) {
@@ -475,6 +503,10 @@ void S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) {
475503

476504
void S3FileSystem::FinalizeMultipartUpload(S3FileHandle &file_handle) {
477505
auto &s3fs = (S3FileSystem &)file_handle.file_system;
506+
if (file_handle.upload_finalized) {
507+
return;
508+
}
509+
478510
file_handle.upload_finalized = true;
479511

480512
std::stringstream ss;
@@ -889,8 +921,6 @@ void S3FileHandle::Initialize(optional_ptr<FileOpener> opener) {
889921
part_size = ((minimum_part_size + Storage::DEFAULT_BLOCK_SIZE - 1) / Storage::DEFAULT_BLOCK_SIZE) *
890922
Storage::DEFAULT_BLOCK_SIZE;
891923
D_ASSERT(part_size * max_part_count >= config_params.max_file_size);
892-
893-
multipart_upload_id = s3fs.InitializeMultipartUpload(*this);
894924
}
895925
}
896926

test/configs/duckdb-tests.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@
1111
"test/sql/secrets/secret_autoloading_errors.test",
1212
"test/sql/copy/csv/zstd_crash.test"
1313
]
14+
},
15+
{
16+
"reason": "Improved from 1 PUT + 2 POST to 1 PUT",
17+
"paths": [
18+
"test/sql/copy/s3/metadata_cache.test"
19+
]
1420
}
1521
]
1622
}

test/sql/copy/no_head_on_write.test

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,18 @@ copy (select 1 as a) to 's3://test-bucket/test-file.parquet'
4848
query I
4949
select request.type FROM duckdb_logs_parsed('HTTP')
5050
----
51-
POST
5251
PUT
53-
POST
5452

5553
statement ok
5654
CALL truncate_duckdb_logs();
5755

5856
statement ok
59-
copy (select 1 as a) to 's3://test-bucket/test-file.csv'
57+
copy (select random() as a FROM range(8000000)) to 's3://test-bucket/test-file2.csv'
6058

6159
query I
6260
select request.type FROM duckdb_logs_parsed('HTTP')
6361
----
6462
POST
6563
PUT
64+
PUT
6665
POST

0 commit comments

Comments
 (0)