Skip to content

Commit eb4217e

Browse files
committed
GH-48105: [C++][Parquet][IPC] Cap allocated memory when fuzzing
1 parent 5112de2 commit eb4217e

File tree

10 files changed

+247
-8
lines changed

10 files changed

+247
-8
lines changed

cpp/src/arrow/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ set(ARROW_UTIL_SRCS
511511
util/float16.cc
512512
util/formatting.cc
513513
util/future.cc
514+
util/fuzz_internal.cc
514515
util/hashing.cc
515516
util/int_util.cc
516517
util/io_util.cc

cpp/src/arrow/ipc/file_fuzz.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919

2020
#include "arrow/ipc/reader.h"
2121
#include "arrow/status.h"
22+
#include "arrow/util/fuzz_internal.h"
2223
#include "arrow/util/macros.h"
2324

2425
// libFuzzer entry point for the IPC file format.
//
// Runs the IPC file fuzz routine on the raw input and hands the resulting
// status to NoteFuzzStatus. Always returns 0, whatever the status is.
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
  const auto input_size = static_cast<int64_t>(size);
  const auto outcome = arrow::ipc::internal::FuzzIpcFile(data, input_size);
  arrow::internal::NoteFuzzStatus(outcome, data, input_size);
  return 0;
}

cpp/src/arrow/ipc/reader.cc

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#include "arrow/util/checked_cast.h"
5454
#include "arrow/util/compression.h"
5555
#include "arrow/util/endian.h"
56+
#include "arrow/util/fuzz_internal.h"
5657
#include "arrow/util/key_value_metadata.h"
5758
#include "arrow/util/logging_internal.h"
5859
#include "arrow/util/parallel.h"
@@ -2618,14 +2619,21 @@ Status ValidateFuzzBatch(const RecordBatch& batch) {
26182619
return st;
26192620
}
26202621

2622+
// Read options used by the IPC fuzz routines: default-constructed options
// whose memory pool is replaced by the fuzzing memory pool.
IpcReadOptions FuzzingOptions() {
  IpcReadOptions fuzz_options;
  fuzz_options.memory_pool = ::arrow::internal::fuzzing_memory_pool();
  return fuzz_options;
}
2627+
26212628
} // namespace
26222629

26232630
Status FuzzIpcStream(const uint8_t* data, int64_t size) {
26242631
auto buffer = std::make_shared<Buffer>(data, size);
26252632
io::BufferReader buffer_reader(buffer);
26262633

26272634
std::shared_ptr<RecordBatchReader> batch_reader;
2628-
ARROW_ASSIGN_OR_RAISE(batch_reader, RecordBatchStreamReader::Open(&buffer_reader));
2635+
ARROW_ASSIGN_OR_RAISE(batch_reader,
2636+
RecordBatchStreamReader::Open(&buffer_reader, FuzzingOptions()));
26292637
Status st;
26302638

26312639
while (true) {
@@ -2645,7 +2653,8 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
26452653
io::BufferReader buffer_reader(buffer);
26462654

26472655
std::shared_ptr<RecordBatchFileReader> batch_reader;
2648-
ARROW_ASSIGN_OR_RAISE(batch_reader, RecordBatchFileReader::Open(&buffer_reader));
2656+
ARROW_ASSIGN_OR_RAISE(batch_reader,
2657+
RecordBatchFileReader::Open(&buffer_reader, FuzzingOptions()));
26492658
Status st;
26502659

26512660
const int n_batches = batch_reader->num_record_batches();

cpp/src/arrow/ipc/stream_fuzz.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919

2020
#include "arrow/ipc/reader.h"
2121
#include "arrow/status.h"
22+
#include "arrow/util/fuzz_internal.h"
2223
#include "arrow/util/macros.h"
2324

2425
// libFuzzer entry point for the IPC stream format.
//
// Runs the IPC stream fuzz routine on the raw input and hands the resulting
// status to NoteFuzzStatus. Always returns 0, whatever the status is.
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
  const auto input_size = static_cast<int64_t>(size);
  const auto outcome = arrow::ipc::internal::FuzzIpcStream(data, input_size);
  arrow::internal::NoteFuzzStatus(outcome, data, input_size);
  return 0;
}

cpp/src/arrow/memory_pool.h

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "arrow/result.h"
2727
#include "arrow/status.h"
2828
#include "arrow/type_fwd.h"
29+
#include "arrow/util/macros.h"
2930
#include "arrow/util/visibility.h"
3031

3132
namespace arrow {
@@ -245,6 +246,64 @@ class ARROW_EXPORT ProxyMemoryPool : public MemoryPool {
245246
std::unique_ptr<ProxyMemoryPoolImpl> impl_;
246247
};
247248

249+
/// EXPERIMENTAL MemoryPool wrapper with an upper limit
250+
class ARROW_EXPORT CappedMemoryPool : public MemoryPool {
251+
public:
252+
CappedMemoryPool(MemoryPool* wrapped_pool, int64_t bytes_allocated_limit)
253+
: wrapped_(wrapped_pool), bytes_allocated_limit_(bytes_allocated_limit) {}
254+
255+
using MemoryPool::Allocate;
256+
using MemoryPool::Reallocate;
257+
258+
Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override {
259+
const auto attempted = size + wrapped_->bytes_allocated();
260+
if (ARROW_PREDICT_FALSE(attempted > bytes_allocated_limit_)) {
261+
return OutOfMemory(attempted);
262+
}
263+
return wrapped_->Allocate(size, alignment, out);
264+
}
265+
266+
Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
267+
uint8_t** ptr) override {
268+
const auto attempted = new_size - old_size + wrapped_->bytes_allocated();
269+
if (ARROW_PREDICT_FALSE(attempted > bytes_allocated_limit_)) {
270+
return OutOfMemory(attempted);
271+
}
272+
return wrapped_->Reallocate(old_size, new_size, alignment, ptr);
273+
}
274+
275+
void Free(uint8_t* buffer, int64_t size, int64_t alignment) override {
276+
return wrapped_->Free(buffer, size, alignment);
277+
}
278+
279+
void ReleaseUnused() override { wrapped_->ReleaseUnused(); }
280+
281+
void PrintStats() override { wrapped_->PrintStats(); }
282+
283+
int64_t bytes_allocated() const override { return wrapped_->bytes_allocated(); }
284+
285+
int64_t max_memory() const override { return wrapped_->max_memory(); }
286+
287+
int64_t total_bytes_allocated() const override {
288+
return wrapped_->total_bytes_allocated();
289+
}
290+
291+
int64_t num_allocations() const override { return wrapped_->num_allocations(); }
292+
293+
std::string backend_name() const override { return wrapped_->backend_name(); }
294+
295+
private:
296+
Status OutOfMemory(int64_t value) {
297+
return Status::OutOfMemory(
298+
"MemoryPool bytes_allocated cap exceeded: "
299+
"limit=",
300+
bytes_allocated_limit_, ", attempted=", value);
301+
}
302+
303+
MemoryPool* wrapped_;
304+
int64_t bytes_allocated_limit_;
305+
};
306+
248307
/// \brief Return a process-wide memory pool based on the system allocator.
249308
ARROW_EXPORT MemoryPool* system_memory_pool();
250309

cpp/src/arrow/memory_pool_test.cc

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include <algorithm>
1919
#include <cstdint>
20+
#include <memory>
2021

2122
#include <gtest/gtest.h>
2223

@@ -290,4 +291,92 @@ TEST(Jemalloc, GetAllocationStats) {
290291
#endif
291292
}
292293

294+
class TestCappedMemoryPool : public ::arrow::TestMemoryPoolBase {
295+
public:
296+
MemoryPool* memory_pool() override { return InitPool(/*limit=*/ 1'000'000'000LL); }
297+
298+
MemoryPool* InitPool(int64_t limit) {
299+
proxy_memory_pool_ = std::make_shared<ProxyMemoryPool>(default_memory_pool());
300+
capped_memory_pool_ = std::make_shared<CappedMemoryPool>(proxy_memory_pool_.get(), limit);
301+
return capped_memory_pool_.get();
302+
}
303+
304+
protected:
305+
std::shared_ptr<MemoryPool> proxy_memory_pool_;
306+
std::shared_ptr<CappedMemoryPool> capped_memory_pool_;
307+
};
308+
309+
// Generic checks inherited by the fixture, one test case per check.
TEST_F(TestCappedMemoryPool, MemoryTracking) { TestMemoryTracking(); }

TEST_F(TestCappedMemoryPool, OOM) {
  // CappedMemoryPool rejects the huge allocation without hitting the underlying
  // allocator, so this should work even under Address Sanitizer.
  TestOOM();
}

TEST_F(TestCappedMemoryPool, Reallocate) { TestReallocate(); }

TEST_F(TestCappedMemoryPool, Alignment) { TestAlignment(); }
320+
321+
// Allocations are accepted up to exactly the limit and rejected one byte past it;
// a rejected allocation leaves the pool statistics untouched.
TEST_F(TestCappedMemoryPool, AllocateLimit) {
  MemoryPool* capped = InitPool(/*limit=*/ 1000);

  uint8_t* kept;
  uint8_t* scratch;

  // 600 of the 1000 allowed bytes.
  ASSERT_OK(capped->Allocate(600, &kept));
  ASSERT_EQ(600, capped->bytes_allocated());
  ASSERT_EQ(600, capped->total_bytes_allocated());
  ASSERT_EQ(600, capped->max_memory());

  // 600 + 400 reaches the limit exactly and is still accepted.
  ASSERT_OK(capped->Allocate(400, &scratch));
  ASSERT_EQ(1000, capped->bytes_allocated());
  ASSERT_EQ(1000, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());
  capped->Free(scratch, 400);
  ASSERT_EQ(600, capped->bytes_allocated());
  ASSERT_EQ(1000, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());

  // Freed bytes become available again.
  ASSERT_OK(capped->Allocate(300, &scratch));
  ASSERT_EQ(900, capped->bytes_allocated());
  ASSERT_EQ(1300, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());
  capped->Free(scratch, 300);
  ASSERT_EQ(600, capped->bytes_allocated());
  ASSERT_EQ(1300, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());

  // 600 + 401 is one byte over the limit.
  ASSERT_RAISES(OutOfMemory, capped->Allocate(401, &scratch));
  ASSERT_EQ(600, capped->bytes_allocated());
  ASSERT_EQ(1300, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());

  capped->Free(kept, 600);
}
356+
357+
// Shrinking a buffer is accepted while the pool is at its limit; growing it one
// byte past the limit is rejected and leaves the pool statistics untouched.
TEST_F(TestCappedMemoryPool, ReallocateLimit) {
  MemoryPool* capped = InitPool(/*limit=*/ 1000);

  uint8_t* fixed;
  uint8_t* resized;

  // Fill the pool up to the limit with two buffers.
  ASSERT_OK(capped->Allocate(600, &fixed));
  ASSERT_OK(capped->Allocate(400, &resized));
  ASSERT_EQ(1000, capped->bytes_allocated());
  ASSERT_EQ(1000, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());

  // Shrink the second buffer from 400 to 300 bytes.
  ASSERT_OK(capped->Reallocate(400, 300, &resized));
  ASSERT_EQ(900, capped->bytes_allocated());
  ASSERT_EQ(1000, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());

  // 600 + 401 is one byte over the limit.
  ASSERT_RAISES(OutOfMemory, capped->Reallocate(300, 401, &resized));
  ASSERT_EQ(900, capped->bytes_allocated());
  ASSERT_EQ(1000, capped->total_bytes_allocated());
  ASSERT_EQ(1000, capped->max_memory());

  capped->Free(fixed, 600);
  capped->Free(resized, 300);
}
381+
293382
} // namespace arrow
cpp/src/arrow/util/fuzz_internal.cc

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/util/fuzz_internal.h"
19+
20+
#include "arrow/memory_pool.h"
21+
#include "arrow/status.h"
22+
#include "arrow/util/logging_internal.h"
23+
24+
namespace arrow::internal {
25+
26+
// Return the memory pool shared by the fuzz routines: a CappedMemoryPool over
// the default memory pool, limited to kFuzzingMemoryLimit bytes.
MemoryPool* fuzzing_memory_pool() {
  // Function-local static: built on the first call, then reused by every caller.
  static ::arrow::CappedMemoryPool capped_pool(
      ::arrow::default_memory_pool(), /*bytes_allocated_limit=*/kFuzzingMemoryLimit);
  return &capped_pool;
}
31+
32+
void NoteFuzzStatus(const Status& st, const uint8_t* data, int64_t size) {
33+
if (st.IsOutOfMemory()) {
34+
ARROW_LOG(WARNING) << "Fuzzing input with size=" << size
35+
<< " hit allocation failure: " << st.ToString();
36+
}
37+
}
38+
39+
} // namespace arrow::internal

cpp/src/arrow/util/fuzz_internal.h

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <cstdint>
21+
22+
#include "arrow/type_fwd.h"
23+
#include "arrow/util/macros.h"
24+
25+
namespace arrow::internal {
26+
27+
// The default rss_limit_mb on OSS-Fuzz is 2560 MB and we want to fail allocations
28+
// before that limit is reached, otherwise the fuzz target gets killed (GH-48105).
29+
constexpr int64_t kFuzzingMemoryLimit = 2200LL * 1000 * 1000;
30+
31+
/// Return a memory pool that will not allocate more than kFuzzingMemoryLimit bytes.
32+
ARROW_EXPORT MemoryPool* fuzzing_memory_pool();
33+
34+
// Optionally log the outcome of fuzzing an input
35+
ARROW_EXPORT void NoteFuzzStatus(const Status&, const uint8_t* data, int64_t size);
36+
37+
} // namespace arrow::internal

cpp/src/parquet/arrow/fuzz.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616
// under the License.
1717

1818
#include "arrow/status.h"
19+
#include "arrow/util/fuzz_internal.h"
1920
#include "parquet/arrow/reader.h"
2021

2122
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
2223
auto status = parquet::arrow::internal::FuzzReader(data, static_cast<int64_t>(size));
23-
ARROW_UNUSED(status);
24+
arrow::internal::NoteFuzzStatus(status, data, static_cast<int64_t>(size));
2425
return 0;
2526
}

cpp/src/parquet/arrow/reader.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@
2828
#include "arrow/buffer.h"
2929
#include "arrow/extension_type.h"
3030
#include "arrow/io/memory.h"
31+
#include "arrow/memory_pool.h"
3132
#include "arrow/record_batch.h"
3233
#include "arrow/table.h"
3334
#include "arrow/type.h"
3435
#include "arrow/type_traits.h"
3536
#include "arrow/util/async_generator.h"
3637
#include "arrow/util/bit_util.h"
3738
#include "arrow/util/future.h"
39+
#include "arrow/util/fuzz_internal.h"
3840
#include "arrow/util/iterator.h"
3941
#include "arrow/util/logging_internal.h"
4042
#include "arrow/util/parallel.h"
@@ -1398,7 +1400,7 @@ namespace internal {
13981400

13991401
namespace {
14001402

1401-
Status FuzzReader(std::unique_ptr<FileReader> reader) {
1403+
Status FuzzReadData(std::unique_ptr<FileReader> reader) {
14021404
auto st = Status::OK();
14031405
for (int i = 0; i < reader->num_row_groups(); ++i) {
14041406
std::shared_ptr<Table> table;
@@ -1418,7 +1420,7 @@ Status FuzzReader(const uint8_t* data, int64_t size) {
14181420

14191421
auto buffer = std::make_shared<::arrow::Buffer>(data, size);
14201422
auto file = std::make_shared<::arrow::io::BufferReader>(buffer);
1421-
auto pool = ::arrow::default_memory_pool();
1423+
auto pool = ::arrow::internal::fuzzing_memory_pool();
14221424

14231425
// Read Parquet file metadata only once, which will reduce iteration time slightly
14241426
std::shared_ptr<FileMetaData> pq_md;
@@ -1440,7 +1442,7 @@ Status FuzzReader(const uint8_t* data, int64_t size) {
14401442

14411443
std::unique_ptr<FileReader> reader;
14421444
RETURN_NOT_OK(FileReader::Make(pool, std::move(pq_file_reader), properties, &reader));
1443-
st &= FuzzReader(std::move(reader));
1445+
st &= FuzzReadData(std::move(reader));
14441446
}
14451447
return st;
14461448
}

0 commit comments

Comments
 (0)