Skip to content

Commit 9ec7152

Browse files
[client] Fix memory leak when OOM occurs while decompressing data in VectorLoader. (#2647)
1 parent 4e54416 commit 9ec7152

File tree

3 files changed

+419
-3
lines changed

3 files changed

+419
-3
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.fluss.record;
21+
22+
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf;
23+
import org.apache.fluss.shaded.arrow.org.apache.arrow.util.Collections2;
24+
import org.apache.fluss.shaded.arrow.org.apache.arrow.util.Preconditions;
25+
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
26+
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.TypeLayout;
27+
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
28+
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionCodec;
29+
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionUtil;
30+
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionUtil.CodecType;
31+
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.NoCompressionCodec;
32+
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.ipc.message.ArrowFieldNode;
33+
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
34+
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.types.pojo.Field;
35+
36+
import java.util.ArrayList;
37+
import java.util.Iterator;
38+
import java.util.List;
39+
40+
/**
41+
* A patched version of Arrow's {@code VectorLoader} that ensures decompressed buffers are properly
42+
* released when an error (e.g. OOM) occurs during {@link #load(ArrowRecordBatch)}.
43+
*
44+
* <p>In the original Arrow implementation, the decompression loop runs <b>outside</b> the
45+
* try-finally block that guards {@code loadFieldBuffers}. This means if decompression succeeds for
46+
* the first N buffers of a field but fails on the (N+1)-th buffer, the already-decompressed buffers
47+
* in {@code ownBuffers} are never closed, leaking Direct Memory.
48+
*
49+
* <p>This workaround moves the decompression loop <b>inside</b> the try block so that the finally
50+
* clause always closes every buffer in {@code ownBuffers}, regardless of whether the load succeeds
51+
* or fails:
52+
*
53+
* <ul>
54+
* <li><b>Success path:</b> {@code loadFieldBuffers} retains each buffer (ref count +1), then the
55+
* finally close decrements it back (ref count -1). The field vector still holds the buffer.
56+
* <li><b>Error path:</b> The finally close decrements each already-decompressed buffer's ref
57+
* count to 0, immediately freeing the Direct Memory.
58+
* </ul>
59+
*
60+
* <p>TODO: This class should be removed once Apache Arrow fixes the buffer leak in their {@code
61+
* VectorLoader.loadBuffers()} method. See:
62+
*
63+
* <ul>
64+
* <li>Apache Arrow issue: <a
65+
* href="https://github.com/apache/arrow-java/issues/1037">arrow-java#1037</a>
66+
* <li>Fluss issue: <a href="https://github.com/apache/fluss/issues/2646">FLUSS-2646</a>
67+
* </ul>
68+
*/
69+
public class FlussVectorLoader {
70+
private final VectorSchemaRoot root;
71+
private final CompressionCodec.Factory factory;
72+
private boolean decompressionNeeded;
73+
74+
public FlussVectorLoader(VectorSchemaRoot root, CompressionCodec.Factory factory) {
75+
this.root = root;
76+
this.factory = factory;
77+
}
78+
79+
public void load(ArrowRecordBatch recordBatch) {
80+
Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator();
81+
Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator();
82+
CompressionUtil.CodecType codecType =
83+
CodecType.fromCompressionType(recordBatch.getBodyCompression().getCodec());
84+
this.decompressionNeeded = codecType != CodecType.NO_COMPRESSION;
85+
CompressionCodec codec =
86+
this.decompressionNeeded
87+
? this.factory.createCodec(codecType)
88+
: NoCompressionCodec.INSTANCE;
89+
90+
for (FieldVector fieldVector : this.root.getFieldVectors()) {
91+
this.loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec);
92+
}
93+
94+
this.root.setRowCount(recordBatch.getLength());
95+
if (nodes.hasNext() || buffers.hasNext()) {
96+
throw new IllegalArgumentException(
97+
"not all nodes and buffers were consumed. nodes: "
98+
+ Collections2.toString(nodes)
99+
+ " buffers: "
100+
+ Collections2.toString(buffers));
101+
}
102+
}
103+
104+
private void loadBuffers(
105+
FieldVector vector,
106+
Field field,
107+
Iterator<ArrowBuf> buffers,
108+
Iterator<ArrowFieldNode> nodes,
109+
CompressionCodec codec) {
110+
Preconditions.checkArgument(
111+
nodes.hasNext(), "no more field nodes for field %s and vector %s", field, vector);
112+
ArrowFieldNode fieldNode = nodes.next();
113+
int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType());
114+
List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount);
115+
116+
try {
117+
for (int j = 0; j < bufferLayoutCount; ++j) {
118+
ArrowBuf nextBuf = buffers.next();
119+
ArrowBuf bufferToAdd =
120+
nextBuf.writerIndex() > 0L
121+
? codec.decompress(vector.getAllocator(), nextBuf)
122+
: nextBuf;
123+
ownBuffers.add(bufferToAdd);
124+
if (this.decompressionNeeded) {
125+
nextBuf.getReferenceManager().retain();
126+
}
127+
}
128+
vector.loadFieldBuffers(fieldNode, ownBuffers);
129+
} catch (RuntimeException e) {
130+
throw new IllegalArgumentException(
131+
"Could not load buffers for field "
132+
+ field
133+
+ ". error message: "
134+
+ e.getMessage(),
135+
e);
136+
} finally {
137+
if (this.decompressionNeeded) {
138+
for (ArrowBuf buf : ownBuffers) {
139+
buf.close();
140+
}
141+
}
142+
}
143+
144+
List<Field> children = field.getChildren();
145+
if (!children.isEmpty()) {
146+
List<FieldVector> childrenFromFields = vector.getChildrenFromFields();
147+
Preconditions.checkArgument(
148+
children.size() == childrenFromFields.size(),
149+
"should have as many children as in the schema: found %s expected %s",
150+
childrenFromFields.size(),
151+
children.size());
152+
153+
for (int i = 0; i < childrenFromFields.size(); ++i) {
154+
Field child = children.get(i);
155+
FieldVector fieldVector = childrenFromFields.get(i);
156+
this.loadBuffers(fieldVector, child, buffers, nodes, codec);
157+
}
158+
}
159+
}
160+
}

fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.compression.ArrowCompressionFactory;
2222
import org.apache.fluss.exception.FlussRuntimeException;
2323
import org.apache.fluss.memory.MemorySegment;
24+
import org.apache.fluss.record.FlussVectorLoader;
2425
import org.apache.fluss.row.arrow.ArrowReader;
2526
import org.apache.fluss.row.arrow.vectors.ArrowArrayColumnVector;
2627
import org.apache.fluss.row.arrow.vectors.ArrowBigIntColumnVector;
@@ -86,7 +87,6 @@
8687
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.ValueVector;
8788
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VarBinaryVector;
8889
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VarCharVector;
89-
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorLoader;
9090
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
9191
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector;
9292
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.MapVector;
@@ -172,8 +172,8 @@ public static ArrowReader createArrowReader(
172172
try (ReadChannel channel =
173173
new ReadChannel(new ByteBufferReadableChannel(arrowBatchBuffer));
174174
ArrowRecordBatch batch = deserializeRecordBatch(channel, allocator)) {
175-
VectorLoader vectorLoader =
176-
new VectorLoader(schemaRoot, ArrowCompressionFactory.INSTANCE);
175+
FlussVectorLoader vectorLoader =
176+
new FlussVectorLoader(schemaRoot, ArrowCompressionFactory.INSTANCE);
177177
vectorLoader.load(batch);
178178
List<ColumnVector> columnVectors = new ArrayList<>();
179179
List<FieldVector> fieldVectors = schemaRoot.getFieldVectors();

0 commit comments

Comments
 (0)