Skip to content

Commit ba027d0

Browse files
authored
Pipe: Changed the default value about batch delay configuration (#15657)
* Fix * Fix * Refactor batch * Update CommonConfig.java * Update PipeConnectorSubtask.java * Update CommonConfig.java * Update CommonConfig.java
1 parent 313a91e commit ba027d0

File tree

7 files changed

+49
-60
lines changed

7 files changed

+49
-60
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
2323
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
2424
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractConnectorSubtask;
25-
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2625
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
2726
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2827
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
@@ -64,9 +63,6 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
6463
// when no event can be pulled.
6564
public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
6665
new PipeHeartbeatEvent("cron", false);
67-
private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS =
68-
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds() * 1000;
69-
private long lastHeartbeatEventInjectTime = System.currentTimeMillis();
7066

7167
public PipeConnectorSubtask(
7268
final String taskID,
@@ -105,12 +101,8 @@ protected boolean executeOnce() {
105101
}
106102

107103
try {
108-
if (System.currentTimeMillis() - lastHeartbeatEventInjectTime
109-
> CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS) {
110-
transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
111-
}
112-
113104
if (Objects.isNull(event)) {
105+
transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
114106
return false;
115107
}
116108

@@ -187,8 +179,6 @@ private void transferHeartbeatEvent(final PipeHeartbeatEvent event) {
187179
e);
188180
}
189181

190-
lastHeartbeatEventInjectTime = System.currentTimeMillis();
191-
192182
event.onTransferred();
193183
PipeDataRegionConnectorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
194184
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@
4444
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
4545
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
4646
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
47+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE;
4748
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
4849
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_SECONDS_KEY;
4950
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
50-
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
5151
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
5252
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE;
5353
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE;
@@ -93,15 +93,14 @@ public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
9393
final Integer requestMaxDelayInMillis =
9494
parameters.getIntByKeys(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, SINK_IOTDB_BATCH_DELAY_MS_KEY);
9595
if (Objects.isNull(requestMaxDelayInMillis)) {
96-
final int requestMaxDelayInSeconds =
96+
final int requestMaxDelayConfig =
9797
parameters.getIntOrDefault(
9898
Arrays.asList(
9999
CONNECTOR_IOTDB_BATCH_DELAY_SECONDS_KEY, SINK_IOTDB_BATCH_DELAY_SECONDS_KEY),
100100
usingTsFileBatch
101-
? CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE
102-
: CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE);
103-
requestMaxDelayInMs =
104-
requestMaxDelayInSeconds < 0 ? Integer.MAX_VALUE : requestMaxDelayInSeconds * 1000;
101+
? CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE * 1000
102+
: CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE);
103+
requestMaxDelayInMs = requestMaxDelayConfig < 0 ? Integer.MAX_VALUE : requestMaxDelayConfig;
105104
} else {
106105
requestMaxDelayInMs =
107106
requestMaxDelayInMillis < 0 ? Integer.MAX_VALUE : requestMaxDelayInMillis;
@@ -123,20 +122,18 @@ public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
123122
* duplicated.
124123
*
125124
* @param event the given {@link Event}
126-
* @return {@link Pair}<{@link TEndPoint}, {@link PipeTabletEventPlainBatch}> not null means this
127-
* {@link PipeTabletEventPlainBatch} can be transferred. the first element is the leader
128-
* endpoint to transfer to (might be null), the second element is the batch to be transferred.
129125
*/
130-
public synchronized Pair<TEndPoint, PipeTabletEventBatch> onEvent(
131-
final TabletInsertionEvent event) throws IOException, WALPipeException {
126+
public synchronized void onEvent(final TabletInsertionEvent event)
127+
throws IOException, WALPipeException {
132128
if (!(event instanceof EnrichedEvent)) {
133129
LOGGER.warn(
134130
"Unsupported event {} type {} when building transfer request", event, event.getClass());
135-
return null;
131+
return;
136132
}
137133

138134
if (!useLeaderCache) {
139-
return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) : null;
135+
defaultBatch.onEvent(event);
136+
return;
140137
}
141138

142139
String deviceId = null;
@@ -147,35 +144,38 @@ public synchronized Pair<TEndPoint, PipeTabletEventBatch> onEvent(
147144
}
148145

149146
if (Objects.isNull(deviceId)) {
150-
return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) : null;
147+
defaultBatch.onEvent(event);
148+
return;
151149
}
152150

153151
final TEndPoint endPoint =
154152
IoTDBDataNodeCacheLeaderClientManager.LEADER_CACHE_MANAGER.getLeaderEndPoint(deviceId);
155153
if (Objects.isNull(endPoint)) {
156-
return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) : null;
154+
defaultBatch.onEvent(event);
155+
return;
157156
}
158-
159-
final PipeTabletEventPlainBatch batch =
160-
endPointToBatch.computeIfAbsent(
157+
endPointToBatch
158+
.computeIfAbsent(
161159
endPoint,
162-
k -> new PipeTabletEventPlainBatch(requestMaxDelayInMs, requestMaxBatchSizeInBytes));
163-
return batch.onEvent(event) ? new Pair<>(endPoint, batch) : null;
160+
k -> new PipeTabletEventPlainBatch(requestMaxDelayInMs, requestMaxBatchSizeInBytes))
161+
.onEvent(event);
164162
}
165163

166164
/** Get all batches that have at least 1 event. */
167-
public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>> getAllNonEmptyBatches() {
168-
final List<Pair<TEndPoint, PipeTabletEventBatch>> nonEmptyBatches = new ArrayList<>();
169-
if (!defaultBatch.isEmpty()) {
170-
nonEmptyBatches.add(new Pair<>(null, defaultBatch));
165+
public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>>
166+
getAllNonEmptyAndShouldEmitBatches() {
167+
final List<Pair<TEndPoint, PipeTabletEventBatch>> nonEmptyAndShouldEmitBatches =
168+
new ArrayList<>();
169+
if (!defaultBatch.isEmpty() && defaultBatch.shouldEmit()) {
170+
nonEmptyAndShouldEmitBatches.add(new Pair<>(null, defaultBatch));
171171
}
172172
endPointToBatch.forEach(
173173
(endPoint, batch) -> {
174-
if (!batch.isEmpty()) {
175-
nonEmptyBatches.add(new Pair<>(endPoint, batch));
174+
if (!batch.isEmpty() && batch.shouldEmit()) {
175+
nonEmptyAndShouldEmitBatches.add(new Pair<>(endPoint, batch));
176176
}
177177
});
178-
return nonEmptyBatches;
178+
return nonEmptyAndShouldEmitBatches;
179179
}
180180

181181
public boolean isEmpty() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@
4949
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
5050
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_SECONDS_KEY;
5151
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
52+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_CONSENSUS_BATCH_SIZE_DEFAULT_VALUE;
5253
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
53-
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
5454
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY;
5555
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_SECONDS_KEY;
5656
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
@@ -95,7 +95,7 @@ protected PipeConsensusTransferBatchReqBuilder(
9595
final long requestMaxBatchSizeInBytes =
9696
parameters.getLongOrDefault(
9797
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY),
98-
CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
98+
CONNECTOR_IOTDB_CONSENSUS_BATCH_SIZE_DEFAULT_VALUE);
9999

100100
allocatedMemoryBlock =
101101
PipeDataNodeResourceManager.memory()

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -178,17 +178,16 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
178178
}
179179

180180
if (isTabletBatchModeEnabled) {
181-
final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
182-
tabletBatchBuilder.onEvent(tabletInsertionEvent);
183-
transferInBatchWithoutCheck(endPointAndBatch);
181+
tabletBatchBuilder.onEvent(tabletInsertionEvent);
182+
transferBatchedEventsIfNecessary();
184183
} else {
185184
transferInEventWithoutCheck(tabletInsertionEvent);
186185
}
187186
}
188187

189188
private void transferInBatchWithoutCheck(
190189
final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch)
191-
throws IOException, WriteProcessException, InterruptedException {
190+
throws IOException, WriteProcessException {
192191
if (Objects.isNull(endPointAndBatch)) {
193192
return;
194193
}
@@ -419,14 +418,13 @@ public void transfer(final Event event) throws Exception {
419418
}
420419

421420
/** Try its best to commit data in order. Flush can also be a trigger to transfer batched data. */
422-
private void transferBatchedEventsIfNecessary()
423-
throws IOException, WriteProcessException, InterruptedException {
421+
private void transferBatchedEventsIfNecessary() throws IOException, WriteProcessException {
424422
if (!isTabletBatchModeEnabled || tabletBatchBuilder.isEmpty()) {
425423
return;
426424
}
427425

428426
for (final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch :
429-
tabletBatchBuilder.getAllNonEmptyBatches()) {
427+
tabletBatchBuilder.getAllNonEmptyAndShouldEmitBatches()) {
430428
transferInBatchWithoutCheck(endPointAndBatch);
431429
}
432430
}
@@ -552,7 +550,8 @@ private void transferQueuedEventsIfNecessary(final boolean forced) {
552550
private void retryTransfer(final TabletInsertionEvent tabletInsertionEvent) {
553551
if (isTabletBatchModeEnabled) {
554552
try {
555-
transferInBatchWithoutCheck(tabletBatchBuilder.onEvent(tabletInsertionEvent));
553+
tabletBatchBuilder.onEvent(tabletInsertionEvent);
554+
transferBatchedEventsIfNecessary();
556555
if (tabletInsertionEvent instanceof EnrichedEvent) {
557556
((EnrichedEvent) tabletInsertionEvent)
558557
.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,8 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
119119

120120
try {
121121
if (isTabletBatchModeEnabled) {
122-
final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
123-
tabletBatchBuilder.onEvent(tabletInsertionEvent);
124-
if (Objects.nonNull(endPointAndBatch)) {
125-
doTransferWrapper(endPointAndBatch);
126-
}
122+
tabletBatchBuilder.onEvent(tabletInsertionEvent);
123+
doTransferWrapper();
127124
} else {
128125
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
129126
doTransferWrapper((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
@@ -243,9 +240,9 @@ private void doTransfer(final PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent)
243240
}
244241

245242
private void doTransferWrapper() throws IOException, WriteProcessException {
246-
for (final Pair<TEndPoint, PipeTabletEventBatch> nonEmptyBatch :
247-
tabletBatchBuilder.getAllNonEmptyBatches()) {
248-
doTransferWrapper(nonEmptyBatch);
243+
for (final Pair<TEndPoint, PipeTabletEventBatch> nonEmptyAndShouldEmitBatch :
244+
tabletBatchBuilder.getAllNonEmptyAndShouldEmitBatches()) {
245+
doTransferWrapper(nonEmptyAndShouldEmitBatch);
249246
}
250247
}
251248

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,13 +218,14 @@ public class CommonConfig {
218218
private int pipeDataStructureTabletSizeInBytes = 2097152;
219219
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.2;
220220
private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.2;
221-
private double pipeDataStructureWalMemoryProportion = 0.2;
222-
private double PipeDataStructureBatchMemoryProportion = 0.2;
221+
private double pipeDataStructureWalMemoryProportion = 0.3;
222+
private double PipeDataStructureBatchMemoryProportion = 0.1;
223223
private double pipeTotalFloatingMemoryProportion = 0.2;
224224

225225
private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 10_000;
226226
private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 1000L;
227-
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
227+
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;
228+
228229
private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
229230
private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE;
230231

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,12 @@ public class PipeConnectorConstant {
7171

7272
public static final String CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY = "connector.batch.max-delay-ms";
7373
public static final String SINK_IOTDB_BATCH_DELAY_MS_KEY = "sink.batch.max-delay-ms";
74+
public static final int CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE = 10;
7475

7576
public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = "connector.batch.size-bytes";
7677
public static final String SINK_IOTDB_BATCH_SIZE_KEY = "sink.batch.size-bytes";
77-
public static final long CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
78+
public static final long CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE = MB;
79+
public static final long CONNECTOR_IOTDB_CONSENSUS_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
7880
public static final long CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE = 80 * MB;
7981

8082
public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";

0 commit comments

Comments
 (0)