Skip to content

Commit 902aa35

Browse files
CaideyipiJackieTien97
authored andcommitted
[To dev/1.3] Pipe: Totally banned the receiver conversion (#16086) (#16105)
* Pipe: Totally banned the receiver conversion (#16086) * logger * ci-fix * partial * Delete client-go * fix * rename * fix-ci * revert-cp
1 parent 8c694e4 commit 902aa35

File tree

5 files changed

+31
-2
lines changed

5 files changed

+31
-2
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,8 @@ private void prepareTypeConversionTest(
342342
senderSession.executeNonQueryStatement("flush");
343343
} else {
344344
// Send Tablet data to receiver
345+
// Write once to create data regions, guarantee that no any tsFiles will be sent
346+
consumer.accept(senderSession, receiverSession, tablet);
345347
createDataPipe(uuid, false);
346348
Thread.sleep(2000);
347349
// The actual implementation logic of inserting data
@@ -390,12 +392,13 @@ private void createDataPipe(String diff, boolean isTSFile) {
390392
String sql =
391393
String.format(
392394
"create pipe test%s"
393-
+ " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='true')"
395+
+ " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')"
394396
+ " with processor ('processor'='do-nothing-processor')"
395397
+ " with sink ('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
396398
diff,
397399
isTSFile ? "file" : "forced-log",
398400
!isTSFile,
401+
isTSFile,
399402
receiverEnv.getIP(),
400403
receiverEnv.getPort(),
401404
isTSFile ? "tsfile" : "tablet");

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.receiver.visitor;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2324
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
2425
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
2526
import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
@@ -87,7 +88,8 @@ public Optional<TSStatus> visitLoadFile(
8788
final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
8889
if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
8990
// Ignore the error if it is caused by insufficient memory
90-
|| (status.getMessage() != null && status.getMessage().contains("memory"))) {
91+
|| (status.getMessage() != null && status.getMessage().contains("memory"))
92+
|| !PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) {
9193
return Optional.empty();
9294
}
9395

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,8 @@ public class CommonConfig {
299299
private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
300300
private double pipeReceiverActualToEstimatedMemoryRatio = 3;
301301

302+
private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
303+
private boolean pipeReceiverLoadConversionEnabled = false;
302304
private int pipeMaxAllowedHistoricalTsFilePerDataRegion = Integer.MAX_VALUE; // Deprecated
303305
private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = Integer.MAX_VALUE; // Deprecated
304306
private long pipeMaxAllowedLinkedTsFileCount = Long.MAX_VALUE; // Deprecated
@@ -1569,6 +1571,18 @@ public long getPipeMaxAllowedLinkedTsFileCount() {
15691571
return pipeMaxAllowedLinkedTsFileCount;
15701572
}
15711573

1574+
public boolean isPipeReceiverLoadConversionEnabled() {
1575+
return pipeReceiverLoadConversionEnabled;
1576+
}
1577+
1578+
public void setPipeReceiverLoadConversionEnabled(boolean pipeReceiverLoadConversionEnabled) {
1579+
if (this.pipeReceiverLoadConversionEnabled == pipeReceiverLoadConversionEnabled) {
1580+
return;
1581+
}
1582+
this.pipeReceiverLoadConversionEnabled = pipeReceiverLoadConversionEnabled;
1583+
logger.info("pipeReceiverConversionEnabled is set to {}.", pipeReceiverLoadConversionEnabled);
1584+
}
1585+
15721586
public void setPipeMaxAllowedLinkedTsFileCount(long pipeMaxAllowedLinkedTsFileCount) {
15731587
if (this.pipeMaxAllowedLinkedTsFileCount == pipeMaxAllowedLinkedTsFileCount) {
15741588
return;

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,10 @@ public long getPipeMaxAllowedLinkedTsFileCount() {
345345
return COMMON_CONFIG.getPipeMaxAllowedLinkedTsFileCount();
346346
}
347347

348+
public boolean isPipeReceiverLoadConversionEnabled() {
349+
return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled();
350+
}
351+
348352
/////////////////////////////// Logger ///////////////////////////////
349353

350354
public double getPipeMetaReportMaxLogNumPerRound() {
@@ -576,6 +580,7 @@ public void printAllConfigs() {
576580
"PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}",
577581
getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
578582
LOGGER.info("PipeMaxAllowedLinkedTsFileCount: {}", getPipeMaxAllowedLinkedTsFileCount());
583+
LOGGER.info("PipeReceiverLoadConversionEnabled: {}", isPipeReceiverLoadConversionEnabled());
579584

580585
LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound());
581586
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds());

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
458458
properties.getProperty(
459459
"pipe_max_allowed_linked_tsfile_count",
460460
String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount()))));
461+
config.setPipeReceiverLoadConversionEnabled(
462+
Boolean.parseBoolean(
463+
properties.getProperty(
464+
"pipe_receiver_load_conversion_enabled",
465+
String.valueOf(config.isPipeReceiverLoadConversionEnabled()))));
461466

462467
config.setPipeMemoryAllocateMaxRetries(
463468
Integer.parseInt(

0 commit comments

Comments
 (0)