Skip to content

Commit 699c90e

Browse files
Load: add the function of transferring too many time partitions of files to tablets and fixed the problem that the data written to tablets is more than expected. (apache#16320)
* update * update * update * update * update * update * simplify * Update pom.xml * fix * fix * fix IT --------- Co-authored-by: Caideyipi <87789683+Caideyipi@users.noreply.github.com>
1 parent 32bb39a commit 699c90e

File tree

9 files changed

+101
-15
lines changed

9 files changed

+101
-15
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,14 @@ public void setUp() throws Exception {
8383
tmpDir = new File(Files.createTempDirectory("load").toUri());
8484
EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(PARTITION_INTERVAL);
8585
EnvFactory.getEnv().getConfig().getCommonConfig().setEnforceStrongPassword(false);
86+
EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
87+
EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("1:10:1:1:1:1");
8688
EnvFactory.getEnv()
8789
.getConfig()
8890
.getDataNodeConfig()
8991
.setConnectionTimeoutInMS(connectionTimeoutInMS)
9092
.setLoadTsFileAnalyzeSchemaMemorySizeInBytes(loadTsFileAnalyzeSchemaMemorySizeInBytes);
93+
9194
EnvFactory.getEnv().initClusterEnvironment();
9295
}
9396

@@ -224,7 +227,7 @@ public void testLoad() throws Exception {
224227
generator.generateData(SchemaConfig.DEVICE_2, 10000, PARTITION_INTERVAL / 10_000, false);
225228
generator.generateData(SchemaConfig.DEVICE_3, 10000, PARTITION_INTERVAL / 10_000, false);
226229
generator.generateData(SchemaConfig.DEVICE_4, 10000, PARTITION_INTERVAL / 10_000, true);
227-
for (int i = 0; i < 10000; i++) {
230+
for (int i = 0; i < 1000; i++) {
228231
generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL - 10, true);
229232
}
230233
writtenPoint2 = generator.getTotalNumber();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1108,6 +1108,8 @@ public class IoTDBConfig {
11081108

11091109
private long loadMeasurementIdCacheSizeInBytes = 2 * 1024 * 1024L; // 2MB
11101110

1111+
private int loadTsFileSpiltPartitionMaxSize = 10;
1112+
11111113
private String[] loadActiveListeningDirs =
11121114
new String[] {
11131115
IoTDBConstant.EXT_FOLDER_NAME
@@ -3997,6 +3999,27 @@ public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
39973999
this.pipeReceiverFileDirs = pipeReceiverFileDirs;
39984000
}
39994001

4002+
public int getLoadTsFileSpiltPartitionMaxSize() {
4003+
return loadTsFileSpiltPartitionMaxSize;
4004+
}
4005+
4006+
public void setLoadTsFileSpiltPartitionMaxSize(int loadTsFileSpiltPartitionMaxSize) {
4007+
if (loadTsFileSpiltPartitionMaxSize <= 0) {
4008+
throw new IllegalArgumentException(
4009+
"loadTsFileSpiltPartitionMaxSize should be greater than or equal to 0");
4010+
}
4011+
4012+
if (this.loadTsFileSpiltPartitionMaxSize == loadTsFileSpiltPartitionMaxSize) {
4013+
return;
4014+
}
4015+
4016+
logger.info(
4017+
"Set loadTsFileSpiltPartitionMaxSize from {} to {}",
4018+
this.loadTsFileSpiltPartitionMaxSize,
4019+
loadTsFileSpiltPartitionMaxSize);
4020+
this.loadTsFileSpiltPartitionMaxSize = loadTsFileSpiltPartitionMaxSize;
4021+
}
4022+
40004023
public String[] getPipeReceiverFileDirs() {
40014024
return (Objects.isNull(this.pipeReceiverFileDirs) || this.pipeReceiverFileDirs.length == 0)
40024025
? new String[] {systemDir + File.separator + "pipe" + File.separator + "receiver"}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2404,6 +2404,12 @@ private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws IOE
24042404
properties.getProperty(
24052405
"load_active_listening_fail_dir",
24062406
ConfigurationFileUtils.getConfigurationDefaultValue("load_active_listening_fail_dir")));
2407+
2408+
conf.setLoadTsFileSpiltPartitionMaxSize(
2409+
Integer.parseInt(
2410+
properties.getProperty(
2411+
"load_tsfile_split_partition_max_size",
2412+
Integer.toString(conf.getLoadTsFileSpiltPartitionMaxSize()))));
24072413
}
24082414

24092415
private void loadPipeHotModifiedProp(TrimProperties properties) throws IOException {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl;
4141
import org.apache.tsfile.read.reader.IChunkReader;
4242
import org.apache.tsfile.read.reader.chunk.TableChunkReader;
43+
import org.apache.tsfile.utils.Binary;
4344
import org.apache.tsfile.utils.DateUtils;
4445
import org.apache.tsfile.utils.Pair;
4546
import org.apache.tsfile.utils.TsPrimitiveType;
@@ -394,6 +395,13 @@ private void fillMeasurementValueColumns(
394395
for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
395396
final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
396397
if (primitiveType == null) {
398+
switch (dataTypeList.get(i)) {
399+
case TEXT:
400+
case BLOB:
401+
case STRING:
402+
tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues());
403+
}
404+
tablet.getBitMaps()[i].mark(rowIndex);
397405
continue;
398406
}
399407

@@ -420,7 +428,11 @@ private void fillMeasurementValueColumns(
420428
case TEXT:
421429
case BLOB:
422430
case STRING:
423-
tablet.addValue(rowIndex, i, primitiveType.getBinary().getValues());
431+
Binary binary = primitiveType.getBinary();
432+
tablet.addValue(
433+
rowIndex,
434+
i,
435+
binary.getValues() == null ? Binary.EMPTY_VALUE.getValues() : binary.getValues());
424436
break;
425437
default:
426438
throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType());
@@ -431,11 +443,20 @@ private void fillMeasurementValueColumns(
431443
private void fillDeviceIdColumns(
432444
final IDeviceID deviceID, final Tablet tablet, final int rowIndex) {
433445
final String[] deviceIdSegments = (String[]) deviceID.getSegments();
434-
for (int i = 1, totalColumns = deviceIdSegments.length; i < totalColumns; i++) {
446+
int i = 1;
447+
for (int totalColumns = deviceIdSegments.length; i < totalColumns; i++) {
435448
if (deviceIdSegments[i] == null) {
449+
tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues());
450+
tablet.getBitMaps()[i - 1].mark(rowIndex);
436451
continue;
437452
}
438453
tablet.addValue(rowIndex, i - 1, deviceIdSegments[i]);
439454
}
455+
456+
while (i <= deviceIdSize) {
457+
tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues());
458+
tablet.getBitMaps()[i - 1].mark(rowIndex);
459+
i++;
460+
}
440461
}
441462
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ private Map<IDeviceID, PartitionSplitInfo> collectSplitRanges() {
231231

232232
Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap = new LinkedHashMap<>();
233233

234-
for (int i = 1; i < times.length; i++) { // times are sorted in session API.
234+
for (int i = 1; i < rowCount; i++) { // times are sorted in session API.
235235
IDeviceID nextDeviceId = getDeviceID(i);
236236
if (times[i] >= upperBoundOfTimePartition || !currDeviceId.equals(nextDeviceId)) {
237237
final PartitionSplitInfo splitInfo =
@@ -253,7 +253,7 @@ private Map<IDeviceID, PartitionSplitInfo> collectSplitRanges() {
253253
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId, deviceID1 -> new PartitionSplitInfo());
254254
// the final range
255255
splitInfo.ranges.add(startLoc); // included
256-
splitInfo.ranges.add(times.length); // excluded
256+
splitInfo.ranges.add(rowCount); // excluded
257257
splitInfo.timePartitionSlots.add(timePartitionSlot);
258258

259259
return deviceIDSplitInfoMap;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,18 @@ public InsertTabletStatement(
124124
}
125125

126126
private Object convertTableColumn(final Object input) {
127-
return input instanceof LocalDate[]
128-
? Arrays.stream(((LocalDate[]) input))
129-
.map(date -> Objects.nonNull(date) ? DateUtils.parseDateExpressionToInt(date) : 0)
130-
.mapToInt(Integer::intValue)
131-
.toArray()
132-
: input;
127+
if (input instanceof LocalDate[]) {
128+
return Arrays.stream(((LocalDate[]) input))
129+
.map(date -> Objects.nonNull(date) ? DateUtils.parseDateExpressionToInt(date) : 0)
130+
.mapToInt(Integer::intValue)
131+
.toArray();
132+
} else if (input instanceof Binary[]) {
133+
return Arrays.stream(((Binary[]) input))
134+
.map(binary -> Objects.nonNull(binary) ? binary : Binary.EMPTY_VALUE)
135+
.toArray(Binary[]::new);
136+
}
137+
138+
return input;
133139
}
134140

135141
public InsertTabletStatement(InsertTabletNode node) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,19 @@ public class LoadTsFileDataTypeConverter {
6262

6363
private final SqlParser relationalSqlParser = new SqlParser();
6464
private final LoadTableStatementDataTypeConvertExecutionVisitor
65-
tableStatementDataTypeConvertExecutionVisitor =
66-
new LoadTableStatementDataTypeConvertExecutionVisitor(this::executeForTableModel);
65+
tableStatementDataTypeConvertExecutionVisitor;
6766
private final LoadTreeStatementDataTypeConvertExecutionVisitor
68-
treeStatementDataTypeConvertExecutionVisitor =
69-
new LoadTreeStatementDataTypeConvertExecutionVisitor(this::executeForTreeModel);
67+
treeStatementDataTypeConvertExecutionVisitor;
7068

7169
public LoadTsFileDataTypeConverter(
7270
final MPPQueryContext context, final boolean isGeneratedByPipe) {
7371
this.context = context;
7472
this.isGeneratedByPipe = isGeneratedByPipe;
73+
74+
tableStatementDataTypeConvertExecutionVisitor =
75+
new LoadTableStatementDataTypeConvertExecutionVisitor(this::executeForTableModel);
76+
treeStatementDataTypeConvertExecutionVisitor =
77+
new LoadTreeStatementDataTypeConvertExecutionVisitor(this::executeForTreeModel);
7578
}
7679

7780
public Optional<TSStatus> convertForTableModel(final LoadTsFile loadTsFileTableStatement) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
2323
import org.apache.iotdb.commons.utils.TimePartitionUtils;
24+
import org.apache.iotdb.db.conf.IoTDBConfig;
25+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2426
import org.apache.iotdb.db.exception.load.LoadFileException;
2527
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
2628
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
@@ -62,6 +64,8 @@
6264
public class TsFileSplitter {
6365
private static final Logger logger = LoggerFactory.getLogger(TsFileSplitter.class);
6466

67+
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
68+
6569
private final File tsFile;
6670
private final TsFileDataConsumer consumer;
6771
private Map<Long, IChunkMetadata> offset2ChunkMetadata = new HashMap<>();
@@ -72,6 +76,7 @@ public class TsFileSplitter {
7276
private IDeviceID curDevice = null;
7377
private boolean isAligned;
7478
private int timeChunkIndexOfCurrentValueColumn = 0;
79+
private Set<TTimePartitionSlot> timePartitionSlots = new HashSet<>();
7580

7681
// Maintain the number of times the chunk of each measurement appears.
7782
private Map<String, Integer> valueColumn2TimeChunkIndex = new HashMap<>();
@@ -445,6 +450,14 @@ private void consumeAllAlignedChunkData(
445450
}
446451
}
447452
for (AlignedChunkData chunkData : chunkDataMap.keySet()) {
453+
timePartitionSlots.add(chunkData.getTimePartitionSlot());
454+
if (deletions.isEmpty()
455+
&& timePartitionSlots.size() > CONFIG.getLoadTsFileSpiltPartitionMaxSize()) {
456+
throw new LoadFileException(
457+
String.format(
458+
"Time partition slots size is greater than %s",
459+
CONFIG.getLoadTsFileSpiltPartitionMaxSize()));
460+
}
448461
if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
449462
throw new IllegalStateException(
450463
String.format(
@@ -457,6 +470,14 @@ private void consumeAllAlignedChunkData(
457470

458471
private void consumeChunkData(String measurement, long offset, ChunkData chunkData)
459472
throws LoadFileException {
473+
timePartitionSlots.add(chunkData.getTimePartitionSlot());
474+
if (deletions.isEmpty()
475+
&& timePartitionSlots.size() > CONFIG.getLoadTsFileSpiltPartitionMaxSize()) {
476+
throw new LoadFileException(
477+
String.format(
478+
"Time partition slots size is greater than %s",
479+
CONFIG.getLoadTsFileSpiltPartitionMaxSize()));
480+
}
460481
if (Boolean.FALSE.equals(consumer.apply(chunkData))) {
461482
throw new IllegalStateException(
462483
String.format(

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ public void testSplitInsertTablet() throws IllegalPathException {
204204
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
205205
insertTabletNode.setColumns(
206206
new Object[] {new int[] {-20, -10, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
207+
insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
207208

208209
DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
209210
dataPartitionQueryParam.setDeviceID(
@@ -314,6 +315,7 @@ public void testInsertMultiTablets() throws IllegalPathException {
314315
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
315316
insertTabletNode.setColumns(
316317
new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
318+
insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
317319
insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, 2 * i);
318320

319321
insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 3"));
@@ -322,6 +324,7 @@ public void testInsertMultiTablets() throws IllegalPathException {
322324
insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32});
323325
insertTabletNode.setColumns(
324326
new Object[] {new int[] {10, 20, 30, 40, 50, 60, 70, 80, 90, 100}});
327+
insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
325328
insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, 2 * i);
326329
}
327330

0 commit comments

Comments
 (0)