Skip to content

Commit 345efb7

Browse files
authored
Fix memory leak when allocation failure in IoTConsensus queue (#16966)
* Fix memory leak when allocation failure in IoTConsensus queue. * spotless
1 parent b30fd34 commit 345efb7

File tree

2 files changed

+218
-0
lines changed

2 files changed

+218
-0
lines changed

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ private boolean reserve(long size, boolean fromQueue) {
9898
result = queueMemorySizeInByte.addAndGet(size) < maxMemorySizeForQueueInByte;
9999
if (!result) {
100100
queueMemorySizeInByte.addAndGet(-size);
101+
memorySizeInByte.addAndGet(-size);
101102
}
102103
} else {
103104
syncMemorySizeInByte.addAndGet(size);
@@ -172,6 +173,16 @@ long getSyncMemorySizeInByte() {
172173
return syncMemorySizeInByte.get();
173174
}
174175

176+
@TestOnly
177+
public Long getMaxMemorySizeInByte() {
178+
return maxMemorySizeInByte;
179+
}
180+
181+
@TestOnly
182+
public Long getMaxMemorySizeForQueueInByte() {
183+
return maxMemorySizeForQueueInByte;
184+
}
185+
175186
private static final IoTConsensusMemoryManager INSTANCE = new IoTConsensusMemoryManager();
176187

177188
public static IoTConsensusMemoryManager getInstance() {
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.consensus.iot.logdispatcher;
21+
22+
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
23+
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
24+
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
25+
import org.apache.iotdb.consensus.iot.thrift.TLogEntry;
26+
27+
import org.junit.Test;
28+
29+
import java.nio.ByteBuffer;
30+
import java.util.ArrayList;
31+
import java.util.Collections;
32+
import java.util.List;
33+
34+
import static org.junit.Assert.assertEquals;
35+
import static org.junit.Assert.assertFalse;
36+
import static org.junit.Assert.assertTrue;
37+
38+
public class IoTConsensusMemoryManagerTest {
39+
40+
@Test
41+
public void testAllocateQueue() {
42+
IoTConsensusMemoryManager memoryManager = IoTConsensusMemoryManager.getInstance();
43+
long maxMemory = memoryManager.getMaxMemorySizeForQueueInByte();
44+
45+
long occupiedMemory = 0;
46+
IndexedConsensusRequest request;
47+
List<IndexedConsensusRequest> requestList = new ArrayList<>();
48+
while (occupiedMemory <= maxMemory) {
49+
request =
50+
new IndexedConsensusRequest(
51+
0,
52+
Collections.singletonList(
53+
new ByteBufferConsensusRequest(ByteBuffer.wrap(new byte[4 * 1024 * 1024]))));
54+
request.buildSerializedRequests();
55+
long requestSize = request.getMemorySize();
56+
if (occupiedMemory + requestSize < maxMemory) {
57+
boolean reserved = memoryManager.reserve(request);
58+
assertTrue(reserved);
59+
occupiedMemory += requestSize;
60+
assertEquals(occupiedMemory, memoryManager.getQueueMemorySizeInByte());
61+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
62+
requestList.add(request);
63+
} else {
64+
assertFalse(memoryManager.reserve(request));
65+
break;
66+
}
67+
}
68+
assertTrue(memoryManager.getMemorySizeInByte() <= maxMemory);
69+
70+
for (IndexedConsensusRequest indexedConsensusRequest : requestList) {
71+
memoryManager.free(indexedConsensusRequest);
72+
occupiedMemory -= indexedConsensusRequest.getMemorySize();
73+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
74+
assertEquals(occupiedMemory, memoryManager.getQueueMemorySizeInByte());
75+
}
76+
}
77+
78+
@Test
79+
public void testAllocateBatch() {
80+
IoTConsensusMemoryManager memoryManager = IoTConsensusMemoryManager.getInstance();
81+
long maxMemory = memoryManager.getQueueMemorySizeInByte();
82+
83+
long occupiedMemory = 0;
84+
85+
Batch batch;
86+
int batchSize = 5;
87+
List<Batch> batchList = new ArrayList<>();
88+
while (occupiedMemory < maxMemory) {
89+
batch = new Batch(IoTConsensusConfig.newBuilder().build());
90+
for (int i = 0; i < batchSize; i++) {
91+
IndexedConsensusRequest request;
92+
request =
93+
new IndexedConsensusRequest(
94+
0,
95+
Collections.singletonList(
96+
new ByteBufferConsensusRequest(ByteBuffer.wrap(new byte[1024 * 1024]))));
97+
batch.addTLogEntry(
98+
new TLogEntry(
99+
request.getSerializedRequests(),
100+
request.getSearchIndex(),
101+
false,
102+
request.getMemorySize()));
103+
}
104+
105+
long requestSize = batch.getMemorySize();
106+
if (occupiedMemory + requestSize < maxMemory) {
107+
assertTrue(memoryManager.reserve(batch));
108+
occupiedMemory += requestSize;
109+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
110+
batchList.add(batch);
111+
} else {
112+
assertFalse(memoryManager.reserve(batch));
113+
}
114+
}
115+
assertTrue(memoryManager.getMemorySizeInByte() <= maxMemory);
116+
117+
for (Batch b : batchList) {
118+
memoryManager.free(b);
119+
occupiedMemory -= b.getMemorySize();
120+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
121+
}
122+
}
123+
124+
@Test
125+
public void testAllocateMixed() {
126+
IoTConsensusMemoryManager memoryManager = IoTConsensusMemoryManager.getInstance();
127+
long maxMemory = memoryManager.getMaxMemorySizeForQueueInByte();
128+
129+
long occupiedMemory = 0;
130+
IndexedConsensusRequest request;
131+
List<IndexedConsensusRequest> requestList = new ArrayList<>();
132+
Batch batch;
133+
int batchSize = 5;
134+
List<Batch> batchList = new ArrayList<>();
135+
136+
int i = 0;
137+
while (occupiedMemory <= maxMemory) {
138+
if (i % 2 == 0) {
139+
request =
140+
new IndexedConsensusRequest(
141+
0,
142+
Collections.singletonList(
143+
new ByteBufferConsensusRequest(ByteBuffer.wrap(new byte[4 * 1024 * 1024]))));
144+
request.buildSerializedRequests();
145+
long requestSize = request.getMemorySize();
146+
if (occupiedMemory + requestSize < maxMemory) {
147+
boolean reserved = memoryManager.reserve(request);
148+
assertTrue(reserved);
149+
occupiedMemory += requestSize;
150+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
151+
requestList.add(request);
152+
} else {
153+
assertFalse(memoryManager.reserve(request));
154+
break;
155+
}
156+
} else {
157+
batch = new Batch(IoTConsensusConfig.newBuilder().build());
158+
for (int j = 0; j < batchSize; j++) {
159+
IndexedConsensusRequest batchRequest;
160+
batchRequest =
161+
new IndexedConsensusRequest(
162+
0,
163+
Collections.singletonList(
164+
new ByteBufferConsensusRequest(ByteBuffer.wrap(new byte[1024 * 1024]))));
165+
batch.addTLogEntry(
166+
new TLogEntry(
167+
batchRequest.getSerializedRequests(),
168+
batchRequest.getSearchIndex(),
169+
false,
170+
batchRequest.getMemorySize()));
171+
}
172+
173+
long requestSize = batch.getMemorySize();
174+
if (occupiedMemory + requestSize < maxMemory) {
175+
assertTrue(memoryManager.reserve(batch));
176+
occupiedMemory += requestSize;
177+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
178+
batchList.add(batch);
179+
} else {
180+
assertFalse(memoryManager.reserve(batch));
181+
}
182+
}
183+
i++;
184+
}
185+
assertTrue(memoryManager.getMemorySizeInByte() <= maxMemory);
186+
187+
while (!requestList.isEmpty() || !batchList.isEmpty()) {
188+
if (!requestList.isEmpty()) {
189+
request = requestList.remove(0);
190+
memoryManager.free(request);
191+
occupiedMemory -= request.getMemorySize();
192+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
193+
i--;
194+
}
195+
if (!batchList.isEmpty()) {
196+
batch = batchList.remove(0);
197+
memoryManager.free(batch);
198+
occupiedMemory -= batch.getMemorySize();
199+
assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
200+
i--;
201+
}
202+
}
203+
assertEquals(0, i);
204+
assertEquals(0, memoryManager.getMemorySizeInByte());
205+
assertEquals(0, memoryManager.getQueueMemorySizeInByte());
206+
}
207+
}

0 commit comments

Comments
 (0)