Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class DeserializedBatchIndexedConsensusRequest
private final long startSyncIndex;
private final long endSyncIndex;
private final List<IConsensusRequest> insertNodes;
private long memorySize;

public DeserializedBatchIndexedConsensusRequest(
long startSyncIndex, long endSyncIndex, int size) {
Expand All @@ -52,6 +53,7 @@ public List<IConsensusRequest> getInsertNodes() {

public void add(IConsensusRequest insertNode) {
this.insertNodes.add(insertNode);
this.memorySize += insertNode.getMemorySize();
}

@Override
Expand Down Expand Up @@ -82,4 +84,9 @@ public int hashCode() {
public ByteBuffer serializeToByteBuffer() {
return null;
}

@Override
public long getMemorySize() {
return memorySize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.consensus.iot.logdispatcher.Batch;
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher.LogDispatcherThread;
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
Expand Down Expand Up @@ -93,6 +94,10 @@ public void onComplete(TSyncLogEntriesRes response) {
}
completeBatch(batch);
}
if (response.isSetReceiverMemSize()) {
LogDispatcher.getReceiverMemSizeSum().addAndGet(response.getReceiverMemSize());
LogDispatcher.getSenderMemSizeSum().addAndGet(batch.getMemorySize());
}
logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - createTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,16 @@ public void addTLogEntry(TLogEntry entry) {
}

public boolean canAccumulate() {
// When reading entries from the WAL, the memory size is calculated based on the serialized
// size, which can be significantly smaller than the actual size.
// Thus, we add a multiplier to sender's memory size to estimate the receiver's memory cost.
// The multiplier is calculated based on the receiver's feedback.
long receiverMemSize = LogDispatcher.getReceiverMemSizeSum().get();
long senderMemSize = LogDispatcher.getSenderMemSizeSum().get();
double multiplier = senderMemSize > 0 ? (double) receiverMemSize / senderMemSize : 1.0;
multiplier = Math.max(multiplier, 1.0);
return logEntries.size() < config.getReplication().getMaxLogEntriesNumPerBatch()
&& memorySize < config.getReplication().getMaxSizePerBatch();
&& ((long) (memorySize * multiplier)) < config.getReplication().getMaxSizePerBatch();
}

public long getStartIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public class LogDispatcher {
private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
private final AtomicLong logEntriesFromQueue = new AtomicLong(0);

private static final AtomicLong senderMemSizeSum = new AtomicLong(0);
private static final AtomicLong receiverMemSizeSum = new AtomicLong(0);

public LogDispatcher(
IoTConsensusServerImpl impl,
IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) {
Expand Down Expand Up @@ -591,4 +594,12 @@ private void constructBatchIndexedFromConsensusRequest(
request.getMemorySize()));
}
}

public static AtomicLong getReceiverMemSizeSum() {
return receiverMemSizeSum;
}

public static AtomicLong getSenderMemSizeSum() {
return senderMemSizeSum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public TSyncLogEntriesRes syncLogEntries(TSyncLogEntriesReq req) {
"execute TSyncLogEntriesReq for {} with result {}",
req.consensusGroupId,
writeStatus.subStatus);
return new TSyncLogEntriesRes(writeStatus.subStatus);
return new TSyncLogEntriesRes(writeStatus.subStatus)
.setReceiverMemSize(deserializedRequest.getMemorySize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct TSyncLogEntriesReq {

struct TSyncLogEntriesRes {
1: required list<common.TSStatus> statuses
2: optional i64 receiverMemSize
}

struct TInactivatePeerReq {
Expand Down
Loading