Skip to content

Commit f74159f

Browse files
author
xiao.dong
committed
fix arrow memory leak
1 parent d283d38 commit f74159f

File tree

4 files changed

+118
-4
lines changed

4 files changed

+118
-4
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ set(ICEBERG_SOURCES
4545
type.cc
4646
manifest_reader.cc
4747
manifest_reader_internal.cc
48+
arrow_struct_guard.cc
4849
util/murmurhash3_internal.cc
4950
util/timepoint.cc
5051
util/unreachable.cc

src/iceberg/arrow_struct_guard.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/arrow_struct_guard.h"
21+
22+
namespace iceberg::internal {
23+
24+
ArrowArrayGuard::~ArrowArrayGuard() {
25+
if (array_ != nullptr) {
26+
ArrowArrayRelease(array_);
27+
}
28+
}
29+
30+
ArrowSchemaGuard::~ArrowSchemaGuard() {
31+
if (schema_ != nullptr) {
32+
ArrowSchemaRelease(schema_);
33+
}
34+
}
35+
36+
ArrowArrayViewGuard::~ArrowArrayViewGuard() {
37+
if (view_ != nullptr) {
38+
ArrowArrayViewReset(view_);
39+
}
40+
}
41+
42+
ArrowArrayBufferGuard::~ArrowArrayBufferGuard() {
43+
if (buffer_ != nullptr) {
44+
ArrowBufferReset(buffer_);
45+
}
46+
}
47+
48+
} // namespace iceberg::internal

src/iceberg/arrow_struct_guard.h

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <nanoarrow/nanoarrow.h>
23+
24+
#include "iceberg/arrow_c_data.h"
25+
26+
namespace iceberg::internal {
27+
28+
class ArrowArrayGuard {
29+
public:
30+
ArrowArrayGuard(ArrowArray* array) : array_(array) {}
31+
~ArrowArrayGuard();
32+
33+
private:
34+
ArrowArray* array_;
35+
};
36+
37+
class ArrowSchemaGuard {
38+
public:
39+
ArrowSchemaGuard(ArrowSchema* schema) : schema_(schema) {}
40+
~ArrowSchemaGuard();
41+
42+
private:
43+
ArrowSchema* schema_;
44+
};
45+
46+
class ArrowArrayViewGuard {
47+
public:
48+
ArrowArrayViewGuard(ArrowArrayView* view) : view_(view) {}
49+
~ArrowArrayViewGuard();
50+
51+
private:
52+
ArrowArrayView* view_;
53+
};
54+
55+
class ArrowArrayBufferGuard {
56+
public:
57+
ArrowArrayBufferGuard(ArrowBuffer* buffer) : buffer_(buffer) {}
58+
~ArrowArrayBufferGuard();
59+
60+
private:
61+
ArrowBuffer* buffer_;
62+
};
63+
64+
} // namespace iceberg::internal

src/iceberg/manifest_reader_internal.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#include <nanoarrow/nanoarrow.h>
2525

26+
#include "iceberg/arrow_struct_guard.h"
2627
#include "iceberg/manifest_entry.h"
2728
#include "iceberg/manifest_list.h"
2829
#include "iceberg/schema.h"
@@ -51,6 +52,7 @@ Result<std::vector<std::unique_ptr<ManifestFile>>> ParseManifestListEntry(
5152
ArrowArrayView array_view;
5253
auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
5354
ARROW_RETURN_IF_NOT_OK(status, error);
55+
internal::ArrowArrayViewGuard view_guard(&array_view);
5456
status = ArrowArrayViewSetArray(&array_view, array_in, &error);
5557
ARROW_RETURN_IF_NOT_OK(status, error);
5658
status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error);
@@ -65,8 +67,6 @@ Result<std::vector<std::unique_ptr<ManifestFile>>> ParseManifestListEntry(
6567
for (int64_t idx = 0; idx < array_in->n_children; idx++) {
6668
const auto& field = iceberg_schema.GetFieldByIndex(idx);
6769
if (!field.has_value()) {
68-
ArrowArrayRelease(array_in);
69-
ArrowArrayViewReset(&array_view);
7070
return InvalidArgument("Field not found in schema: {}", idx);
7171
}
7272
auto field_name = field.value().get().name();
@@ -197,8 +197,6 @@ Result<std::vector<std::unique_ptr<ManifestFile>>> ParseManifestListEntry(
197197
}
198198
}
199199
#undef PARSE_PRIMITIVE_FIELD
200-
ArrowArrayRelease(array_in);
201-
ArrowArrayViewReset(&array_view);
202200
return manifest_files;
203201
} // namespace iceberg
204202

@@ -213,6 +211,7 @@ Result<std::vector<std::unique_ptr<ManifestFile>>> ManifestListReaderImpl::Files
213211
return InvalidArgument("Get schema failed in reader:{}",
214212
arrow_schema.error().message);
215213
}
214+
internal::ArrowSchemaGuard schema_guard(&arrow_schema.value());
216215
auto schema = FromArrowSchema(arrow_schema.value(), std::nullopt);
217216
if (!schema.has_value()) {
218217
return InvalidArgument("Parse iceberg schema failed:{}", schema.error().message);
@@ -224,6 +223,7 @@ Result<std::vector<std::unique_ptr<ManifestFile>>> ManifestListReaderImpl::Files
224223
result.error().message);
225224
}
226225
if (result.value().has_value()) {
226+
internal::ArrowArrayGuard array_guard(&result.value().value());
227227
auto parse_result = ParseManifestListEntry(
228228
&arrow_schema.value(), &result.value().value(), *schema.value());
229229
if (!parse_result.has_value()) {
@@ -234,6 +234,7 @@ Result<std::vector<std::unique_ptr<ManifestFile>>> ManifestListReaderImpl::Files
234234
std::make_move_iterator(parse_result.value().begin()),
235235
std::make_move_iterator(parse_result.value().end()));
236236
} else {
237+
// eof
237238
break;
238239
}
239240
}

0 commit comments

Comments
 (0)