Skip to content

Commit cd0c0a5

Browse files
authored
Load: Support converting mini TsFile into Tablets & Adjust the way to extract metrics & Fix file not delete when analysis cast happens & Pipe IT: Ignore IoTDBPipeProcessorIT.testTumblingTimeSamplingProcessor (#14784) (#15199)
1 parent e70d2ec commit cd0c0a5

File tree

10 files changed

+408
-142
lines changed

10 files changed

+408
-142
lines changed

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeProcessorIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import org.junit.Assert;
3434
import org.junit.Before;
35+
import org.junit.Ignore;
3536
import org.junit.Test;
3637
import org.junit.experimental.categories.Category;
3738
import org.junit.runner.RunWith;
@@ -74,6 +75,7 @@ public void setUp() {
7475
receiverEnv.initClusterEnvironment();
7576
}
7677

78+
@Ignore
7779
@Test
7880
public void testTumblingTimeSamplingProcessor() throws Exception {
7981
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,6 +1137,7 @@ public class IoTDBConfig {
11371137
/** Load related */
11381138
private double maxAllocateMemoryRatioForLoad = 0.8;
11391139

1140+
private int loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount = 4096;
11401141
private int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber = 4096;
11411142
private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
11421143
0L; // 0 means that the decision will be adaptive based on the number of sequences
@@ -1152,6 +1153,8 @@ public class IoTDBConfig {
11521153

11531154
private double loadWriteThroughputBytesPerSecond = -1; // Bytes/s
11541155

1156+
private long loadTabletConversionThresholdBytes = -1;
1157+
11551158
private boolean loadActiveListeningEnable = true;
11561159

11571160
private String[] loadActiveListeningDirs =
@@ -3958,6 +3961,16 @@ public void setMaxAllocateMemoryRatioForLoad(double maxAllocateMemoryRatioForLoa
39583961
this.maxAllocateMemoryRatioForLoad = maxAllocateMemoryRatioForLoad;
39593962
}
39603963

3964+
public int getLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount() {
3965+
return loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount;
3966+
}
3967+
3968+
public void setLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount(
3969+
int loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount) {
3970+
this.loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount =
3971+
loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount;
3972+
}
3973+
39613974
public int getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber() {
39623975
return loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber;
39633976
}
@@ -4026,6 +4039,14 @@ public void setLoadWriteThroughputBytesPerSecond(double loadWriteThroughputBytes
40264039
this.loadWriteThroughputBytesPerSecond = loadWriteThroughputBytesPerSecond;
40274040
}
40284041

4042+
public long getLoadTabletConversionThresholdBytes() {
4043+
return loadTabletConversionThresholdBytes;
4044+
}
4045+
4046+
public void setLoadTabletConversionThresholdBytes(long loadTabletConversionThresholdBytes) {
4047+
this.loadTabletConversionThresholdBytes = loadTabletConversionThresholdBytes;
4048+
}
4049+
40294050
public int getLoadActiveListeningMaxThreadNum() {
40304051
return loadActiveListeningMaxThreadNum;
40314052
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2362,6 +2362,12 @@ private void loadLoadTsFileProps(TrimProperties properties) throws IOException {
23622362
properties.getProperty(
23632363
"max_allocate_memory_ratio_for_load",
23642364
String.valueOf(conf.getMaxAllocateMemoryRatioForLoad()))));
2365+
conf.setLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount(
2366+
Integer.parseInt(
2367+
properties.getProperty(
2368+
"load_tsfile_analyze_schema_batch_read_time_series_metadata_count",
2369+
String.valueOf(
2370+
conf.getLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount()))));
23652371
conf.setLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber(
23662372
Integer.parseInt(
23672373
properties.getProperty(
@@ -2398,6 +2404,11 @@ private void loadLoadTsFileProps(TrimProperties properties) throws IOException {
23982404
properties.getProperty(
23992405
"load_write_throughput_bytes_per_second",
24002406
String.valueOf(conf.getLoadWriteThroughputBytesPerSecond()))));
2407+
conf.setLoadTabletConversionThresholdBytes(
2408+
Long.parseLong(
2409+
properties.getProperty(
2410+
"load_tablet_conversion_threshold_bytes",
2411+
String.valueOf(conf.getLoadTabletConversionThresholdBytes()))));
24012412

24022413
conf.setLoadActiveListeningEnable(
24032414
Boolean.parseBoolean(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,6 @@
157157
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
158158
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
159159
import org.apache.iotdb.db.schemaengine.template.Template;
160-
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
161160
import org.apache.iotdb.db.utils.constant.SqlConstant;
162161
import org.apache.iotdb.rpc.RpcUtils;
163162
import org.apache.iotdb.rpc.TSStatusCode;
@@ -216,7 +215,6 @@
216215
import static org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice;
217216
import static org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor.parseNodeString;
218217
import static org.apache.iotdb.db.schemaengine.schemaregion.view.visitor.GetSourcePathsVisitor.getSourcePaths;
219-
import static org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
220218
import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME_HEADER;
221219

222220
/** This visitor is used to analyze each type of Statement and returns the {@link Analysis}. */
@@ -3039,10 +3037,9 @@ private InsertBaseStatement removeLogicalView(
30393037
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
30403038
context.setQueryType(QueryType.WRITE);
30413039

3042-
final long startTime = System.nanoTime();
30433040
try (final LoadTsFileAnalyzer loadTsfileAnalyzer =
30443041
new LoadTsFileAnalyzer(loadTsFileStatement, context, partitionFetcher, schemaFetcher)) {
3045-
return loadTsfileAnalyzer.analyzeFileByFile(loadTsFileStatement.isDeleteAfterLoad());
3042+
return loadTsfileAnalyzer.analyzeFileByFile(new Analysis());
30463043
} catch (final Exception e) {
30473044
final String exceptionMessage =
30483045
String.format(
@@ -3054,9 +3051,6 @@ public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryC
30543051
analysis.setFinishQueryAfterAnalyze(true);
30553052
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR, exceptionMessage));
30563053
return analysis;
3057-
} finally {
3058-
LoadTsFileCostMetricsSet.getInstance()
3059-
.recordPhaseTimeCost(ANALYSIS, System.nanoTime() - startTime);
30603054
}
30613055
}
30623056

0 commit comments

Comments
 (0)