Commit ad9f51e

Merge branch '25.8' of github.com:ClickHouse/ClickHouse into backport/25.8/86166

2 parents: 14ddb7a + 6f57cd8

17 files changed: +341 / -44 lines

src/IO/ReadBuffer.cpp

Lines changed: 9 additions & 0 deletions
@@ -107,6 +107,15 @@ bool ReadBuffer::next()
     }
     else
     {
+        /// It might happen that we need to skip all data in the buffer,
+        /// in this case we should call next() one more time to load new data.
+        if (nextimpl_working_buffer_offset == working_buffer.size())
+        {
+            pos = working_buffer.end();
+            nextimpl_working_buffer_offset = 0;
+            return next();
+        }
+
         pos = working_buffer.begin() + std::min(nextimpl_working_buffer_offset, working_buffer.size());
         chassert(position() < working_buffer.end());
     }
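
The added branch covers the case where nextimpl_working_buffer_offset consumes the entire freshly loaded buffer: instead of handing the caller an already-exhausted working buffer, next() clears the offset and refills once more. A minimal standalone sketch of that control flow follows; ToyBuffer, chunks, and skip_offset are illustrative stand-ins, not the DB::ReadBuffer API.

// Toy model of the patched behaviour; ToyBuffer is a hypothetical type, only the
// "skip the whole buffer, then refill" logic mirrors the change above.
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

struct ToyBuffer
{
    std::vector<std::string> chunks;   // stands in for data produced by nextImpl()
    size_t chunk_index = 0;
    std::string working_buffer;        // current chunk
    size_t pos = 0;                    // read position inside working_buffer
    size_t skip_offset = 0;            // analogue of nextimpl_working_buffer_offset

    bool next()
    {
        if (chunk_index >= chunks.size())
            return false;              // no more data
        working_buffer = chunks[chunk_index++];

        // The fix: the requested skip may cover the whole chunk, so load the next
        // chunk instead of leaving the caller with an empty working buffer.
        if (skip_offset == working_buffer.size())
        {
            pos = working_buffer.size();
            skip_offset = 0;
            return next();
        }

        pos = std::min(skip_offset, working_buffer.size());
        skip_offset = 0;
        return true;
    }
};

int main()
{
    ToyBuffer buf{{"abcd", "efgh"}};
    buf.skip_offset = 4;               // skip exactly one whole chunk
    if (buf.next())
        std::cout << buf.working_buffer.substr(buf.pos) << '\n';   // prints "efgh"
}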

src/IO/S3/copyS3File.cpp

Lines changed: 19 additions & 22 deletions
@@ -117,7 +117,7 @@ namespace
     size_t num_parts;
     size_t normal_part_size;
     String multipart_upload_id;
-    std::atomic<bool> multipart_upload_aborted = false;
+    std::atomic<bool> upload_part_failed = false;
     Strings part_tags;

     std::list<UploadPartTask> TSA_GUARDED_BY(bg_tasks_mutex) bg_tasks;

@@ -177,8 +177,6 @@ namespace

     void completeMultipartUpload()
     {
-        if (multipart_upload_aborted)
-            return;

         LOG_TRACE(log, "Completing multipart upload. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", dest_bucket, dest_key, multipart_upload_id, part_tags.size());

@@ -246,8 +244,6 @@ namespace
         blob_storage_log->addEvent(BlobStorageLogElement::EventType::MultiPartUploadAbort,
             dest_bucket, dest_key, /* local_path_ */ {}, /* data_size */ 0,
             outcome.IsSuccess() ? nullptr : &outcome.GetError());
-
-        multipart_upload_aborted = true;
     }

     void checkObjectAfterUpload()

@@ -269,8 +265,8 @@ namespace
     {
         for (size_t part_number = 1; position < end_position; ++part_number)
         {
-            if (multipart_upload_aborted)
-                break; /// No more part uploads.
+            if (upload_part_failed)
+                break;

             size_t next_position = std::min(position + normal_part_size, end_position);
             size_t part_size = next_position - position; /// `part_size` is either `normal_part_size` or smaller if it's the final part.

@@ -288,9 +284,13 @@ namespace
         catch (...)
         {
             tryLogCurrentException(log, fmt::format("While performing multipart upload of {}", dest_key));
-            // Multipart upload failed because it wasn't possible to schedule all the tasks.
-            // To avoid execution of already scheduled tasks we abort MultipartUpload.
-            abortMultipartUpload();
+            // Multipart upload failed because not all tasks could be scheduled.
+            // waitForAllBackgroundTasks will rethrow the actual exception.
+            {
+                std::lock_guard lock(bg_tasks_mutex);
+                if (!bg_exception)
+                    bg_exception = std::current_exception();
+            }
             waitForAllBackgroundTasks();
             throw;
         }

@@ -399,12 +399,10 @@ namespace
             }
             catch (...)
             {
+                tryLogCurrentException(log, fmt::format("While writing part #{}", task->part_number));
                 std::lock_guard lock(bg_tasks_mutex);
                 if (!bg_exception)
-                {
-                    tryLogCurrentException(log, fmt::format("While writing part #{}", task->part_number));
                     bg_exception = std::current_exception(); /// The exception will be rethrown after all background tasks stop working.
-                }
             }
             task_finish_notify();
         }, Priority{});

@@ -428,8 +426,8 @@ namespace

     void processUploadTask(UploadPartTask & task)
     {
-        if (multipart_upload_aborted)
-            return; /// Already aborted.
+        if (upload_part_failed)
+            return; /// Skipped: another upload task failed.

         auto request = makeUploadPartRequest(task.part_number, task.part_offset, task.part_size);
         auto tag = processUploadPartRequest(*request);

@@ -452,15 +450,14 @@ namespace

         std::unique_lock lock(bg_tasks_mutex);
         /// Suppress warnings because bg_tasks_mutex is actually hold, but tsa annotations do not understand std::unique_lock
-        bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); });
+        bg_tasks_condvar.wait(
+            lock,
+            [this]()
+            { return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); });

         auto exception = TSA_SUPPRESS_WARNING_FOR_READ(bg_exception);
         if (exception)
         {
-            /// abortMultipartUpload() might be called already, see processUploadPartRequest().
-            /// However if there were concurrent uploads at that time, those part uploads might or might not succeed.
-            /// As a result, it might be necessary to abort a given multipart upload multiple times in order to completely free
-            /// all storage consumed by all parts.
             abortMultipartUpload();

             std::rethrow_exception(exception);

@@ -643,7 +640,7 @@ namespace

         if (!outcome.IsSuccess())
         {
-            abortMultipartUpload();
+            upload_part_failed = true;
             ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
             throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
         }

@@ -861,7 +858,7 @@ namespace
         auto outcome = client_ptr->UploadPartCopy(req);
         if (!outcome.IsSuccess())
         {
-            abortMultipartUpload();
+            upload_part_failed = true;
             throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
         }
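
Taken together, the changes in this file move the abort out of the hot path: a failed part now only sets upload_part_failed (so remaining tasks skip their work) and records the first exception, and waitForAllBackgroundTasks() joins every scheduled task, aborts the multipart upload exactly once, and rethrows. Below is a standalone sketch of that scheme using plain std::thread; the names echo the patch but the types are simplified stand-ins, and for brevity the flag and the exception are recorded in one place, which the real code splits across processUploadPartRequest() and the task wrapper.

// Simplified sketch of "record first error, finish all tasks, abort once, rethrow".
#include <atomic>
#include <exception>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

std::atomic<bool> upload_part_failed{false};
std::mutex bg_mutex;
std::exception_ptr bg_exception;

void uploadPart(int part)
{
    try
    {
        if (upload_part_failed)
            return;                      // skip: another part already failed
        if (part == 3)
            throw std::runtime_error("injected S3 error on part 3");
    }
    catch (...)
    {
        upload_part_failed = true;
        std::lock_guard lock(bg_mutex);
        if (!bg_exception)
            bg_exception = std::current_exception();   // keep only the first error
    }
}

void waitForAllAndFinalize(std::vector<std::thread> & tasks)
{
    for (auto & t : tasks)
        t.join();                        // wait for every scheduled task first
    if (bg_exception)
    {
        std::cout << "abortMultipartUpload() would run exactly once here\n";
        std::rethrow_exception(bg_exception);
    }
    std::cout << "completeMultipartUpload() would run here\n";
}

int main()
{
    std::vector<std::thread> tasks;
    for (int part = 1; part <= 5; ++part)
        tasks.emplace_back(uploadPart, part);
    try
    {
        waitForAllAndFinalize(tasks);
    }
    catch (const std::exception & e)
    {
        std::cout << "rethrown to the caller: " << e.what() << '\n';
    }
}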

src/Storages/IPartitionStrategy.cpp

Lines changed: 8 additions & 0 deletions
@@ -232,8 +232,16 @@ std::shared_ptr<IPartitionStrategy> PartitionStrategyFactory::get(StrategyType s
             globbed_path,
             partition_columns_in_data_file);
         case StrategyType::NONE:
+        {
+            if (!partition_columns_in_data_file && strategy == PartitionStrategyFactory::StrategyType::NONE)
+            {
+                throw Exception(
+                    ErrorCodes::BAD_ARGUMENTS,
+                    "Partition strategy `none` cannot be used with partition_columns_in_data_file=0");
+            }
             /// Unreachable for plain object storage, used only by Data Lakes for now
             return nullptr;
+        }
     }
 }

src/Storages/ObjectStorage/DataLakes/Iceberg/AvroForIcebergDeserializer.cpp

Lines changed: 7 additions & 3 deletions
@@ -16,6 +16,7 @@
 namespace DB::ErrorCodes
 {
     extern const int ICEBERG_SPECIFICATION_VIOLATION;
+    extern const int INCORRECT_DATA;
 }

 namespace DB::Iceberg

@@ -24,9 +25,8 @@ namespace DB::Iceberg
 using namespace DB;

 AvroForIcebergDeserializer::AvroForIcebergDeserializer(
-    std::unique_ptr<ReadBufferFromFileBase> buffer_,
-    const std::string & manifest_file_path_,
-    const DB::FormatSettings & format_settings)
+    std::unique_ptr<ReadBufferFromFileBase> buffer_, const std::string & manifest_file_path_, const DB::FormatSettings & format_settings)
+try
     : buffer(std::move(buffer_))
     , manifest_file_path(manifest_file_path_)
 {

@@ -51,6 +51,10 @@ AvroForIcebergDeserializer::AvroForIcebergDeserializer(
     parsed_column = std::move(columns[0]);
     parsed_column_data_type = std::dynamic_pointer_cast<const DataTypeTuple>(data_type);
 }
+catch (const std::exception & e)
+{
+    throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot read Iceberg avro manifest file '{}': {}", manifest_file_path_, e.what());
+}

 size_t AvroForIcebergDeserializer::rows() const
 {
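
The constructor becomes a function-try-block, so any exception raised while the Avro manifest is parsed, including ones thrown by the member initializers, is translated into INCORRECT_DATA with the file path attached. A generic sketch of the C++ idiom follows; ManifestReader and Parser are hypothetical types, and std::runtime_error stands in for DB::Exception.

// Generic constructor function-try-block: wrap low-level parse errors with context.
#include <iostream>
#include <stdexcept>
#include <string>

struct Parser
{
    explicit Parser(const std::string & content)
    {
        if (content.empty())
            throw std::runtime_error("unexpected end of stream");   // low-level parser error
    }
};

class ManifestReader
{
public:
    ManifestReader(const std::string & path, const std::string & content)
    try
        : parser(content)
    {
        // The constructor body is covered by the same try block as the initializers.
        std::cout << "parsed " << path << '\n';
    }
    catch (const std::exception & e)
    {
        // A constructor's function-try-block must exit by throwing; here we add
        // the file name, much like the INCORRECT_DATA wrapper in the patch.
        throw std::runtime_error("Cannot read manifest file '" + path + "': " + e.what());
    }

private:
    Parser parser;
};

int main()
{
    try
    {
        ManifestReader reader("bucket/metadata/manifest.avro", "");   // empty content -> parse error
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << '\n';
    }
}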

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp

Lines changed: 26 additions & 3 deletions
@@ -269,10 +269,24 @@ IcebergIterator::IcebergIterator(
     {
         while (!blocking_queue.isFinished())
         {
-            auto info = data_files_iterator.next();
-            if (!info.has_value())
+            std::optional<ManifestFileEntry> entry;
+            try
+            {
+                entry = data_files_iterator.next();
+            }
+            catch (...)
+            {
+                std::lock_guard lock(exception_mutex);
+                if (!exception)
+                {
+                    exception = std::current_exception();
+                }
+                blocking_queue.finish();
                 break;
-            while (!blocking_queue.push(std::move(info.value())))
+            }
+            if (!entry.has_value())
+                break;
+            while (!blocking_queue.push(std::move(entry.value())))
             {
                 if (blocking_queue.isFinished())
                 {

@@ -321,6 +335,15 @@ ObjectInfoPtr IcebergIterator::next(size_t)
         }
         return object_info;
     }
+    {
+        std::lock_guard lock(exception_mutex);
+        if (exception)
+        {
+            auto exception_message = getExceptionMessage(exception, true, true);
+            auto exception_code = getExceptionErrorCode(exception);
+            throw DB::Exception(exception_code, "Iceberg iterator is failed with exception: {}", exception_message);
+        }
+    }
     return nullptr;
 }
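
The producer thread that fills blocking_queue now catches failures from the manifest iterator, stores the first exception under exception_mutex, and closes the queue; IcebergIterator::next() then rethrows that exception instead of silently reporting end-of-data. A simplified standalone sketch of the hand-off follows, using a plain std::queue plus condition_variable in place of the ClickHouse blocking queue and simplified element types.

// Producer stores the first exception and closes the queue; consumer rethrows it.
#include <condition_variable>
#include <exception>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <stdexcept>
#include <thread>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> items;
bool finished = false;
std::exception_ptr stored_exception;

void producer()
{
    try
    {
        for (int i = 0; ; ++i)
        {
            if (i == 3)
                throw std::runtime_error("corrupted manifest entry");   // simulated iterator failure
            std::lock_guard lock(mtx);
            items.push(i);
            cv.notify_one();
        }
    }
    catch (...)
    {
        std::lock_guard lock(mtx);
        if (!stored_exception)
            stored_exception = std::current_exception();
        finished = true;                  // analogue of blocking_queue.finish()
        cv.notify_all();
    }
}

std::optional<int> next()
{
    std::unique_lock lock(mtx);
    cv.wait(lock, [] { return !items.empty() || finished; });
    if (!items.empty())
    {
        int value = items.front();
        items.pop();
        return value;
    }
    if (stored_exception)
        std::rethrow_exception(stored_exception);   // surface the producer's failure to the reader
    return std::nullopt;
}

int main()
{
    std::thread thread(producer);
    try
    {
        while (auto value = next())
            std::cout << "file entry " << *value << '\n';
    }
    catch (const std::exception & e)
    {
        std::cout << "iterator failed: " << e.what() << '\n';
    }
    thread.join();
}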

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h

Lines changed: 2 additions & 0 deletions
@@ -109,6 +109,8 @@ class IcebergIterator : public IObjectIterator
     const String compression_method;
     std::vector<Iceberg::ManifestFileEntry> position_deletes_files;
     std::vector<Iceberg::ManifestFileEntry> equality_deletes_files;
+    std::exception_ptr exception;
+    std::mutex exception_mutex;
 };
 }

src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp

Lines changed: 5 additions & 2 deletions
@@ -82,8 +82,11 @@ void StorageObjectStorageConfiguration::initialize(
     }
     else if (configuration_to_initialize.partition_strategy_type == PartitionStrategyFactory::StrategyType::NONE)
     {
-        // Promote to wildcard in case it is not data lake to make it backwards compatible
-        configuration_to_initialize.partition_strategy_type = PartitionStrategyFactory::StrategyType::WILDCARD;
+        if (configuration_to_initialize.getRawPath().hasPartitionWildcard())
+        {
+            // Promote to wildcard in case it is not data lake to make it backwards compatible
+            configuration_to_initialize.partition_strategy_type = PartitionStrategyFactory::StrategyType::WILDCARD;
+        }
     }

     if (configuration_to_initialize.format == "auto")

tests/integration/test_backup_restore_s3/test.py

Lines changed: 44 additions & 11 deletions
@@ -487,6 +487,50 @@ def test_backup_to_s3_native_copy_multipart():
     )


+@pytest.fixture(scope="module")
+def init_broken_s3():
+    yield start_s3_mock(cluster, "broken_s3", "8083")
+
+
+@pytest.fixture(scope="function")
+def broken_s3(init_broken_s3):
+    init_broken_s3.reset()
+    yield init_broken_s3
+
+
+def test_backup_to_s3_copy_multipart_check_error_message(broken_s3):
+    storage_policy = "policy_s3"
+    size = 10000000
+    backup_name = new_backup_name()
+    backup_destination = f"S3('http://resolver:8083/root/data/backups/multipart/{backup_name}', 'minio', '{minio_secret_key}')"
+    node = cluster.instances["node"]
+
+    node.query(
+        f"""
+        DROP TABLE IF EXISTS data SYNC;
+        CREATE TABLE data (key Int, value String, array Array(String)) Engine=MergeTree() ORDER BY tuple() SETTINGS storage_policy='{storage_policy}';
+        INSERT INTO data SELECT * FROM generateRandom('key Int, value String, array Array(String)') LIMIT {size} {format_settings(None)};
+        OPTIMIZE TABLE data FINAL;
+        """
+    )
+
+    try:
+        backup_query_id = uuid.uuid4().hex
+        broken_s3.setup_at_part_upload(after=20, count=1)
+        error = node.query_and_get_error(
+            f"BACKUP TABLE data TO {backup_destination} {format_settings(None)}",
+            query_id=backup_query_id,
+        )
+
+        assert "mock s3 injected unretryable error" in error, error
+    finally:
+        node.query(
+            """
+            DROP TABLE data SYNC;
+            """
+        )
+
+
 def test_incremental_backup_append_table_def():
     backup_name = f"S3('http://minio1:9001/root/data/backups/{new_backup_name()}', 'minio', '{minio_secret_key}')"

@@ -918,17 +962,6 @@ def test_backup_restore_s3_plain():
     instance.query("DROP TABLE sample_restored SYNC")


-@pytest.fixture(scope="module")
-def init_broken_s3():
-    yield start_s3_mock(cluster, "broken_s3", "8083")
-
-
-@pytest.fixture(scope="function")
-def broken_s3(init_broken_s3):
-    init_broken_s3.reset()
-    yield init_broken_s3
-
-
 @pytest.mark.parametrize(
     "to_disk",
     [
