Skip to content

Commit 2738e7a

Browse files
authored
Pipe: Fix HistoricalDataRegionTsFileAndDeletionSource double-living parameter failure (apache#16667)
1 parent 51b1123 commit 2738e7a

File tree

2 files changed

+21
-11
lines changed

2 files changed

+21
-11
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void testTsFileDecompositionWithMods() throws Exception {
112112
.append(",")
113113
.append(3.0f)
114114
.append(")");
115-
if (i % 100 != 0) {
115+
if (i % 50 != 0) {
116116
insertBuilder.append(",");
117117
} else {
118118
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder.toString());
@@ -254,7 +254,7 @@ public void testTsFileDecompositionWithMods2() throws Exception {
254254
.append(",")
255255
.append(3.0f)
256256
.append(")");
257-
if (i % 100 != 0) {
257+
if (i % 50 != 0) {
258258
insertBuilder.append(",");
259259
} else {
260260
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder.toString());
@@ -369,7 +369,7 @@ public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletion() throw
369369
StringBuilder insertBuilder1 = new StringBuilder(s1);
370370
for (int i = 1; i <= 20000; i++) {
371371
insertBuilder1.append("(").append(i).append(",").append(1.0f).append(")");
372-
if (i % 1000 != 0) {
372+
if (i % 50 != 0) {
373373
insertBuilder1.append(",");
374374
} else {
375375
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder1.toString());
@@ -386,7 +386,7 @@ public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletion() throw
386386
StringBuilder insertBuilder2 = new StringBuilder(s2);
387387
for (int i = 10001; i <= 30000; i++) {
388388
insertBuilder2.append("(").append(i).append(",").append(2.0f).append(")");
389-
if (i % 1000 != 0) {
389+
if (i % 50 != 0) {
390390
insertBuilder2.append(",");
391391
} else {
392392
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder2.toString());
@@ -403,7 +403,7 @@ public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletion() throw
403403
StringBuilder insertBuilder3 = new StringBuilder(s3);
404404
for (int i = 20001; i <= 40000; i++) {
405405
insertBuilder3.append("(").append(i).append(",").append(3.0f).append(")");
406-
if (i % 1000 != 0) {
406+
if (i % 50 != 0) {
407407
insertBuilder3.append(",");
408408
} else {
409409
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder3.toString());
@@ -420,7 +420,7 @@ public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletion() throw
420420
StringBuilder insertBuilder4 = new StringBuilder(s4);
421421
for (int i = 30001; i <= 50000; i++) {
422422
insertBuilder4.append("(").append(i).append(",").append(4.0f).append(")");
423-
if (i % 1000 != 0) {
423+
if (i % 50 != 0) {
424424
insertBuilder4.append(",");
425425
} else {
426426
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder4.toString());
@@ -437,7 +437,7 @@ public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletion() throw
437437
StringBuilder insertBuilder5 = new StringBuilder(s5);
438438
for (int i = 40001; i <= 60000; i++) {
439439
insertBuilder5.append("(").append(i).append(",").append(5.0f).append(")");
440-
if (i % 1000 != 0) {
440+
if (i % 50 != 0) {
441441
insertBuilder5.append(",");
442442
} else {
443443
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder5.toString());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -373,12 +373,22 @@ public void customize(
373373

374374
skipIfNoPrivileges = getSkipIfNoPrivileges(parameters);
375375

376-
isForwardingPipeRequests =
376+
final boolean isDoubleLiving =
377377
parameters.getBooleanOrDefault(
378378
Arrays.asList(
379-
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
380-
PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
381-
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
379+
PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_KEY,
380+
PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY),
381+
PipeSourceConstant.EXTRACTOR_MODE_DOUBLE_LIVING_DEFAULT_VALUE);
382+
if (isDoubleLiving) {
383+
isForwardingPipeRequests = false;
384+
} else {
385+
isForwardingPipeRequests =
386+
parameters.getBooleanOrDefault(
387+
Arrays.asList(
388+
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
389+
PipeSourceConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY),
390+
PipeSourceConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
391+
}
382392

383393
if (LOGGER.isInfoEnabled()) {
384394
LOGGER.info(

0 commit comments

Comments
 (0)