Commit 0181ff1

Backport ClickHouse#86725 to 25.8: Fix misleading "specified upload does not exist" error reporting because the original exception is lost in multipart upload
1 parent a786e8d commit 0181ff1

File tree

2 files changed: +63 -33 lines
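
A note on the failure mode this backport addresses: previously, when one part upload failed, abortMultipartUpload() was called immediately, so part uploads still in flight for the same upload id could then fail with S3's NoSuchUpload ("The specified upload does not exist"), and that secondary error could end up being the one reported instead of the root cause. The change below instead records the first exception, lets the remaining background tasks drain, and only then aborts the upload and rethrows the original error. The following is a minimal standalone sketch of that pattern, not the ClickHouse implementation; upload_part, the simulated failure, and the thread handling are all illustrative.

// Hypothetical sketch, not the ClickHouse code: part uploads run on worker
// threads; the first failure is recorded and rethrown after all workers
// finish, so the root cause is reported instead of a follow-up error.
#include <atomic>
#include <exception>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

int main()
{
    std::atomic<bool> upload_part_failed{false};
    std::exception_ptr first_exception;        // analogue of bg_exception
    std::mutex mutex;

    auto upload_part = [&](int part_number)
    {
        if (upload_part_failed)
            return;                            // skip: another part already failed
        if (part_number == 3)                  // simulate one failing part
        {
            upload_part_failed = true;         // stop doing further work
            throw std::runtime_error("mock S3 error on part 3");
        }
    };

    std::vector<std::thread> workers;
    for (int part = 1; part <= 8; ++part)
        workers.emplace_back([&, part]
        {
            try
            {
                upload_part(part);
            }
            catch (...)
            {
                std::lock_guard lock(mutex);
                if (!first_exception)          // keep only the original error
                    first_exception = std::current_exception();
            }
        });

    for (auto & w : workers)
        w.join();                              // wait for all background tasks

    if (first_exception)
    {
        // the abort of the multipart upload would go here, after everything has drained
        try { std::rethrow_exception(first_exception); }
        catch (const std::exception & e) { std::cout << "upload failed: " << e.what() << '\n'; }
    }
}

The important ordering is join, then abort, then rethrow: the abort happens once, after no part upload can still be racing against it, and the exception that surfaces is the one that actually caused the failure.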

src/IO/S3/copyS3File.cpp

Lines changed: 19 additions & 22 deletions
@@ -117,7 +117,7 @@ namespace
         size_t num_parts;
         size_t normal_part_size;
         String multipart_upload_id;
-        std::atomic<bool> multipart_upload_aborted = false;
+        std::atomic<bool> upload_part_failed = false;
         Strings part_tags;

         std::list<UploadPartTask> TSA_GUARDED_BY(bg_tasks_mutex) bg_tasks;
@@ -177,8 +177,6 @@ namespace

         void completeMultipartUpload()
         {
-            if (multipart_upload_aborted)
-                return;

             LOG_TRACE(log, "Completing multipart upload. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", dest_bucket, dest_key, multipart_upload_id, part_tags.size());

@@ -246,8 +244,6 @@ namespace
             blob_storage_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadAbort,
                 dest_bucket, dest_key, /* local_path_ */ {}, /* data_size */ 0,
                 outcome.IsSuccess() ? nullptr : &outcome.GetError());
-
-            multipart_upload_aborted = true;
         }

         void checkObjectAfterUpload()
@@ -269,8 +265,8 @@ namespace
             {
                 for (size_t part_number = 1; position < end_position; ++part_number)
                 {
-                    if (multipart_upload_aborted)
-                        break; /// No more part uploads.
+                    if (upload_part_failed)
+                        break;

                     size_t next_position = std::min(position + normal_part_size, end_position);
                     size_t part_size = next_position - position; /// `part_size` is either `normal_part_size` or smaller if it's the final part.
@@ -288,9 +284,13 @@ namespace
             catch (...)
             {
                 tryLogCurrentException(log, fmt::format("While performing multipart upload of {}", dest_key));
-                // Multipart upload failed because it wasn't possible to schedule all the tasks.
-                // To avoid execution of already scheduled tasks we abort MultipartUpload.
-                abortMultipartUpload();
+                // Multipart upload failed because not all tasks could be scheduled.
+                // waitForAllBackgroundTasks will rethrow the actual exception.
+                {
+                    std::lock_guard lock(bg_tasks_mutex);
+                    if (!bg_exception)
+                        bg_exception = std::current_exception();
+                }
                 waitForAllBackgroundTasks();
                 throw;
             }
@@ -399,12 +399,10 @@ namespace
                 }
                 catch (...)
                 {
+                    tryLogCurrentException(log, fmt::format("While writing part #{}", task->part_number));
                     std::lock_guard lock(bg_tasks_mutex);
                     if (!bg_exception)
-                    {
-                        tryLogCurrentException(log, fmt::format("While writing part #{}", task->part_number));
                         bg_exception = std::current_exception(); /// The exception will be rethrown after all background tasks stop working.
-                    }
                 }
                 task_finish_notify();
             }, Priority{});
@@ -428,8 +426,8 @@ namespace

         void processUploadTask(UploadPartTask & task)
         {
-            if (multipart_upload_aborted)
-                return; /// Already aborted.
+            if (upload_part_failed)
+                return; /// Skipped: another upload task failed.

             auto request = makeUploadPartRequest(task.part_number, task.part_offset, task.part_size);
             auto tag = processUploadPartRequest(*request);
@@ -452,15 +450,14 @@ namespace

             std::unique_lock lock(bg_tasks_mutex);
             /// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock
-            bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); });
+            bg_tasks_condvar.wait(
+                lock,
+                [this]()
+                { return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); });

             auto exception = TSA_SUPPRESS_WARNING_FOR_READ(bg_exception);
             if (exception)
             {
-                /// abortMultipartUpload() might be called already, see processUploadPartRequest().
-                /// However if there were concurrent uploads at that time, those part uploads might or might not succeed.
-                /// As a result, it might be necessary to abort a given multipart upload multiple times in order to completely free
-                /// all storage consumed by all parts.
                 abortMultipartUpload();

                 std::rethrow_exception(exception);
@@ -643,7 +640,7 @@ namespace

             if (!outcome.IsSuccess())
             {
-                abortMultipartUpload();
+                upload_part_failed = true;
                 ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
                 throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
             }
@@ -861,7 +858,7 @@ namespace
             auto outcome = client_ptr->UploadPartCopy(req);
             if (!outcome.IsSuccess())
             {
-                abortMultipartUpload();
+                upload_part_failed = true;
                 throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
             }

tests/integration/test_backup_restore_s3/test.py

Lines changed: 44 additions & 11 deletions
@@ -487,6 +487,50 @@ def test_backup_to_s3_native_copy_multipart():
     )


+@pytest.fixture(scope="module")
+def init_broken_s3():
+    yield start_s3_mock(cluster, "broken_s3", "8083")
+
+
+@pytest.fixture(scope="function")
+def broken_s3(init_broken_s3):
+    init_broken_s3.reset()
+    yield init_broken_s3
+
+
+def test_backup_to_s3_copy_multipart_check_error_message(broken_s3):
+    storage_policy = "policy_s3"
+    size = 10000000
+    backup_name = new_backup_name()
+    backup_destination = f"S3('http://resolver:8083/root/data/backups/multipart/{backup_name}', 'minio', '{minio_secret_key}')"
+    node = cluster.instances["node"]
+
+    node.query(
+        f"""
+        DROP TABLE IF EXISTS data SYNC;
+        CREATE TABLE data (key Int, value String, array Array(String)) Engine=MergeTree() ORDER BY tuple() SETTINGS storage_policy='{storage_policy}';
+        INSERT INTO data SELECT * FROM generateRandom('key Int, value String, array Array(String)') LIMIT {size} {format_settings(None)};
+        OPTIMIZE TABLE data FINAL;
+        """
+    )
+
+    try:
+        backup_query_id = uuid.uuid4().hex
+        broken_s3.setup_at_part_upload(after=20, count=1)
+        error = node.query_and_get_error(
+            f"BACKUP TABLE data TO {backup_destination} {format_settings(None)}",
+            query_id=backup_query_id,
+        )
+
+        assert "mock s3 injected unretryable error" in error, error
+    finally:
+        node.query(
+            """
+            DROP TABLE data SYNC;
+            """
+        )
+
+
 def test_incremental_backup_append_table_def():
     backup_name = f"S3('http://minio1:9001/root/data/backups/{new_backup_name()}', 'minio', '{minio_secret_key}')"

@@ -918,17 +962,6 @@ def test_backup_restore_s3_plain():
     instance.query("DROP TABLE sample_restored SYNC")


-@pytest.fixture(scope="module")
-def init_broken_s3():
-    yield start_s3_mock(cluster, "broken_s3", "8083")
-
-
-@pytest.fixture(scope="function")
-def broken_s3(init_broken_s3):
-    init_broken_s3.reset()
-    yield init_broken_s3
-
-
 @pytest.mark.parametrize(
     "to_disk",
     [
