Skip to content

Commit a631115

Browse files
Merge pull request ClickHouse#86988 from ClickHouse/backport/25.8/86932
Backport ClickHouse#86932 to 25.8: Process exception thrown from background pool
2 parents b459b60 + 38964fa commit a631115

File tree

8 files changed

+120
-6
lines changed

8 files changed

+120
-6
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/AvroForIcebergDeserializer.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
namespace DB::ErrorCodes
1717
{
1818
extern const int ICEBERG_SPECIFICATION_VIOLATION;
19+
extern const int INCORRECT_DATA;
1920
}
2021

2122
namespace DB::Iceberg
@@ -24,9 +25,8 @@ namespace DB::Iceberg
2425
using namespace DB;
2526

2627
AvroForIcebergDeserializer::AvroForIcebergDeserializer(
27-
std::unique_ptr<ReadBufferFromFileBase> buffer_,
28-
const std::string & manifest_file_path_,
29-
const DB::FormatSettings & format_settings)
28+
std::unique_ptr<ReadBufferFromFileBase> buffer_, const std::string & manifest_file_path_, const DB::FormatSettings & format_settings)
29+
try
3030
: buffer(std::move(buffer_))
3131
, manifest_file_path(manifest_file_path_)
3232
{
@@ -51,6 +51,10 @@ AvroForIcebergDeserializer::AvroForIcebergDeserializer(
5151
parsed_column = std::move(columns[0]);
5252
parsed_column_data_type = std::dynamic_pointer_cast<const DataTypeTuple>(data_type);
5353
}
54+
catch (const std::exception & e)
55+
{
56+
throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot read Iceberg avro manifest file '{}': {}", manifest_file_path_, e.what());
57+
}
5458

5559
size_t AvroForIcebergDeserializer::rows() const
5660
{

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,10 +269,24 @@ IcebergIterator::IcebergIterator(
269269
{
270270
while (!blocking_queue.isFinished())
271271
{
272-
auto info = data_files_iterator.next();
273-
if (!info.has_value())
272+
std::optional<ManifestFileEntry> entry;
273+
try
274+
{
275+
entry = data_files_iterator.next();
276+
}
277+
catch (...)
278+
{
279+
std::lock_guard lock(exception_mutex);
280+
if (!exception)
281+
{
282+
exception = std::current_exception();
283+
}
284+
blocking_queue.finish();
274285
break;
275-
while (!blocking_queue.push(std::move(info.value())))
286+
}
287+
if (!entry.has_value())
288+
break;
289+
while (!blocking_queue.push(std::move(entry.value())))
276290
{
277291
if (blocking_queue.isFinished())
278292
{
@@ -321,6 +335,15 @@ ObjectInfoPtr IcebergIterator::next(size_t)
321335
}
322336
return object_info;
323337
}
338+
{
339+
std::lock_guard lock(exception_mutex);
340+
if (exception)
341+
{
342+
auto exception_message = getExceptionMessage(exception, true, true);
343+
auto exception_code = getExceptionErrorCode(exception);
344+
throw DB::Exception(exception_code, "Iceberg iterator is failed with exception: {}", exception_message);
345+
}
346+
}
324347
return nullptr;
325348
}
326349

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ class IcebergIterator : public IObjectIterator
109109
const String compression_method;
110110
std::vector<Iceberg::ManifestFileEntry> position_deletes_files;
111111
std::vector<Iceberg::ManifestFileEntry> equality_deletes_files;
112+
std::exception_ptr exception;
113+
std::mutex exception_mutex;
112114
};
113115
}
114116

tests/queries/0_stateless/03611_verify_exception_in_iceberg_iterator.reference

Whitespace-only changes.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
2+
-- Tags: no-fasttest
3+
-- Tag no-fasttest: Depends on AWS
4+
5+
-- Verify that an exception is thrown when an Iceberg table contains corrupted Avro manifest files.
6+
-- This test is not so much about the error thrown while parsing Avro files as it is about checking
7+
-- that we correctly process exceptions thrown from the Iceberg iterator during background execution.
8+
9+
SELECT * FROM icebergS3('http://localhost:11111/test/corrupted_avro_files_test/', 'clickhouse', 'clickhouse') SETTINGS use_iceberg_metadata_files_cache = False; -- { serverError INCORRECT_DATA}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Intentionally corrupted file which pretends to have avro format.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
{
2+
"format-version" : 2,
3+
"table-uuid" : "a5920f9f-12d0-4773-87fc-569c00676258",
4+
"location" : "s3a://test/corrupted_avro_files_test",
5+
"last-sequence-number" : 1,
6+
"last-updated-ms" : 1740666694866,
7+
"last-column-id" : 2,
8+
"current-schema-id" : 0,
9+
"schemas" : [ {
10+
"type" : "struct",
11+
"schema-id" : 0,
12+
"fields" : [ {
13+
"id" : 1,
14+
"name" : "order_number",
15+
"required" : false,
16+
"type" : "long"
17+
}, {
18+
"id" : 2,
19+
"name" : "product_code",
20+
"required" : false,
21+
"type" : "string"
22+
} ]
23+
} ],
24+
"default-spec-id" : 0,
25+
"partition-specs" : [ {
26+
"spec-id" : 0,
27+
"fields" : [ ]
28+
} ],
29+
"last-partition-id" : 999,
30+
"default-sort-order-id" : 0,
31+
"sort-orders" : [ {
32+
"order-id" : 0,
33+
"fields" : [ ]
34+
} ],
35+
"properties" : {
36+
"owner" : "divanik",
37+
"option.format-version" : "2",
38+
"write.parquet.compression-codec" : "zstd"
39+
},
40+
"current-snapshot-id" : 3858160944799047644,
41+
"refs" : {
42+
"main" : {
43+
"snapshot-id" : 3858160944799047644,
44+
"type" : "branch"
45+
}
46+
},
47+
"snapshots" : [ {
48+
"sequence-number" : 1,
49+
"snapshot-id" : 3858160944799047644,
50+
"timestamp-ms" : 1740666694866,
51+
"summary" : {
52+
"operation" : "append",
53+
"spark.app.id" : "local-1740666666602",
54+
"added-data-files" : "1",
55+
"added-records" : "1",
56+
"added-files-size" : "701",
57+
"changed-partition-count" : "1",
58+
"total-records" : "1",
59+
"total-files-size" : "701",
60+
"total-data-files" : "1",
61+
"total-delete-files" : "0",
62+
"total-position-deletes" : "0",
63+
"total-equality-deletes" : "0"
64+
},
65+
"manifest-list" : "s3a://test/corrupted_avro_files_test/metadata/snap-3858160944799047644-1-83521e92-07a7-450d-ba44-3a179b730c85.avro",
66+
"schema-id" : 0
67+
} ],
68+
"statistics" : [ ],
69+
"partition-statistics" : [ ],
70+
"snapshot-log" : [ {
71+
"timestamp-ms" : 1740666694866,
72+
"snapshot-id" : 3858160944799047644
73+
} ],
74+
"metadata-log" : []
75+
}

0 commit comments

Comments
 (0)