Skip to content

Commit 2741406

Browse files
Backport ClickHouse#92390 to 25.8: Fix possible crash in aggregate functions after MEMORY_LIMIT_EXCEEDED
1 parent 092655c commit 2741406

File tree

3 files changed

+101
-7
lines changed

3 files changed

+101
-7
lines changed

src/Columns/ColumnAggregateFunction.cpp

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111
#include <IO/WriteBufferFromArena.h>
1212
#include <IO/WriteBufferFromString.h>
1313
#include <Processors/Transforms/ColumnGathererTransform.h>
14+
#include <base/defines.h>
1415
#include <Common/AlignedBuffer.h>
1516
#include <Common/Arena.h>
17+
#include <Common/FailPoint.h>
1618
#include <Common/FieldVisitorToString.h>
1719
#include <Common/HashTable/Hash.h>
1820
#include <Common/SipHash.h>
@@ -32,6 +34,12 @@ namespace ErrorCodes
3234
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
3335
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
3436
extern const int NOT_IMPLEMENTED;
37+
extern const int MEMORY_LIMIT_EXCEEDED;
38+
}
39+
40+
namespace FailPoints
41+
{
42+
extern const char column_aggregate_function_ensureOwnership_exception[];
3543
}
3644

3745

@@ -226,29 +234,44 @@ void ColumnAggregateFunction::ensureOwnership()
226234
size_t size_of_state = func->sizeOfData();
227235
size_t align_of_state = func->alignOfData();
228236

237+
bool inject_memory_limit_exceeded = false;
238+
/// Avoid checking failpoint in a loop, since it uses atomic's
239+
fiu_do_on(FailPoints::column_aggregate_function_ensureOwnership_exception,
240+
{
241+
inject_memory_limit_exceeded = true;
242+
});
243+
244+
Container new_data;
245+
new_data.resize_exact(size);
246+
229247
size_t rollback_pos = 0;
230248
try
231249
{
232250
for (size_t i = 0; i < size; ++i)
233251
{
234-
ConstAggregateDataPtr old_place = data[i];
235-
data[i] = arena.alignedAlloc(size_of_state, align_of_state);
236-
func->create(data[i]);
252+
new_data[i] = arena.alignedAlloc(size_of_state, align_of_state);
253+
func->create(new_data[i]);
237254
++rollback_pos;
238-
func->merge(data[i], old_place, &arena);
255+
256+
if (unlikely(inject_memory_limit_exceeded))
257+
throw Exception(ErrorCodes::MEMORY_LIMIT_EXCEEDED, "Failpoint triggered");
258+
259+
func->merge(new_data[i], data[i], &arena);
239260
}
240261
}
241262
catch (...)
242263
{
243264
/// If we failed to take ownership, destroy all temporary data.
244-
245265
if (!func->hasTrivialDestructor())
266+
{
246267
for (size_t i = 0; i < rollback_pos; ++i)
247-
func->destroy(data[i]);
248-
268+
func->destroy(new_data[i]);
269+
}
249270
throw;
250271
}
251272

273+
data = std::move(new_data);
274+
252275
/// Now we own all data.
253276
src.reset();
254277
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#include <base/types.h>
2+
#include <AggregateFunctions/DDSketch.h>
3+
#include <AggregateFunctions/AggregateFunctionFactory.h>
4+
#include <AggregateFunctions/IAggregateFunction.h>
5+
#include <Columns/ColumnAggregateFunction.h>
6+
#include <Columns/ColumnsNumber.h>
7+
#include <DataTypes/DataTypesNumber.h>
8+
#include <IO/ReadBufferFromMemory.h>
9+
#include <IO/ReadBufferFromString.h>
10+
#include <IO/WriteBufferFromVector.h>
11+
#include <Common/Arena.h>
12+
#include <Common/Base64.h>
13+
#include <Common/FailPoint.h>
14+
#include <Common/tests/gtest_global_register.h>
15+
16+
#include <gtest/gtest.h>
17+
18+
namespace DB::FailPoints
19+
{
20+
extern const char column_aggregate_function_ensureOwnership_exception[];
21+
}
22+
23+
TEST(ColumnAggregateFunction, EnsureOwnershipExceptionLeavesCorruptedState)
24+
{
25+
tryRegisterAggregateFunctions();
26+
27+
using namespace DB;
28+
29+
// Create the aggregate function quantileDD with relative accuracy 0.01
30+
AggregateFunctionFactory & factory = AggregateFunctionFactory::instance();
31+
DataTypes argument_types = {std::make_shared<DataTypeFloat64>()};
32+
Array params = {Field(0.01), Field(0.5)};
33+
AggregateFunctionProperties properties;
34+
auto aggregate_function = factory.get("quantileDD", NullsAction::EMPTY, argument_types, params, properties);
35+
36+
// Create a source column with some data
37+
auto src_column = ColumnAggregateFunction::create(aggregate_function);
38+
Arena arena_src;
39+
auto data_column = ColumnFloat64::create();
40+
data_column->insert(Field(1.0));
41+
data_column->insert(Field(2.0));
42+
data_column->insert(Field(3.0));
43+
const IColumn * columns[1] = {data_column.get()};
44+
45+
for (size_t i = 0; i < 3; ++i)
46+
{
47+
src_column->insertDefault();
48+
aggregate_function->add(src_column->getData()[i], columns, i, &arena_src);
49+
}
50+
51+
// Create a view column from the source - this sets src pointer
52+
auto view_column = src_column->cloneEmpty();
53+
view_column->insertRangeFrom(*src_column, 0, 3);
54+
55+
// Enable failpoint that will trigger an exception during ensureOwnership
56+
// This will happen after at least one state is created and destroyed
57+
FailPointInjection::enableFailPoint(FailPoints::column_aggregate_function_ensureOwnership_exception);
58+
59+
// Try to insert - this will call ensureOwnership() which will throw
60+
// After the exception, previously, data[] points to destroyed memory where mapping == nullptr
61+
ASSERT_THROW({
62+
view_column->insertDefault();
63+
}, Exception);
64+
65+
// Disable failpoint
66+
FailPointInjection::disableFailPoint(FailPoints::column_aggregate_function_ensureOwnership_exception);
67+
68+
/// Previously leads to a crash
69+
view_column->insertDefault();
70+
}

src/Common/FailPoint.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ static struct InitFiu
8888
ONCE(execute_query_calling_empty_set_result_func_on_exception) \
8989
ONCE(receive_timeout_on_table_status_response) \
9090
ONCE(delta_kernel_fail_literal_visitor) \
91+
ONCE(column_aggregate_function_ensureOwnership_exception) \
9192
REGULAR(keepermap_fail_drop_data) \
9293
REGULAR(lazy_pipe_fds_fail_close) \
9394
PAUSEABLE(infinite_sleep) \

0 commit comments

Comments
 (0)