Skip to content

Commit e5f8a19

Browse files
authored
Add a multiplier to avoid receiver OOM in IoTConsensus (#16102)
1 parent 9d513a7 commit e5f8a19

File tree

6 files changed

+35
-2
lines changed

6 files changed

+35
-2
lines changed

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ public class DeserializedBatchIndexedConsensusRequest
2929
private final long startSyncIndex;
3030
private final long endSyncIndex;
3131
private final List<IConsensusRequest> insertNodes;
32+
private long memorySize;
3233

3334
public DeserializedBatchIndexedConsensusRequest(
3435
long startSyncIndex, long endSyncIndex, int size) {
@@ -52,6 +53,7 @@ public List<IConsensusRequest> getInsertNodes() {
5253

5354
/**
 * Appends a request to this batch and adds its reported memory size to the running total.
 *
 * @param insertNode the request to append; its {@code getMemorySize()} value is added to
 *     {@code memorySize}
 */
public void add(IConsensusRequest insertNode) {
5455
this.insertNodes.add(insertNode);
56+
this.memorySize += insertNode.getMemorySize();
5557
}
5658

5759
@Override
@@ -82,4 +84,9 @@ public int hashCode() {
8284
public ByteBuffer serializeToByteBuffer() {
8385
// Always yields null: no serialized form is produced here.
// NOTE(review): presumably this request type is never serialized in this form — confirm that
// no caller dereferences the result.
return null;
8486
}
87+
88+
/**
 * Returns the memory size accumulated so far, i.e. the running total that
 * {@link #add(IConsensusRequest)} increases by each added request's {@code getMemorySize()}.
 */
@Override
89+
public long getMemorySize() {
90+
return memorySize;
91+
}
8592
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2323
import org.apache.iotdb.commons.utils.RetryUtils;
2424
import org.apache.iotdb.consensus.iot.logdispatcher.Batch;
25+
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
2526
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher.LogDispatcherThread;
2627
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
2728
import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
@@ -93,6 +94,10 @@ public void onComplete(TSyncLogEntriesRes response) {
9394
}
9495
completeBatch(batch);
9596
}
97+
if (response.isSetReceiverMemSize()) {
98+
LogDispatcher.getReceiverMemSizeSum().addAndGet(response.getReceiverMemSize());
99+
LogDispatcher.getSenderMemSizeSum().addAndGet(batch.getMemorySize());
100+
}
96101
logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - createTime);
97102
}
98103

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -63,8 +63,16 @@ public void addTLogEntry(TLogEntry entry) {
6363
}
6464

6565
/**
 * Reports whether another log entry may still be appended to this batch.
 *
 * <p>The batch accepts more entries while its entry count is below the configured maximum number
 * of entries per batch and its memory size, scaled by the receiver/sender ratio computed below
 * (never less than 1.0), is below the configured maximum batch size.
 *
 * @return {@code true} if both limits still hold, {@code false} otherwise
 */
public boolean canAccumulate() {
66+
// When reading entries from the WAL, the memory size is calculated based on the serialized
67+
// size, which can be significantly smaller than the actual size.
68+
// Thus, we apply a multiplier to the sender's memory size to estimate the receiver's memory
// cost.
69+
// The multiplier is calculated based on the receiver's feedback.
70+
long receiverMemSize = LogDispatcher.getReceiverMemSizeSum().get();
71+
long senderMemSize = LogDispatcher.getSenderMemSizeSum().get();
72+
// With no sender total recorded yet, fall back to a neutral multiplier of 1.0 (this also
// avoids dividing by zero).
double multiplier = senderMemSize > 0 ? (double) receiverMemSize / senderMemSize : 1.0;
73+
// Never scale the size down, even if the receiver total is smaller than the sender total.
multiplier = Math.max(multiplier, 1.0);
6674
return logEntries.size() < config.getReplication().getMaxLogEntriesNumPerBatch()
67-
&& memorySize < config.getReplication().getMaxSizePerBatch();
75+
&& ((long) (memorySize * multiplier)) < config.getReplication().getMaxSizePerBatch();
6876
}
6977

7078
public long getStartIndex() {

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -69,6 +69,9 @@ public class LogDispatcher {
6969
private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
7070
private final AtomicLong logEntriesFromQueue = new AtomicLong(0);
7171

72+
// Running totals used to estimate how much larger a batch is on the receiver than on the sender.
// Both are static, so they are shared by all LogDispatcher instances rather than kept per
// dispatcher, and both start at zero.
private static final AtomicLong senderMemSizeSum = new AtomicLong(0);
73+
private static final AtomicLong receiverMemSizeSum = new AtomicLong(0);
74+
7275
public LogDispatcher(
7376
IoTConsensusServerImpl impl,
7477
IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) {
@@ -591,4 +594,12 @@ private void constructBatchIndexedFromConsensusRequest(
591594
request.getMemorySize()));
592595
}
593596
}
597+
598+
/**
 * Returns the static counter that accumulates receiver-side memory sizes.
 *
 * <p>The returned {@link AtomicLong} is the live counter itself, not a snapshot, so callers can
 * both read it and add to it.
 */
public static AtomicLong getReceiverMemSizeSum() {
599+
return receiverMemSizeSum;
600+
}
601+
602+
/**
 * Returns the static counter that accumulates sender-side memory sizes.
 *
 * <p>The returned {@link AtomicLong} is the live counter itself, not a snapshot, so callers can
 * both read it and add to it.
 */
public static AtomicLong getSenderMemSizeSum() {
603+
return senderMemSizeSum;
604+
}
594605
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -129,7 +129,8 @@ public TSyncLogEntriesRes syncLogEntries(TSyncLogEntriesReq req) {
129129
"execute TSyncLogEntriesReq for {} with result {}",
130130
req.consensusGroupId,
131131
writeStatus.subStatus);
132-
return new TSyncLogEntriesRes(writeStatus.subStatus);
132+
return new TSyncLogEntriesRes(writeStatus.subStatus)
133+
.setReceiverMemSize(deserializedRequest.getMemorySize());
133134
}
134135

135136
@Override

iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ struct TSyncLogEntriesReq {
3838

3939
// Response to a log-entry sync request.
struct TSyncLogEntriesRes {
4040
1: required list<common.TSStatus> statuses
41+
// Memory size of the deserialized request as computed on the receiver. Optional: the sender
// checks whether the field is set before using it.
2: optional i64 receiverMemSize
4142
}
4243

4344
struct TInactivatePeerReq {

0 commit comments

Comments
 (0)