Skip to content

Commit 6d2a1cf

Browse files
authored
[IOTDB-4282] Supply deserialization for DeleteDataPlan in MultiLeaderConsensus (#7205)
1 parent 3a9fec2 commit 6d2a1cf

File tree

18 files changed

+328
-49
lines changed

18 files changed

+328
-49
lines changed

server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ protected PlanNode getPlanNode(IConsensusRequest request) {
5353
if (request instanceof ByteBufferConsensusRequest) {
5454
node = PlanNodeType.deserialize(request.serializeToByteBuffer());
5555
} else if (request instanceof MultiLeaderConsensusRequest) {
56-
node = WALEntry.deserializeInsertNode(request.serializeToByteBuffer());
56+
node = WALEntry.deserializeForConsensus(request.serializeToByteBuffer());
5757
} else if (request instanceof PlanNode) {
5858
node = (PlanNode) request;
5959
} else {

server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public TSStatus visitDeleteData(DeleteDataNode node, DataRegion dataRegion) {
149149
try {
150150
for (PartialPath path : node.getPathList()) {
151151
dataRegion.deleteByDevice(
152-
path, node.getDeleteStartTime(), node.getDeleteEndTime(), Long.MAX_VALUE, null);
152+
path, node.getDeleteStartTime(), node.getDeleteEndTime(), node.getSearchIndex(), null);
153153
}
154154
return StatusUtils.OK;
155155
} catch (IOException e) {

server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@
6868
import org.apache.iotdb.db.metadata.idtable.IDTable;
6969
import org.apache.iotdb.db.metadata.idtable.IDTableManager;
7070
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
71+
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
72+
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
7173
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
7274
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
7375
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -2185,15 +2187,15 @@ private void separateTsFile(
21852187
* @param pattern Must be a pattern start with a precise device path
21862188
* @param startTime
21872189
* @param endTime
2188-
* @param planIndex
2190+
* @param searchIndex
21892191
* @param timePartitionFilter
21902192
* @throws IOException
21912193
*/
21922194
public void deleteByDevice(
21932195
PartialPath pattern,
21942196
long startTime,
21952197
long endTime,
2196-
long planIndex,
2198+
long searchIndex,
21972199
TimePartitionFilter timePartitionFilter)
21982200
throws IOException {
21992201
// If there are still some old version tsfiles, the delete won't succeeded.
@@ -2225,7 +2227,7 @@ public void deleteByDevice(
22252227

22262228
// write log to impacted working TsFileProcessors
22272229
List<WALFlushListener> walListeners =
2228-
logDeleteInWAL(startTime, endTime, pattern, timePartitionFilter);
2230+
logDeletionInWAL(startTime, endTime, searchIndex, pattern, timePartitionFilter);
22292231

22302232
for (WALFlushListener walFlushListener : walListeners) {
22312233
if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
@@ -2293,6 +2295,42 @@ private List<WALFlushListener> logDeleteInWAL(
22932295
return walFlushListeners;
22942296
}
22952297

2298+
private List<WALFlushListener> logDeletionInWAL(
2299+
long startTime,
2300+
long endTime,
2301+
long searchIndex,
2302+
PartialPath path,
2303+
TimePartitionFilter timePartitionFilter) {
2304+
long timePartitionStartId = StorageEngine.getTimePartition(startTime);
2305+
long timePartitionEndId = StorageEngine.getTimePartition(endTime);
2306+
List<WALFlushListener> walFlushListeners = new ArrayList<>();
2307+
if (config.getWalMode() == WALMode.DISABLE) {
2308+
return walFlushListeners;
2309+
}
2310+
DeleteDataNode deleteDataNode =
2311+
new DeleteDataNode(new PlanNodeId(""), Collections.singletonList(path), startTime, endTime);
2312+
deleteDataNode.setSearchIndex(searchIndex);
2313+
for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) {
2314+
if (timePartitionStartId <= entry.getKey()
2315+
&& entry.getKey() <= timePartitionEndId
2316+
&& (timePartitionFilter == null
2317+
|| timePartitionFilter.satisfy(storageGroupName, entry.getKey()))) {
2318+
WALFlushListener walFlushListener = entry.getValue().logDeleteDataNodeInWAL(deleteDataNode);
2319+
walFlushListeners.add(walFlushListener);
2320+
}
2321+
}
2322+
for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) {
2323+
if (timePartitionStartId <= entry.getKey()
2324+
&& entry.getKey() <= timePartitionEndId
2325+
&& (timePartitionFilter == null
2326+
|| timePartitionFilter.satisfy(storageGroupName, entry.getKey()))) {
2327+
WALFlushListener walFlushListener = entry.getValue().logDeleteDataNodeInWAL(deleteDataNode);
2328+
walFlushListeners.add(walFlushListener);
2329+
}
2330+
}
2331+
return walFlushListeners;
2332+
}
2333+
22962334
private boolean canSkipDelete(
22972335
TsFileResource tsFileResource,
22982336
Set<PartialPath> devicePaths,

server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
5050
import org.apache.iotdb.db.metadata.path.AlignedPath;
5151
import org.apache.iotdb.db.metadata.utils.ResourceByPathUtils;
52+
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
5253
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
5354
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
5455
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -854,6 +855,10 @@ WALFlushListener logDeleteInWAL(DeletePlan deletePlan) {
854855
return walNode.log(workMemTable.getMemTableId(), deletePlan);
855856
}
856857

858+
WALFlushListener logDeleteDataNodeInWAL(DeleteDataNode deleteDataNode) {
859+
return walNode.log(workMemTable.getMemTableId(), deleteDataNode);
860+
}
861+
857862
public TsFileResource getTsFileResource() {
858863
return tsFileResource;
859864
}

server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ public static PlanNode deserializeFromWAL(DataInputStream stream) throws IOExcep
161161
return InsertTabletNode.deserializeFromWAL(stream);
162162
case 14:
163163
return InsertRowNode.deserializeFromWAL(stream);
164+
case 44:
165+
return DeleteDataNode.deserializeFromWAL(stream);
164166
default:
165167
throw new IllegalArgumentException("Invalid node type: " + nodeType);
166168
}
@@ -173,6 +175,8 @@ public static PlanNode deserializeFromWAL(ByteBuffer buffer) {
173175
return InsertTabletNode.deserializeFromWAL(buffer);
174176
case 14:
175177
return InsertRowNode.deserializeFromWAL(buffer);
178+
case 44:
179+
return DeleteDataNode.deserializeFromWAL(buffer);
176180
default:
177181
throw new IllegalArgumentException("Invalid node type: " + nodeType);
178182
}

server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
23+
import org.apache.iotdb.commons.exception.IllegalPathException;
2324
import org.apache.iotdb.commons.partition.DataPartition;
2425
import org.apache.iotdb.commons.path.PartialPath;
2526
import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
@@ -31,8 +32,12 @@
3132
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
3233
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
3334
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
35+
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
36+
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
37+
import org.apache.iotdb.db.wal.utils.WALWriteUtils;
3438
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
3539

40+
import java.io.DataInputStream;
3641
import java.io.DataOutputStream;
3742
import java.io.IOException;
3843
import java.nio.ByteBuffer;
@@ -44,13 +49,22 @@
4449
import java.util.stream.Collectors;
4550

4651
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
52+
import static org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode.NO_CONSENSUS_INDEX;
4753

48-
public class DeleteDataNode extends WritePlanNode {
54+
public class DeleteDataNode extends WritePlanNode implements WALEntryValue {
55+
/** byte: type, integer: pathList.size(), long: deleteStartTime, deleteEndTime, searchIndex */
56+
private static final int FIXED_SERIALIZED_SIZE = Short.BYTES + Integer.BYTES + Long.BYTES * 3;
4957

5058
private final List<PartialPath> pathList;
5159
private final long deleteStartTime;
5260
private final long deleteEndTime;
5361

62+
/**
63+
* this index is used by wal search, its order should be protected by the upper layer, and the
64+
* value should start from 1
65+
*/
66+
protected long searchIndex = NO_CONSENSUS_INDEX;
67+
5468
private TRegionReplicaSet regionReplicaSet;
5569

5670
public DeleteDataNode(
@@ -86,6 +100,15 @@ public long getDeleteEndTime() {
86100
return deleteEndTime;
87101
}
88102

103+
public long getSearchIndex() {
104+
return searchIndex;
105+
}
106+
107+
/** Search index should start from 1 */
108+
public void setSearchIndex(long searchIndex) {
109+
this.searchIndex = searchIndex;
110+
}
111+
89112
@Override
90113
public List<PlanNode> getChildren() {
91114
return new ArrayList<>();
@@ -109,6 +132,67 @@ public List<String> getOutputColumnNames() {
109132
return null;
110133
}
111134

135+
@Override
136+
public int serializedSize() {
137+
int size = FIXED_SERIALIZED_SIZE;
138+
for (PartialPath path : pathList) {
139+
size += ReadWriteIOUtils.sizeToWrite(path.getFullPath());
140+
}
141+
return size;
142+
}
143+
144+
@Override
145+
public void serializeToWAL(IWALByteBufferView buffer) {
146+
buffer.putShort(PlanNodeType.DELETE_DATA.getNodeType());
147+
buffer.putLong(searchIndex);
148+
buffer.putInt(pathList.size());
149+
for (PartialPath path : pathList) {
150+
WALWriteUtils.write(path.getFullPath(), buffer);
151+
}
152+
buffer.putLong(deleteStartTime);
153+
buffer.putLong(deleteEndTime);
154+
}
155+
156+
public static DeleteDataNode deserializeFromWAL(DataInputStream stream) throws IOException {
157+
long searchIndex = stream.readLong();
158+
int size = stream.readInt();
159+
List<PartialPath> pathList = new ArrayList<>(size);
160+
for (int i = 0; i < size; i++) {
161+
try {
162+
pathList.add(new PartialPath(ReadWriteIOUtils.readString(stream)));
163+
} catch (IllegalPathException e) {
164+
throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
165+
}
166+
}
167+
long deleteStartTime = stream.readLong();
168+
long deleteEndTime = stream.readLong();
169+
170+
DeleteDataNode deleteDataNode =
171+
new DeleteDataNode(new PlanNodeId(""), pathList, deleteStartTime, deleteEndTime);
172+
deleteDataNode.setSearchIndex(searchIndex);
173+
return deleteDataNode;
174+
}
175+
176+
public static DeleteDataNode deserializeFromWAL(ByteBuffer buffer) {
177+
long searchIndex = buffer.getLong();
178+
int size = buffer.getInt();
179+
List<PartialPath> pathList = new ArrayList<>(size);
180+
for (int i = 0; i < size; i++) {
181+
try {
182+
pathList.add(new PartialPath(ReadWriteIOUtils.readString(buffer)));
183+
} catch (IllegalPathException e) {
184+
throw new IllegalArgumentException("Cannot deserialize InsertRowNode", e);
185+
}
186+
}
187+
long deleteStartTime = buffer.getLong();
188+
long deleteEndTime = buffer.getLong();
189+
190+
DeleteDataNode deleteDataNode =
191+
new DeleteDataNode(new PlanNodeId(""), pathList, deleteStartTime, deleteEndTime);
192+
deleteDataNode.setSearchIndex(searchIndex);
193+
return deleteDataNode;
194+
}
195+
112196
@Override
113197
protected void serializeAttributes(ByteBuffer byteBuffer) {
114198
PlanNodeType.DELETE_DATA.serialize(byteBuffer);

server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.commons.conf.CommonDescriptor;
2525
import org.apache.iotdb.db.conf.IoTDBConfig;
2626
import org.apache.iotdb.db.conf.IoTDBDescriptor;
27+
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
2728
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
2829
import org.apache.iotdb.db.utils.MmapUtil;
2930
import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
@@ -234,12 +235,14 @@ private boolean handleInfoEntry(WALEntry walEntry) {
234235
}
235236
// update search index
236237
long searchIndex = DEFAULT_SEARCH_INDEX;
237-
if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
238-
|| walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
239-
InsertNode insertNode = (InsertNode) walEntry.getValue();
240-
if (insertNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
241-
searchIndex = insertNode.getSearchIndex();
242-
currentSearchIndex = insertNode.getSearchIndex();
238+
if (walEntry.getType().needSearch()) {
239+
if (walEntry.getType() == WALEntryType.DELETE_DATA_NODE) {
240+
searchIndex = ((DeleteDataNode) walEntry.getValue()).getSearchIndex();
241+
} else {
242+
searchIndex = ((InsertNode) walEntry.getValue()).getSearchIndex();
243+
}
244+
if (searchIndex != DEFAULT_SEARCH_INDEX) {
245+
currentSearchIndex = searchIndex;
243246
currentFileStatus = WALFileStatus.CONTAINS_SEARCH_INDEX;
244247
}
245248
}

server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntry.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.db.engine.memtable.IMemTable;
2424
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
2525
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
26+
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
2627
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
2728
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
2829
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -74,6 +75,8 @@ public WALEntry(long memTableId, WALEntryValue value, boolean wait) {
7475
this.type = WALEntryType.INSERT_ROW_NODE;
7576
} else if (value instanceof InsertTabletNode) {
7677
this.type = WALEntryType.INSERT_TABLET_NODE;
78+
} else if (value instanceof DeleteDataNode) {
79+
this.type = WALEntryType.DELETE_DATA_NODE;
7780
} else {
7881
throw new RuntimeException("Unknown WALEntry type");
7982
}
@@ -93,9 +96,6 @@ public static WALEntry deserialize(DataInputStream stream)
9396
throws IllegalPathException, IOException {
9497
byte typeNum = stream.readByte();
9598
WALEntryType type = WALEntryType.valueOf(typeNum);
96-
if (type == null) {
97-
throw new IOException("unrecognized wal entry type " + typeNum);
98-
}
9999

100100
// handle signal
101101
switch (type) {
@@ -127,6 +127,9 @@ public static WALEntry deserialize(DataInputStream stream)
127127
case INSERT_TABLET_NODE:
128128
value = (InsertTabletNode) PlanNodeType.deserializeFromWAL(stream);
129129
break;
130+
case DELETE_DATA_NODE:
131+
value = (DeleteDataNode) PlanNodeType.deserializeFromWAL(stream);
132+
break;
130133
}
131134
return new WALInfoEntry(type, memTableId, value);
132135
}
@@ -135,7 +138,7 @@ public static WALEntry deserialize(DataInputStream stream)
135138
* This deserialization method is only for multi-leader consensus and just deserializes
136139
* InsertRowNode and InsertTabletNode
137140
*/
138-
public static PlanNode deserializeInsertNode(ByteBuffer buffer) {
141+
public static PlanNode deserializeForConsensus(ByteBuffer buffer) {
139142
logger.debug(
140143
"buffer capacity is: {}, limit is: {}, position is: {}",
141144
buffer.capacity(),

server/src/main/java/org/apache/iotdb/db/wal/buffer/WALEntryType.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public enum WALEntryType {
3333
INSERT_ROW_NODE((byte) 4),
3434
/** {@link org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode} */
3535
INSERT_TABLET_NODE((byte) 5),
36+
/** {@link org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode} */
37+
DELETE_DATA_NODE((byte) 6),
3638
// endregion
3739
// region signal entry type
3840
/** signal wal buffer has been closed */
@@ -54,12 +56,17 @@ public byte getCode() {
5456
return code;
5557
}
5658

59+
/** Returns true when this type should be searched */
60+
public boolean needSearch() {
61+
return this == INSERT_TABLET_NODE || this == INSERT_ROW_NODE || this == DELETE_DATA_NODE;
62+
}
63+
5764
public static WALEntryType valueOf(byte code) {
5865
for (WALEntryType type : WALEntryType.values()) {
5966
if (type.code == code) {
6067
return type;
6168
}
6269
}
63-
return null;
70+
throw new IllegalArgumentException("Invalid WALEntryType code: " + code);
6471
}
6572
}

server/src/main/java/org/apache/iotdb/db/wal/buffer/WALInfoEntry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public void serialize(IWALByteBufferView buffer) {
8282
case INSERT_ROW_PLAN:
8383
case INSERT_ROW_NODE:
8484
case DELETE_PLAN:
85+
case DELETE_DATA_NODE:
8586
case MEMORY_TABLE_SNAPSHOT:
8687
value.serializeToWAL(buffer);
8788
break;

0 commit comments

Comments
 (0)