Skip to content

Commit b8d829c

Browse files
authored
[To dev/1.3] Pipe: Totally banned the receiver conversion (#16086) (#16105)
* Pipe: Totally banned the receiver conversion (#16086) * logger * ci-fix * partial * Delete client-go * fix * rename * fix-ci * revert-cp
1 parent 1ca6002 commit b8d829c

File tree

5 files changed

+30
-2
lines changed

5 files changed

+30
-2
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,8 @@ private void prepareTypeConversionTest(
342342
senderSession.executeNonQueryStatement("flush");
343343
} else {
344344
// Send Tablet data to receiver
345+
// Write once to create data regions, guarantee that no any tsFiles will be sent
346+
consumer.accept(senderSession, receiverSession, tablet);
345347
createDataPipe(uuid, false);
346348
Thread.sleep(2000);
347349
// The actual implementation logic of inserting data
@@ -390,12 +392,13 @@ private void createDataPipe(String diff, boolean isTSFile) {
390392
String sql =
391393
String.format(
392394
"create pipe test%s"
393-
+ " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='true')"
395+
+ " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')"
394396
+ " with processor ('processor'='do-nothing-processor')"
395397
+ " with sink ('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
396398
diff,
397399
isTSFile ? "file" : "forced-log",
398400
!isTSFile,
401+
isTSFile,
399402
receiverEnv.getIP(),
400403
receiverEnv.getPort(),
401404
isTSFile ? "tsfile" : "tablet");

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.receiver.visitor;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2324
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
2425
import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
2526
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
@@ -87,7 +88,8 @@ public Optional<TSStatus> visitLoadFile(
8788
final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
8889
if (status.getCode() != TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
8990
// Ignore the error if it is caused by insufficient memory
90-
|| (status.getMessage() != null && status.getMessage().contains("memory"))) {
91+
|| (status.getMessage() != null && status.getMessage().contains("memory"))
92+
|| !PipeConfig.getInstance().isPipeReceiverLoadConversionEnabled()) {
9193
return Optional.empty();
9294
}
9395

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ public class CommonConfig {
298298
private double pipeReceiverActualToEstimatedMemoryRatio = 3;
299299

300300
private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB
301+
private boolean pipeReceiverLoadConversionEnabled = false;
301302

302303
private double pipeMetaReportMaxLogNumPerRound = 0.1;
303304
private int pipeMetaReportMaxLogIntervalRounds = 360;
@@ -1503,6 +1504,18 @@ public void setPipeReceiverReqDecompressedMaxLengthInBytes(
15031504
pipeReceiverReqDecompressedMaxLengthInBytes);
15041505
}
15051506

1507+
public boolean isPipeReceiverLoadConversionEnabled() {
1508+
return pipeReceiverLoadConversionEnabled;
1509+
}
1510+
1511+
public void setPipeReceiverLoadConversionEnabled(boolean pipeReceiverLoadConversionEnabled) {
1512+
if (this.pipeReceiverLoadConversionEnabled == pipeReceiverLoadConversionEnabled) {
1513+
return;
1514+
}
1515+
this.pipeReceiverLoadConversionEnabled = pipeReceiverLoadConversionEnabled;
1516+
logger.info("pipeReceiverConversionEnabled is set to {}.", pipeReceiverLoadConversionEnabled);
1517+
}
1518+
15061519
public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
15071520
return pipeReceiverReqDecompressedMaxLengthInBytes;
15081521
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,10 @@ public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
343343
return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes();
344344
}
345345

346+
public boolean isPipeReceiverLoadConversionEnabled() {
347+
return COMMON_CONFIG.isPipeReceiverLoadConversionEnabled();
348+
}
349+
346350
/////////////////////////////// Logger ///////////////////////////////
347351

348352
public double getPipeMetaReportMaxLogNumPerRound() {
@@ -574,6 +578,7 @@ public void printAllConfigs() {
574578
LOGGER.info(
575579
"PipeReceiverReqDecompressedMaxLengthInBytes: {}",
576580
getPipeReceiverReqDecompressedMaxLengthInBytes());
581+
LOGGER.info("PipeReceiverLoadConversionEnabled: {}", isPipeReceiverLoadConversionEnabled());
577582

578583
LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound());
579584
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds());

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
432432
properties.getProperty(
433433
"pipe_receiver_req_decompressed_max_length_in_bytes",
434434
String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes()))));
435+
config.setPipeReceiverLoadConversionEnabled(
436+
Boolean.parseBoolean(
437+
properties.getProperty(
438+
"pipe_receiver_load_conversion_enabled",
439+
String.valueOf(config.isPipeReceiverLoadConversionEnabled()))));
435440

436441
config.setPipeMemoryAllocateMaxRetries(
437442
Integer.parseInt(

0 commit comments

Comments
 (0)