Skip to content

Commit 5c4c947

Browse files
authored
Pipe: Removed the useless ser-de in receiver raw req & Improved the handling logic for rowCount and null value bitmaps in insertTabletNode (#16133)
* req * test * fix * test * partial * bug-fix * completion
1 parent 5a9f628 commit 5c4c947

File tree

12 files changed

+96
-109
lines changed

12 files changed

+96
-109
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public PipeConvertedInsertRowStatement(final InsertRowStatement insertRowStateme
5353
measurements = insertRowStatement.getMeasurements();
5454
dataTypes = insertRowStatement.getDataTypes();
5555
columnCategories = insertRowStatement.getColumnCategories();
56-
idColumnIndices = insertRowStatement.getIdColumnIndices();
56+
tagColumnIndices = insertRowStatement.getTagColumnIndices();
5757
attrColumnIndices = insertRowStatement.getAttrColumnIndices();
5858
writeToTable = insertRowStatement.isWriteToTable();
5959
databaseName = insertRowStatement.getDatabaseName().orElse(null);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public PipeConvertedInsertTabletStatement(
4646
devicePath = insertTabletStatement.getDevicePath();
4747
isAligned = insertTabletStatement.isAligned();
4848
columnCategories = insertTabletStatement.getColumnCategories();
49-
idColumnIndices = insertTabletStatement.getIdColumnIndices();
49+
tagColumnIndices = insertTabletStatement.getTagColumnIndices();
5050
attrColumnIndices = insertTabletStatement.getAttrColumnIndices();
5151
writeToTable = insertTabletStatement.isWriteToTable();
5252
databaseName = insertTabletStatement.getDatabaseName().orElse(null);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,13 @@
2222
import org.apache.iotdb.commons.exception.MetadataException;
2323
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
2424
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
25-
import org.apache.iotdb.commons.utils.PathUtils;
2625
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
27-
import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
2826
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
2927
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
30-
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
31-
import org.apache.iotdb.session.util.SessionUtils;
3228

3329
import org.apache.tsfile.utils.PublicBAOS;
3430
import org.apache.tsfile.utils.ReadWriteIOUtils;
3531
import org.apache.tsfile.write.record.Tablet;
36-
import org.apache.tsfile.write.schema.IMeasurementSchema;
3732
import org.slf4j.Logger;
3833
import org.slf4j.LoggerFactory;
3934

@@ -68,22 +63,7 @@ public InsertTabletStatement constructStatement() {
6863
return new InsertTabletStatement();
6964
}
7065

71-
final TSInsertTabletReq request = new TSInsertTabletReq();
72-
73-
for (final IMeasurementSchema measurementSchema : tablet.getSchemas()) {
74-
request.addToMeasurements(measurementSchema.getMeasurementName());
75-
request.addToTypes(measurementSchema.getType().ordinal());
76-
}
77-
78-
request.setPrefixPath(tablet.getDeviceId());
79-
request.setIsAligned(isAligned);
80-
request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
81-
request.setValues(SessionUtils.getValueBuffer(tablet));
82-
request.setSize(tablet.getRowSize());
83-
request.setMeasurements(
84-
PathUtils.checkIsLegalSingleMeasurementsAndUpdate(request.getMeasurements()));
85-
86-
return StatementGenerator.createStatement(request);
66+
return new InsertTabletStatement(tablet, isAligned, null);
8767
} catch (final MetadataException e) {
8868
LOGGER.warn("Generate Statement from tablet {} error.", tablet, e);
8969
return null;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,27 +22,21 @@
2222
import org.apache.iotdb.commons.exception.MetadataException;
2323
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
2424
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
25-
import org.apache.iotdb.commons.utils.PathUtils;
2625
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
2726
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
28-
import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
2927
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
3028
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
31-
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
32-
import org.apache.iotdb.session.util.SessionUtils;
3329

3430
import org.apache.tsfile.utils.PublicBAOS;
3531
import org.apache.tsfile.utils.ReadWriteIOUtils;
3632
import org.apache.tsfile.write.record.Tablet;
37-
import org.apache.tsfile.write.schema.IMeasurementSchema;
3833
import org.slf4j.Logger;
3934
import org.slf4j.LoggerFactory;
4035

4136
import java.io.DataOutputStream;
4237
import java.io.IOException;
4338
import java.nio.ByteBuffer;
4439
import java.util.Objects;
45-
import java.util.stream.Collectors;
4640

4741
import static org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent.isTabletEmpty;
4842

@@ -70,35 +64,7 @@ public InsertTabletStatement constructStatement() {
7064
return new InsertTabletStatement();
7165
}
7266

73-
final TSInsertTabletReq request = new TSInsertTabletReq();
74-
75-
for (final IMeasurementSchema measurementSchema : tablet.getSchemas()) {
76-
request.addToMeasurements(measurementSchema.getMeasurementName());
77-
request.addToTypes(measurementSchema.getType().ordinal());
78-
}
79-
80-
request.setPrefixPath(tablet.getDeviceId());
81-
request.setIsAligned(isAligned);
82-
request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
83-
request.setValues(SessionUtils.getValueBuffer(tablet));
84-
request.setSize(tablet.getRowSize());
85-
86-
// Tree model
87-
if (Objects.isNull(dataBaseName)) {
88-
request.setMeasurements(
89-
PathUtils.checkIsLegalSingleMeasurementsAndUpdate(request.getMeasurements()));
90-
return StatementGenerator.createStatement(request);
91-
}
92-
93-
// Table model
94-
request.setWriteToTable(true);
95-
request.columnCategories =
96-
tablet.getColumnTypes().stream()
97-
.map(t -> (byte) t.ordinal())
98-
.collect(Collectors.toList());
99-
final InsertTabletStatement statement = StatementGenerator.createStatement(request);
100-
statement.setDatabaseName(dataBaseName);
101-
return statement;
67+
return new InsertTabletStatement(tablet, isAligned, dataBaseName);
10268
} catch (final MetadataException e) {
10369
LOGGER.warn("Generate Statement from tablet {} error.", tablet, e);
10470
return null;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import java.util.stream.Collectors;
5454

5555
public abstract class InsertNode extends SearchNode {
56-
5756
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
5857

5958
/**
@@ -69,7 +68,7 @@ public abstract class InsertNode extends SearchNode {
6968
protected TSDataType[] dataTypes;
7069

7170
protected TsTableColumnCategory[] columnCategories;
72-
protected List<Integer> idColumnIndices;
71+
protected List<Integer> tagColumnIndices;
7372
protected int measurementColumnCnt = -1;
7473

7574
protected int failedMeasurementNumber = 0;
@@ -339,7 +338,7 @@ public int getFailedMeasurementNumber() {
339338
public boolean allMeasurementFailed() {
340339
if (measurements != null) {
341340
return failedMeasurementNumber
342-
>= measurements.length - (idColumnIndices == null ? 0 : idColumnIndices.size());
341+
>= measurements.length - (tagColumnIndices == null ? 0 : tagColumnIndices.size());
343342
}
344343
return true;
345344
}
@@ -398,10 +397,10 @@ public TsTableColumnCategory[] getColumnCategories() {
398397
public void setColumnCategories(TsTableColumnCategory[] columnCategories) {
399398
this.columnCategories = columnCategories;
400399
if (columnCategories != null) {
401-
idColumnIndices = new ArrayList<>();
400+
tagColumnIndices = new ArrayList<>();
402401
for (int i = 0; i < columnCategories.length; i++) {
403402
if (columnCategories[i].equals(TsTableColumnCategory.TAG)) {
404-
idColumnIndices.add(i);
403+
tagColumnIndices.add(i);
405404
}
406405
}
407406
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@
7474
import static org.apache.iotdb.db.utils.CommonUtils.isAlive;
7575

7676
public class InsertTabletNode extends InsertNode implements WALEntryValue {
77-
7877
private static final String DATATYPE_UNSUPPORTED = "Data type %s is not supported.";
7978

8079
protected long[] times; // times should be sorted. It is done in the session API.
@@ -528,17 +527,17 @@ private void writeDataTypes(DataOutputStream stream) throws IOException {
528527
}
529528
}
530529

531-
private void writeTimes(ByteBuffer buffer) {
530+
private void writeTimes(final ByteBuffer buffer) {
532531
ReadWriteIOUtils.write(rowCount, buffer);
533-
for (long time : times) {
534-
ReadWriteIOUtils.write(time, buffer);
532+
for (int i = 0; i < rowCount; ++i) {
533+
ReadWriteIOUtils.write(times[i], buffer);
535534
}
536535
}
537536

538-
private void writeTimes(DataOutputStream stream) throws IOException {
537+
private void writeTimes(final DataOutputStream stream) throws IOException {
539538
ReadWriteIOUtils.write(rowCount, stream);
540-
for (long time : times) {
541-
ReadWriteIOUtils.write(time, stream);
539+
for (int i = 0; i < rowCount; ++i) {
540+
ReadWriteIOUtils.write(times[i], stream);
542541
}
543542
}
544543

@@ -556,7 +555,7 @@ private void writeBitMaps(ByteBuffer buffer) {
556555
ReadWriteIOUtils.write(BytesUtils.boolToByte(false), buffer);
557556
} else {
558557
ReadWriteIOUtils.write(BytesUtils.boolToByte(true), buffer);
559-
buffer.put(bitMaps[i].getByteArray());
558+
buffer.put(bitMaps[i].getByteArray(), 0, BitMap.getSizeOfBytes(rowCount));
560559
}
561560
}
562561
}
@@ -576,7 +575,7 @@ private void writeBitMaps(DataOutputStream stream) throws IOException {
576575
ReadWriteIOUtils.write(BytesUtils.boolToByte(false), stream);
577576
} else {
578577
ReadWriteIOUtils.write(BytesUtils.boolToByte(true), stream);
579-
stream.write(bitMaps[i].getByteArray());
578+
stream.write(bitMaps[i].getByteArray(), 0, BitMap.getSizeOfBytes(rowCount));
580579
}
581580
}
582581
}
@@ -643,7 +642,7 @@ private void serializeColumn(TSDataType dataType, Object column, ByteBuffer buff
643642
case STRING:
644643
Binary[] binaryValues = (Binary[]) column;
645644
for (int j = 0; j < rowCount; j++) {
646-
if (binaryValues[j] != null) {
645+
if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
647646
ReadWriteIOUtils.write(binaryValues[j], buffer);
648647
} else {
649648
ReadWriteIOUtils.write(0, buffer);
@@ -695,7 +694,7 @@ private void serializeColumn(TSDataType dataType, Object column, DataOutputStrea
695694
case BLOB:
696695
Binary[] binaryValues = (Binary[]) column;
697696
for (int j = 0; j < rowCount; j++) {
698-
if (binaryValues[j] != null) {
697+
if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
699698
ReadWriteIOUtils.write(binaryValues[j], stream);
700699
} else {
701700
ReadWriteIOUtils.write(0, stream);
@@ -751,6 +750,7 @@ public void subDeserialize(ByteBuffer buffer) {
751750
bitMaps =
752751
QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize, rowCount).orElse(null);
753752
}
753+
754754
columns =
755755
QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes, measurementSize, rowCount);
756756
isAligned = buffer.get() == 1;
@@ -966,7 +966,7 @@ private void serializeColumn(
966966
case BLOB:
967967
Binary[] binaryValues = (Binary[]) column;
968968
for (int j = start; j < end; j++) {
969-
if (binaryValues[j] != null) {
969+
if (binaryValues[j] != null && binaryValues[j].getValues() != null) {
970970
buffer.putInt(binaryValues[j].getLength());
971971
buffer.put(binaryValues[j].getValues());
972972
} else {
@@ -1310,10 +1310,10 @@ public String toString() {
13101310
+ Arrays.toString(measurements)
13111311
+ ", rowCount="
13121312
+ rowCount
1313-
+ ", timeRange=[,"
1314-
+ times[0]
1315-
+ ", "
1316-
+ times[times.length - 1]
1313+
+ ", timeRange=["
1314+
+ (Objects.nonNull(times) && times.length > 0
1315+
? times[0] + ", " + times[times.length - 1]
1316+
: "")
13171317
+ "]"
13181318
+ '}';
13191319
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@ public RelationalInsertRowNode(
8787
@Override
8888
public IDeviceID getDeviceID() {
8989
if (deviceID == null) {
90-
String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
90+
String[] deviceIdSegments = new String[tagColumnIndices.size() + 1];
9191
deviceIdSegments[0] = this.getTableName();
92-
for (int i = 0; i < idColumnIndices.size(); i++) {
93-
final Integer columnIndex = idColumnIndices.get(i);
92+
for (int i = 0; i < tagColumnIndices.size(); i++) {
93+
final Integer columnIndex = tagColumnIndices.get(i);
9494
deviceIdSegments[i + 1] =
9595
getValues()[columnIndex] != null ? getValues()[columnIndex].toString() : null;
9696
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,10 @@ public IDeviceID getDeviceID(int rowIdx) {
5959
deviceIDs = new IDeviceID[getInsertRowNodeList().size()];
6060
}
6161
if (deviceIDs[rowIdx] == null) {
62-
String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
62+
String[] deviceIdSegments = new String[tagColumnIndices.size() + 1];
6363
deviceIdSegments[0] = this.getTableName();
64-
for (int i = 0; i < idColumnIndices.size(); i++) {
65-
final Integer columnIndex = idColumnIndices.get(i);
64+
for (int i = 0; i < tagColumnIndices.size(); i++) {
65+
final Integer columnIndex = tagColumnIndices.get(i);
6666
deviceIdSegments[i + 1] =
6767
((Object[]) getInsertRowNodeList().get(i).getValues()[columnIndex])[rowIdx].toString();
6868
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,10 @@ public IDeviceID getDeviceID(int rowIdx) {
115115
deviceIDs = new IDeviceID[1];
116116
}
117117
if (deviceIDs[0] == null) {
118-
String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
118+
String[] deviceIdSegments = new String[tagColumnIndices.size() + 1];
119119
deviceIdSegments[0] = this.getTableName();
120-
for (int i = 0; i < idColumnIndices.size(); i++) {
121-
final Integer columnIndex = idColumnIndices.get(i);
120+
for (int i = 0; i < tagColumnIndices.size(); i++) {
121+
final Integer columnIndex = tagColumnIndices.get(i);
122122
Object idSeg = ((Object[]) columns[columnIndex])[0];
123123
boolean isNull =
124124
bitMaps != null && bitMaps[columnIndex] != null && bitMaps[columnIndex].isMarked(0);
@@ -132,10 +132,10 @@ public IDeviceID getDeviceID(int rowIdx) {
132132
deviceIDs = new IDeviceID[rowCount];
133133
}
134134
if (deviceIDs[rowIdx] == null) {
135-
String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
135+
String[] deviceIdSegments = new String[tagColumnIndices.size() + 1];
136136
deviceIdSegments[0] = this.getTableName();
137-
for (int i = 0; i < idColumnIndices.size(); i++) {
138-
final Integer columnIndex = idColumnIndices.get(i);
137+
for (int i = 0; i < tagColumnIndices.size(); i++) {
138+
final Integer columnIndex = tagColumnIndices.get(i);
139139
Object idSeg = ((Object[]) columns[columnIndex])[rowIdx];
140140
boolean isNull =
141141
bitMaps != null

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public abstract class InsertBaseStatement extends Statement implements Accountab
8787
protected Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info;
8888

8989
protected TsTableColumnCategory[] columnCategories;
90-
protected List<Integer> idColumnIndices;
90+
protected List<Integer> tagColumnIndices;
9191
protected List<Integer> attrColumnIndices;
9292
protected boolean writeToTable = false;
9393

@@ -319,19 +319,19 @@ public void setColumnCategory(TsTableColumnCategory columnCategory, int i) {
319319
columnCategories = new TsTableColumnCategory[measurements.length];
320320
}
321321
this.columnCategories[i] = columnCategory;
322-
this.idColumnIndices = null;
322+
this.tagColumnIndices = null;
323323
}
324324

325-
public List<Integer> getIdColumnIndices() {
326-
if (idColumnIndices == null && columnCategories != null) {
327-
idColumnIndices = new ArrayList<>();
325+
public List<Integer> getTagColumnIndices() {
326+
if (tagColumnIndices == null && columnCategories != null) {
327+
tagColumnIndices = new ArrayList<>();
328328
for (int i = 0; i < columnCategories.length; i++) {
329329
if (columnCategories[i].equals(TsTableColumnCategory.TAG)) {
330-
idColumnIndices.add(i);
330+
tagColumnIndices.add(i);
331331
}
332332
}
333333
}
334-
return idColumnIndices;
334+
return tagColumnIndices;
335335
}
336336

337337
public List<Integer> getAttrColumnIndices() {
@@ -432,7 +432,7 @@ public void removeAttributeColumns() {
432432
subRemoveAttributeColumns(columnsToKeep);
433433

434434
// to reconstruct indices
435-
idColumnIndices = null;
435+
tagColumnIndices = null;
436436
attrColumnIndices = null;
437437
}
438438

@@ -601,7 +601,7 @@ public void insertColumn(final int pos, final ColumnSchema columnSchema) {
601601
System.arraycopy(
602602
columnCategories, pos, tmpCategories, pos + 1, columnCategories.length - pos);
603603
columnCategories = tmpCategories;
604-
idColumnIndices = null;
604+
tagColumnIndices = null;
605605
}
606606
}
607607

@@ -626,7 +626,7 @@ public void swapColumn(int src, int target) {
626626
if (inputLocations != null) {
627627
CommonUtils.swapArray(inputLocations, src, target);
628628
}
629-
idColumnIndices = null;
629+
tagColumnIndices = null;
630630
}
631631

632632
public boolean isWriteToTable() {
@@ -707,7 +707,7 @@ public long ramBytesUsed() {
707707
+ RamUsageEstimator.shallowSizeOf(dataTypes)
708708
+ RamUsageEstimator.shallowSizeOf(columnCategories)
709709
// We assume that the integers are all cached by JVM
710-
+ shallowSizeOfList(idColumnIndices)
710+
+ shallowSizeOfList(tagColumnIndices)
711711
+ shallowSizeOfList(attrColumnIndices)
712712
+ shallowSizeOfList(logicalViewSchemaList)
713713
+ (Objects.nonNull(logicalViewSchemaList)

0 commit comments

Comments
 (0)