Skip to content

Commit 3b4f4a7

Browse files
authored
Merge pull request #101 from carlopi/fixes_wasm_single_thread
Fixes for DuckDB-Wasm single thread mode
2 parents e9bb991 + d54ddf3 commit 3b4f4a7

File tree

1 file changed

+25
-5
lines changed

1 file changed

+25
-5
lines changed

extension/httpfs/s3fs.cpp

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222

2323
#include <iostream>
2424
#include <thread>
25+
#ifdef EMSCRIPTEN
26+
#define SAME_THREAD_UPLOAD
27+
#endif
2528

2629
namespace duckdb {
2730

@@ -72,6 +75,7 @@ static HTTPHeaders create_s3_header(string url, string query, string host, strin
7275
hash_str canonical_request_hash_str;
7376
if (content_type.length() > 0) {
7477
signed_headers += "content-type;";
78+
res["content-type"] = content_type;
7579
}
7680
signed_headers += "host;x-amz-content-sha256;x-amz-date";
7781
if (auth_params.session_token.length() > 0) {
@@ -342,8 +346,10 @@ void S3FileSystem::NotifyUploadsInProgress(S3FileHandle &file_handle) {
342346
}
343347
// Note that there are 2 cv's because otherwise we might deadlock when the final flushing thread is notified while
344348
// another thread is still waiting for an upload thread
349+
#ifndef SAME_THREAD_UPLOAD
345350
file_handle.uploads_in_progress_cv.notify_one();
346351
file_handle.final_flush_cv.notify_one();
352+
#endif
347353
}
348354

349355
void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) {
@@ -363,10 +369,15 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuf
363369
static_cast<int>(res->status));
364370
}
365371

366-
if (!res->headers.HasHeader("ETag")) {
372+
if (!res->headers.HasHeader("ETag") && !res->headers.HasHeader("etag")) {
367373
throw IOException("Unexpected response when uploading part to S3");
368374
}
369-
etag = res->headers.GetHeaderValue("ETag");
375+
376+
if (res->headers.HasHeader("ETag")) {
377+
etag = res->headers.GetHeaderValue("ETag");
378+
} else if (res->headers.HasHeader("etag")) {
379+
etag = res->headers.GetHeaderValue("etag");
380+
}
370381
} catch (std::exception &ex) {
371382
ErrorData error(ex);
372383
if (error.Type() != ExceptionType::IO && error.Type() != ExceptionType::HTTP) {
@@ -420,18 +431,25 @@ void S3FileSystem::FlushBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuff
420431

421432
{
422433
unique_lock<mutex> lck(file_handle.uploads_in_progress_lock);
423-
// check if there are upload threads available
434+
// check if there are upload threads available
435+
#ifndef SAME_THREAD_UPLOAD
424436
if (file_handle.uploads_in_progress >= file_handle.config_params.max_upload_threads) {
425437
// there are not - wait for one to become available
426438
file_handle.uploads_in_progress_cv.wait(lck, [&file_handle] {
427439
return file_handle.uploads_in_progress < file_handle.config_params.max_upload_threads;
428440
});
429441
}
442+
#endif
430443
file_handle.uploads_in_progress++;
431444
}
432445

433-
thread upload_thread(UploadBuffer, std::ref(file_handle), write_buffer);
434-
upload_thread.detach();
446+
#ifdef SAME_THREAD_UPLOAD
447+
UploadBuffer(file_handle, write_buffer);
448+
return;
449+
#endif
450+
451+
thread upload_thread(UploadBuffer, std::ref(file_handle), write_buffer);
452+
upload_thread.detach();
435453
}
436454

437455
// Note that FlushAll currently does not allow to continue writing afterwards. Therefore, FinalizeMultipartUpload should
@@ -453,7 +471,9 @@ void S3FileSystem::FlushAllBuffers(S3FileHandle &file_handle) {
453471
}
454472
}
455473
unique_lock<mutex> lck(file_handle.uploads_in_progress_lock);
474+
#ifndef SAME_THREAD_UPLOAD
456475
file_handle.final_flush_cv.wait(lck, [&file_handle] { return file_handle.uploads_in_progress == 0; });
476+
#endif
457477

458478
file_handle.RethrowIOError();
459479
}

0 commit comments

Comments
 (0)