Skip to content

Commit e262890

Browse files
authored
[ISSUE #9776] Make SharedByteBuffer size configurable via MessageStoreConfig.maxMessageSize (#9775)
* Make SharedByteBuffer size configurable via MessageStoreConfig.maxMessageSize Change-Id: Ie3c291ba10b84963fb3ba0af90afa323d9b955ff * Fix checkstyle Change-Id: I75f9f767e30f33fc2ea4ceafd59b9d950875c765 * Fix UTs Change-Id: I57b3c904d37558e4301394fc1dd4188b0866718b * Fix UTs Change-Id: I87775116926d3f5271eb13f0a86c0a40446ae432 * Fix bugs Change-Id: Ib76596b91621b59d1e189642d081d259880f9ac8 * Fix comments Change-Id: I7bdd3b9f24172afe77a7023b9aa4109dc271c27b * refactor: make SharedByteBufferManager buffer count configurable Change-Id: Ia97908f7e96f23542e8acf1a2cc6c1407d3d8e87
1 parent 2eb2129 commit e262890

File tree

8 files changed

+204
-69
lines changed

8 files changed

+204
-69
lines changed

store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
import org.apache.rocketmq.store.kv.CompactionService;
104104
import org.apache.rocketmq.store.kv.CompactionStore;
105105
import org.apache.rocketmq.store.logfile.MappedFile;
106+
import org.apache.rocketmq.store.logfile.SharedByteBufferManager;
106107
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
107108
import org.apache.rocketmq.store.queue.CombineConsumeQueueStore;
108109
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
@@ -240,6 +241,10 @@ public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final Br
240241

241242
this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog());
242243

244+
if (messageStoreConfig.isWriteWithoutMmap()) {
245+
SharedByteBufferManager.getInstance().init(messageStoreConfig.getMaxMessageSize(), messageStoreConfig.getSharedByteBufferNum());
246+
}
247+
243248
this.defaultStoreMetricsManager = new DefaultStoreMetricsManager();
244249

245250
this.scheduledExecutorService =

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,9 @@ public class MessageStoreConfig {
488488
*/
489489
private boolean enableAcceleratedRecovery = false;
490490

491+
// Shared byte buffer manager configuration
492+
private int sharedByteBufferNum = 16;
493+
491494
public String getRocksdbCompressionType() {
492495
return rocksdbCompressionType;
493496
}
@@ -2060,4 +2063,12 @@ public boolean isEnableRunningFlagsInFlush() {
20602063
public void setEnableRunningFlagsInFlush(boolean enableRunningFlagsInFlush) {
20612064
this.enableRunningFlagsInFlush = enableRunningFlagsInFlush;
20622065
}
2066+
2067+
public int getSharedByteBufferNum() {
2068+
return sharedByteBufferNum;
2069+
}
2070+
2071+
public void setSharedByteBufferNum(int sharedByteBufferNum) {
2072+
this.sharedByteBufferNum = sharedByteBufferNum;
2073+
}
20632074
}

store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java

Lines changed: 48 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,9 @@
3333
import java.nio.file.Paths;
3434
import java.nio.file.StandardCopyOption;
3535
import java.util.Iterator;
36-
import java.util.concurrent.ThreadLocalRandom;
3736
import java.util.concurrent.atomic.AtomicInteger;
3837
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3938
import java.util.concurrent.atomic.AtomicLong;
40-
import java.util.concurrent.locks.ReentrantLock;
4139
import java.util.function.Consumer;
4240
import org.apache.commons.lang3.SystemUtils;
4341
import org.apache.rocketmq.common.UtilAll;
@@ -116,29 +114,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
116114
*/
117115
private long stopTimestamp = -1;
118116

119-
private static int maxSharedNum = 16;
120-
private static final SharedByteBuffer[] SHARED_BYTE_BUFFER;
121117

122-
protected RunningFlags runningFlags;
123-
124-
static class SharedByteBuffer {
125-
private final ReentrantLock lock;
126-
private final ByteBuffer buffer;
127118

128-
public SharedByteBuffer(int size) {
129-
this.lock = new ReentrantLock();
130-
this.buffer = ByteBuffer.allocateDirect(size);
131-
}
119+
protected RunningFlags runningFlags;
132120

133-
public void release() {
134-
this.lock.unlock();
135-
}
136121

137-
public ByteBuffer acquire() {
138-
this.lock.lock();
139-
return buffer;
140-
}
141-
}
142122

143123
static {
144124
WROTE_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "wrotePosition");
@@ -156,18 +136,9 @@ public ByteBuffer acquire() {
156136
}
157137
}
158138
IS_LOADED_METHOD = isLoaded0method;
159-
160-
SHARED_BYTE_BUFFER = new SharedByteBuffer[maxSharedNum];
161-
for (int i = 0; i < maxSharedNum; i++) {
162-
SHARED_BYTE_BUFFER[i] = new SharedByteBuffer(4 * 1024 * 1024);
163-
}
164139
}
165140

166-
private static SharedByteBuffer borrowSharedByteBuffer() {
167-
int idx = ThreadLocalRandom.current().nextInt(maxSharedNum);
168-
SharedByteBuffer buffer = SHARED_BYTE_BUFFER[idx];
169-
return buffer;
170-
}
141+
171142

172143
public DefaultMappedFile() {
173144
}
@@ -324,10 +295,10 @@ public AppendMessageResult appendMessage(final ByteBuffer byteBufferMsg, final C
324295
long fileFromOffset = this.getFileFromOffset();
325296

326297
if (currentPos < this.fileSize) {
327-
SharedByteBuffer sharedByteBuffer = null;
298+
SharedByteBufferManager.SharedByteBuffer sharedByteBuffer = null;
328299
ByteBuffer byteBuffer;
329300
if (writeWithoutMmap) {
330-
sharedByteBuffer = borrowSharedByteBuffer();
301+
sharedByteBuffer = SharedByteBufferManager.getInstance().borrowSharedByteBuffer();
331302
byteBuffer = sharedByteBuffer.acquire();
332303
byteBuffer.position(0).limit(byteBuffer.capacity());
333304
fileFromOffset += currentPos;
@@ -336,24 +307,28 @@ public AppendMessageResult appendMessage(final ByteBuffer byteBufferMsg, final C
336307
byteBuffer.position(currentPos);
337308
}
338309

339-
AppendMessageResult result = cb.doAppend(byteBuffer, fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
310+
try {
311+
AppendMessageResult result = cb.doAppend(byteBuffer, fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
312+
313+
if (sharedByteBuffer != null) {
314+
try {
315+
this.fileChannel.position(currentPos);
316+
byteBuffer.position(0).limit(result.getWroteBytes());
317+
this.fileChannel.write(byteBuffer);
318+
} catch (Throwable t) {
319+
log.error("Failed to write to mappedFile {}", this.fileName, t);
320+
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
321+
}
322+
}
340323

341-
if (sharedByteBuffer != null) {
342-
try {
343-
this.fileChannel.position(currentPos);
344-
byteBuffer.position(0).limit(result.getWroteBytes());
345-
this.fileChannel.write(byteBuffer);
346-
} catch (Throwable t) {
347-
log.error("Failed to write to mappedFile {}", this.fileName, t);
348-
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
349-
} finally {
324+
WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
325+
this.storeTimestamp = result.getStoreTimestamp();
326+
return result;
327+
} finally {
328+
if (sharedByteBuffer != null) {
350329
sharedByteBuffer.release();
351330
}
352331
}
353-
354-
WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
355-
this.storeTimestamp = result.getStoreTimestamp();
356-
return result;
357332
}
358333
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
359334
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
@@ -380,10 +355,10 @@ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, fina
380355
long fileFromOffset = this.getFileFromOffset();
381356

382357
if (currentPos < this.fileSize) {
383-
SharedByteBuffer sharedByteBuffer = null;
358+
SharedByteBufferManager.SharedByteBuffer sharedByteBuffer = null;
384359
ByteBuffer byteBuffer;
385360
if (writeWithoutMmap) {
386-
sharedByteBuffer = borrowSharedByteBuffer();
361+
sharedByteBuffer = SharedByteBufferManager.getInstance().borrowSharedByteBuffer();
387362
byteBuffer = sharedByteBuffer.acquire();
388363
byteBuffer.position(0).limit(byteBuffer.capacity());
389364
fileFromOffset += currentPos;
@@ -393,27 +368,31 @@ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, fina
393368
}
394369

395370
AppendMessageResult result;
396-
if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
397-
// traditional batch message
398-
result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos,
399-
(MessageExtBatch) messageExt, putMessageContext);
400-
} else if (messageExt instanceof MessageExtBrokerInner) {
401-
// traditional single message or newly introduced inner-batch message
402-
result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos,
403-
(MessageExtBrokerInner) messageExt, putMessageContext);
404-
} else {
405-
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
406-
}
407-
408-
if (sharedByteBuffer != null) {
409-
try {
410-
this.fileChannel.position(currentPos);
411-
byteBuffer.position(0).limit(result.getWroteBytes());
412-
this.fileChannel.write(byteBuffer);
413-
} catch (Throwable t) {
414-
log.error("Failed to write to mappedFile {}", this.fileName, t);
371+
try {
372+
if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
373+
// traditional batch message
374+
result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos,
375+
(MessageExtBatch) messageExt, putMessageContext);
376+
} else if (messageExt instanceof MessageExtBrokerInner) {
377+
// traditional single message or newly introduced inner-batch message
378+
result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos,
379+
(MessageExtBrokerInner) messageExt, putMessageContext);
380+
} else {
415381
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
416-
} finally {
382+
}
383+
384+
if (sharedByteBuffer != null) {
385+
try {
386+
this.fileChannel.position(currentPos);
387+
byteBuffer.position(0).limit(result.getWroteBytes());
388+
this.fileChannel.write(byteBuffer);
389+
} catch (Throwable t) {
390+
log.error("Failed to write to mappedFile {}", this.fileName, t);
391+
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
392+
}
393+
}
394+
} finally {
395+
if (sharedByteBuffer != null) {
417396
sharedByteBuffer.release();
418397
}
419398
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.store.logfile;
18+
19+
import java.nio.ByteBuffer;
20+
import java.util.concurrent.ThreadLocalRandom;
21+
import java.util.concurrent.locks.ReentrantLock;
22+
23+
/**
24+
* Shared byte buffer manager for managing some shared ByteBuffers Buffer size is set based on MessageStoreConfig's
25+
* maxMessageSize
26+
*/
27+
public class SharedByteBufferManager {
28+
29+
private static volatile SharedByteBufferManager instance;
30+
private static final Object LOCK = new Object();
31+
32+
private SharedByteBuffer[] sharedByteBuffers;
33+
private int bufferSize;
34+
private int maxSharedNum;
35+
private volatile boolean initialized = false;
36+
37+
private SharedByteBufferManager() {
38+
// Private constructor
39+
}
40+
41+
/**
42+
* Get singleton instance
43+
*/
44+
public static SharedByteBufferManager getInstance() {
45+
if (instance == null) {
46+
synchronized (LOCK) {
47+
if (instance == null) {
48+
instance = new SharedByteBufferManager();
49+
}
50+
}
51+
}
52+
return instance;
53+
}
54+
55+
/**
56+
* Initialize shared buffers with specified messageSize size and shared buffer number
57+
*
58+
* @param maxMessageSize max messageSize size
59+
* @param sharedBufferNum number of shared buffers
60+
*/
61+
public synchronized void init(int maxMessageSize, int sharedBufferNum) {
62+
if (!initialized) {
63+
//Reserve 64kb for encoding buffer outside body
64+
bufferSize = Integer.MAX_VALUE - maxMessageSize >= 64 * 1024 ?
65+
maxMessageSize + 64 * 1024 : Integer.MAX_VALUE;
66+
67+
this.maxSharedNum = sharedBufferNum;
68+
this.sharedByteBuffers = new SharedByteBuffer[maxSharedNum];
69+
for (int i = 0; i < maxSharedNum; i++) {
70+
this.sharedByteBuffers[i] = new SharedByteBuffer(bufferSize);
71+
}
72+
this.initialized = true;
73+
}
74+
}
75+
76+
/**
77+
* Borrow a shared buffer
78+
*
79+
* @return Shared buffer
80+
*/
81+
public SharedByteBuffer borrowSharedByteBuffer() {
82+
if (!initialized) {
83+
throw new IllegalStateException("SharedByteBufferManager not initialized");
84+
}
85+
int idx = ThreadLocalRandom.current().nextInt(maxSharedNum);
86+
return sharedByteBuffers[idx];
87+
}
88+
89+
/**
90+
* Get current buffer size
91+
*
92+
* @return Buffer size
93+
*/
94+
public int getBufferSize() {
95+
return bufferSize;
96+
}
97+
98+
/**
99+
* Check if initialized
100+
*
101+
* @return Whether initialized
102+
*/
103+
public boolean isInitialized() {
104+
return initialized;
105+
}
106+
107+
/**
108+
* Shared byte buffer class
109+
*/
110+
public static class SharedByteBuffer {
111+
private final ReentrantLock lock;
112+
private final ByteBuffer buffer;
113+
114+
public SharedByteBuffer(int size) {
115+
this.lock = new ReentrantLock();
116+
this.buffer = ByteBuffer.allocateDirect(size);
117+
}
118+
119+
public void release() {
120+
this.lock.unlock();
121+
}
122+
123+
public ByteBuffer acquire() {
124+
this.lock.lock();
125+
return buffer;
126+
}
127+
}
128+
}

store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ public void setUp() throws Exception {
4040
storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis();
4141
fileName = storePath + File.separator + "00000000000000000000";
4242
UtilAll.ensureDirOK(storePath);
43+
44+
// Initialize SharedByteBufferManager for tests
45+
SharedByteBufferManager.getInstance().init(4 * 1024 * 1024, 16); // 4MB default, 16 shared buffers
4346
}
4447

4548
@After

store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ public void setUp() throws Exception {
4343
storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis();
4444
fileName = storePath + File.separator + "00000000000000000000";
4545
UtilAll.ensureDirOK(storePath);
46+
47+
// Initialize SharedByteBufferManager for tests
48+
SharedByteBufferManager.getInstance().init(4 * 1024 * 1024, 16); // 4MB default, 16 shared buffers
4649
}
4750

4851
@After

store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public void setUp() throws Exception {
3838
storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis();
3939
fileName = storePath + File.separator + "00000000000000000000";
4040
UtilAll.ensureDirOK(storePath);
41+
42+
// Initialize SharedByteBufferManager for tests
43+
SharedByteBufferManager.getInstance().init(4 * 1024 * 1024, 16); // 4MB default, 16 shared buffers
4144
}
4245

4346
@After

store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ public void setUp() throws Exception {
3737
storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis();
3838
fileName = storePath + File.separator + "00000000000000000000";
3939
UtilAll.ensureDirOK(storePath);
40+
41+
// Initialize SharedByteBufferManager for tests
42+
SharedByteBufferManager.getInstance().init(4 * 1024 * 1024, 16); // 4MB default, 16 shared buffers
4043
}
4144

4245
@After

0 commit comments

Comments
 (0)