Skip to content

Commit 2df8710

Browse files
authored
Pipe: Cleaned some questionable parameters & Fixed unstable testPipeAfterDataRegionLeaderStop (apache#16555)
* clean * move * fix * Update CommonConfig.java * s-cle * next * fix * fix * fix-unstable * fix
1 parent 18964f4 commit 2df8710

File tree

15 files changed

+232
-333
lines changed

15 files changed

+232
-333
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java

Lines changed: 170 additions & 172 deletions
Large diffs are not rendered by default.

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public enum TSStatusCode {
106106
WRITE_PROCESS_REJECT(606),
107107
OUT_OF_TTL(607),
108108
COMPACTION_ERROR(608),
109+
@Deprecated
109110
ALIGNED_TIMESERIES_ERROR(609),
110111
WAL_ERROR(610),
111112
DISK_SPACE_INSUFFICIENT(611),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1150,6 +1150,8 @@ public class IoTDBConfig {
11501150
private String loadDiskSelectStrategyForIoTV2AndPipe =
11511151
LoadDiskSelectorType.INHERIT_LOAD.getValue();
11521152

1153+
private boolean skipFailedTableSchemaCheck = false;
1154+
11531155
/** Pipe related */
11541156
/** initialized as empty, updated based on the latest `systemDir` during querying */
11551157
private String[] pipeReceiverFileDirs = new String[0];
@@ -3932,6 +3934,18 @@ public void setLoadDiskSelectStrategyForIoTV2AndPipe(
39323934
this.loadDiskSelectStrategyForIoTV2AndPipe = loadDiskSelectStrategyForIoTV2AndPipe;
39333935
}
39343936

3937+
public boolean isSkipFailedTableSchemaCheck() {
3938+
return skipFailedTableSchemaCheck;
3939+
}
3940+
3941+
public void setSkipFailedTableSchemaCheck(boolean skipFailedTableSchemaCheck) {
3942+
if (this.skipFailedTableSchemaCheck == skipFailedTableSchemaCheck) {
3943+
return;
3944+
}
3945+
this.skipFailedTableSchemaCheck = skipFailedTableSchemaCheck;
3946+
logger.info("skipFailedTableSchemaCheck is set to {}.", skipFailedTableSchemaCheck);
3947+
}
3948+
39353949
public long getLoadActiveListeningCheckIntervalSeconds() {
39363950
return loadActiveListeningCheckIntervalSeconds;
39373951
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2229,7 +2229,7 @@ public void reloadMetricProperties(TrimProperties properties) {
22292229
}
22302230
}
22312231

2232-
private void loadLoadTsFileProps(TrimProperties properties) throws IOException {
2232+
private void loadLoadTsFileProps(TrimProperties properties) {
22332233
conf.setMaxAllocateMemoryRatioForLoad(
22342234
Double.parseDouble(
22352235
properties.getProperty(
@@ -2380,6 +2380,12 @@ private void loadLoadTsFileProps(TrimProperties properties) throws IOException {
23802380
properties.getProperty(
23812381
"cache_last_values_memory_budget_in_byte",
23822382
String.valueOf(conf.getCacheLastValuesMemoryBudgetInByte()))));
2383+
2384+
conf.setSkipFailedTableSchemaCheck(
2385+
Boolean.parseBoolean(
2386+
properties.getProperty(
2387+
"skip_failed_table_schema_check",
2388+
String.valueOf(conf.isSkipFailedTableSchemaCheck()))));
23832389
}
23842390

23852391
private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOException {
@@ -2427,6 +2433,12 @@ private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOE
24272433
properties.getProperty(
24282434
"load_tsfile_split_partition_max_size",
24292435
Integer.toString(conf.getLoadTsFileSpiltPartitionMaxSize()))));
2436+
2437+
conf.setSkipFailedTableSchemaCheck(
2438+
Boolean.parseBoolean(
2439+
properties.getProperty(
2440+
"skip_failed_table_schema_check",
2441+
String.valueOf(conf.isSkipFailedTableSchemaCheck()))));
24302442
}
24312443

24322444
private void loadPipeHotModifiedProp(TrimProperties properties) throws IOException {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -786,12 +786,12 @@ private void calculateInsertNodeQueueMemory(final PipeParameters sourceParameter
786786
final long remainingMemory =
787787
PipeDataNodeResourceManager.memory().getTotalFloatingMemorySizeInBytes()
788788
- allocatedMemorySizeInBytes;
789-
if (remainingMemory < PipeConfig.getInstance().PipeInsertNodeQueueMemory()) {
789+
if (remainingMemory < PipeConfig.getInstance().getPipeInsertNodeQueueMemory()) {
790790
final String message =
791791
String.format(
792792
"%s Need Floating memory: %d bytes, free Floating memory: %d bytes",
793793
MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
794-
PipeConfig.getInstance().PipeInsertNodeQueueMemory(),
794+
PipeConfig.getInstance().getPipeInsertNodeQueueMemory(),
795795
remainingMemory);
796796
LOGGER.warn(message);
797797
throw new PipeException(message);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,7 @@ public TsFileInsertionEventScanParser(
110110
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
111111
this.allocatedMemoryBlockForChunk =
112112
PipeDataNodeResourceManager.memory()
113-
.forceAllocateForTabletWithRetry(
114-
PipeConfig.getInstance().getPipeMaxAlignedSeriesChunkSizeInOneBatch());
113+
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());
115114

116115
try {
117116
tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath(), false, false);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public TsFileInsertionEventTableParser(
7777
this.allocatedMemoryBlockForChunk =
7878
PipeDataNodeResourceManager.memory()
7979
.forceAllocateForTabletWithRetry(
80-
PipeConfig.getInstance().getPipeMaxAlignedSeriesChunkSizeInOneBatch());
80+
PipeConfig.getInstance().getPipeMaxReaderChunkSize());
8181
this.allocatedMemoryBlockForBatchData =
8282
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize);
8383
this.allocatedMemoryBlockForChunkMeta =

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@ private TSStatus visitGeneralCreateTimeSeries(final Statement statement, final T
133133
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
134134
.setMessage(context.getMessage());
135135
} else if (context.getCode() == TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
136-
|| context.getCode() == TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()
137136
|| context.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
138137
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
139138
.setMessage(context.getMessage());
@@ -168,10 +167,6 @@ private TSStatus visitGeneralCreateMultiTimeseries(
168167
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
169168
&& status.getCode() != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
170169
&& status.getCode() != TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
171-
if (status.getCode() == TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) {
172-
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
173-
.setMessage(context.getMessage());
174-
}
175170
return visitStatement(statement, context);
176171
}
177172
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -638,12 +638,6 @@ public long getUsedMemorySizeInBytesOfTsFiles() {
638638
return usedMemorySizeInBytesOfTsFiles;
639639
}
640640

641-
public long getAllocatedMemorySizeInBytesOfBatch() {
642-
return (long)
643-
(PipeConfig.getInstance().getPipeDataStructureBatchMemoryProportion()
644-
* getTotalNonFloatingMemorySizeInBytes());
645-
}
646-
647641
public long getFreeMemorySizeInBytes() {
648642
return memoryBlock.getFreeMemoryInBytes();
649643
}

0 commit comments

Comments
 (0)