Skip to content

Commit 73a0cd8

Browse files
authored
No filtering audit DBs in some procdure & update idle time after logging in
1 parent f39cf38 commit 73a0cd8

File tree

12 files changed

+111
-40
lines changed

12 files changed

+111
-40
lines changed

integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBDeletionTableIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1625,8 +1625,8 @@ private Void randomDeviceDeletion(
16251625
allDeviceUndeletedRanges.set(i, mergeRanges(deviceUndeletedRanges));
16261626
List<TimeRange> remainingRanges =
16271627
collectDataRanges(statement, currentWrittenTime, testNum);
1628-
LOGGER.debug("Expected ranges: {}", deviceUndeletedRanges);
1629-
LOGGER.debug("Remaining ranges: {}", remainingRanges);
1628+
LOGGER.info("Expected ranges: {}", deviceUndeletedRanges);
1629+
LOGGER.info("Remaining ranges: {}", remainingRanges);
16301630
fail(
16311631
String.format(
16321632
"Inconsistent number of points %d - %d", expectedCnt, set.getLong(1)));

integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
import org.apache.iotdb.itbase.category.TableClusterIT;
2626
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
2727
import org.apache.iotdb.itbase.env.BaseEnv;
28+
import org.apache.iotdb.rpc.StatementExecutionException;
2829

2930
import org.apache.tsfile.read.common.Field;
31+
import org.awaitility.Awaitility;
3032
import org.fusesource.mqtt.client.BlockingConnection;
3133
import org.fusesource.mqtt.client.MQTT;
3234
import org.fusesource.mqtt.client.QoS;
@@ -38,6 +40,7 @@
3840

3941
import java.io.IOException;
4042
import java.util.List;
43+
import java.util.concurrent.TimeUnit;
4144

4245
import static org.junit.Assert.assertEquals;
4346
import static org.junit.Assert.fail;
@@ -90,19 +93,32 @@ public void testNoAttr() throws Exception {
9093
EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) {
9194
session.executeNonQueryStatement("CREATE DATABASE " + DATABASE);
9295
String payload1 = "test1,tag1=t1,tag2=t2 field1=1,field2=1f,field3=1i32 1";
93-
connection.publish(DATABASE + "/myTopic", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
94-
Thread.sleep(1000);
95-
try (final SessionDataSet dataSet =
96-
session.executeQueryStatement(
97-
"select tag1,tag2,field1,field2,field3 from test1 where time = 1")) {
98-
assertEquals(5, dataSet.getColumnNames().size());
99-
List<Field> fields = dataSet.next().getFields();
100-
assertEquals("t1", fields.get(0).getStringValue());
101-
assertEquals("t2", fields.get(1).getStringValue());
102-
assertEquals(1d, fields.get(2).getDoubleV(), 0);
103-
assertEquals(1f, fields.get(3).getFloatV(), 0);
104-
assertEquals(1, fields.get(4).getIntV(), 0);
105-
}
96+
Awaitility.await()
97+
.atMost(3, TimeUnit.MINUTES)
98+
.pollInterval(1, TimeUnit.SECONDS)
99+
.until(
100+
() -> {
101+
connection.publish(
102+
DATABASE + "/myTopic", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
103+
try (final SessionDataSet dataSet =
104+
session.executeQueryStatement(
105+
"select tag1,tag2,field1,field2,field3 from test1 where time = 1")) {
106+
assertEquals(5, dataSet.getColumnNames().size());
107+
List<Field> fields = dataSet.next().getFields();
108+
assertEquals("t1", fields.get(0).getStringValue());
109+
assertEquals("t2", fields.get(1).getStringValue());
110+
assertEquals(1d, fields.get(2).getDoubleV(), 0);
111+
assertEquals(1f, fields.get(3).getFloatV(), 0);
112+
assertEquals(1, fields.get(4).getIntV(), 0);
113+
return true;
114+
} catch (StatementExecutionException e) {
115+
if (e.getMessage() != null && e.getMessage().contains("does not exist")) {
116+
return false;
117+
} else {
118+
throw e;
119+
}
120+
}
121+
});
106122
}
107123
}
108124

@@ -112,21 +128,34 @@ public void testWithAttr() throws Exception {
112128
EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) {
113129
session.executeNonQueryStatement("CREATE DATABASE " + DATABASE);
114130
String payload1 = "test2,tag1=t1,tag2=t2 attr3=a3,attr4=a4 field1=1,field2=1f,field3=1i32 1";
115-
connection.publish(DATABASE + "/myTopic", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
116-
Thread.sleep(1000);
117-
try (final SessionDataSet dataSet =
118-
session.executeQueryStatement(
119-
"select tag1,tag2,attr3,attr4,field1,field2,field3 from test2 where time = 1")) {
120-
assertEquals(7, dataSet.getColumnNames().size());
121-
List<Field> fields = dataSet.next().getFields();
122-
assertEquals("t1", fields.get(0).getStringValue());
123-
assertEquals("t2", fields.get(1).getStringValue());
124-
assertEquals("a3", fields.get(2).getStringValue());
125-
assertEquals("a4", fields.get(3).getStringValue());
126-
assertEquals(1d, fields.get(4).getDoubleV(), 0);
127-
assertEquals(1f, fields.get(5).getFloatV(), 0);
128-
assertEquals(1, fields.get(6).getIntV(), 0);
129-
}
131+
Awaitility.await()
132+
.atMost(3, TimeUnit.MINUTES)
133+
.pollInterval(1, TimeUnit.SECONDS)
134+
.until(
135+
() -> {
136+
connection.publish(
137+
DATABASE + "/myTopic", payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
138+
try (final SessionDataSet dataSet =
139+
session.executeQueryStatement(
140+
"select tag1,tag2,attr3,attr4,field1,field2,field3 from test2 where time = 1")) {
141+
assertEquals(7, dataSet.getColumnNames().size());
142+
List<Field> fields = dataSet.next().getFields();
143+
assertEquals("t1", fields.get(0).getStringValue());
144+
assertEquals("t2", fields.get(1).getStringValue());
145+
assertEquals("a3", fields.get(2).getStringValue());
146+
assertEquals("a4", fields.get(3).getStringValue());
147+
assertEquals(1d, fields.get(4).getDoubleV(), 0);
148+
assertEquals(1f, fields.get(5).getFloatV(), 0);
149+
assertEquals(1, fields.get(6).getIntV(), 0);
150+
return true;
151+
} catch (StatementExecutionException e) {
152+
if (e.getMessage() != null && e.getMessage().contains("does not exist")) {
153+
return false;
154+
} else {
155+
throw e;
156+
}
157+
}
158+
});
130159
}
131160
}
132161
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2269,7 +2269,11 @@ public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
22692269
deleteTimeSeriesPatternPaths.add(path);
22702270
}
22712271
if (!canOptimize) {
2272-
return procedureManager.deleteTimeSeries(queryId, rawPatternTree, isGeneratedByPipe);
2272+
return procedureManager.deleteTimeSeries(
2273+
queryId,
2274+
rawPatternTree,
2275+
isGeneratedByPipe,
2276+
req.isSetMayDeleteAudit() && req.isMayDeleteAudit());
22732277
}
22742278
// check if the database is using template
22752279
try {
@@ -2286,7 +2290,10 @@ public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
22862290
deleteTimeSeriesPatternTree.constructTree();
22872291
status =
22882292
procedureManager.deleteTimeSeries(
2289-
queryId, deleteTimeSeriesPatternTree, isGeneratedByPipe);
2293+
queryId,
2294+
deleteTimeSeriesPatternTree,
2295+
isGeneratedByPipe,
2296+
req.isSetMayDeleteAudit() && req.isMayDeleteAudit());
22902297
}
22912298
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
22922299
// 2. delete database
@@ -2638,12 +2645,21 @@ private Map<TConsensusGroupId, TRegionReplicaSet> getRelatedSchemaRegionGroup(
26382645

26392646
/**
26402647
* Get all related dataRegion which may contains the data of specific timeseries matched by given
2641-
* patternTree
2648+
* patternTree. The audit db is excluded
26422649
*/
26432650
public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedDataRegionGroup(
26442651
final PathPatternTree patternTree) {
2652+
return getRelatedDataRegionGroup(patternTree, false);
2653+
}
2654+
2655+
/**
2656+
* Get all related dataRegion which may contains the data of specific timeseries matched by given
2657+
* patternTree
2658+
*/
2659+
public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedDataRegionGroup(
2660+
final PathPatternTree patternTree, boolean needAuditDB) {
26452661
return getRelatedDataRegionGroup(
2646-
getSchemaPartition(patternTree, false).getSchemaPartitionTable());
2662+
getSchemaPartition(patternTree, needAuditDB).getSchemaPartitionTable());
26472663
}
26482664

26492665
public Map<TConsensusGroupId, TRegionReplicaSet> getRelatedDataRegionGroup4TableModel(

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,10 @@ public TSStatus deleteDatabases(
308308
}
309309

310310
public TSStatus deleteTimeSeries(
311-
String queryId, PathPatternTree patternTree, boolean isGeneratedByPipe) {
311+
String queryId,
312+
PathPatternTree patternTree,
313+
boolean isGeneratedByPipe,
314+
boolean mayDeleteAudit) {
312315
DeleteTimeSeriesProcedure procedure = null;
313316
synchronized (this) {
314317
boolean hasOverlappedTask = false;
@@ -336,7 +339,8 @@ public TSStatus deleteTimeSeries(
336339
TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
337340
"Some other task is deleting some target timeseries.");
338341
}
339-
procedure = new DeleteTimeSeriesProcedure(queryId, patternTree, isGeneratedByPipe);
342+
procedure =
343+
new DeleteTimeSeriesProcedure(queryId, patternTree, isGeneratedByPipe, mayDeleteAudit);
340344
this.executor.submitProcedure(procedure);
341345
}
342346
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public class DeleteTimeSeriesProcedure
7171

7272
private PathPatternTree patternTree;
7373
private transient ByteBuffer patternTreeBytes;
74+
private boolean mayDeleteAudit;
7475

7576
private transient String requestMessage;
7677

@@ -86,10 +87,14 @@ public DeleteTimeSeriesProcedure(final boolean isGeneratedByPipe) {
8687
}
8788

8889
public DeleteTimeSeriesProcedure(
89-
final String queryId, final PathPatternTree patternTree, final boolean isGeneratedByPipe) {
90+
final String queryId,
91+
final PathPatternTree patternTree,
92+
final boolean isGeneratedByPipe,
93+
boolean mayDeleteAudit) {
9094
super(isGeneratedByPipe);
9195
this.queryId = queryId;
9296
setPatternTree(patternTree);
97+
this.mayDeleteAudit = mayDeleteAudit;
9398
}
9499

95100
@Override
@@ -233,7 +238,7 @@ private void executeDeleteData(
233238
}
234239

235240
final Map<TConsensusGroupId, TRegionReplicaSet> relatedDataRegionGroup =
236-
env.getConfigManager().getRelatedDataRegionGroup(patternTree);
241+
env.getConfigManager().getRelatedDataRegionGroup(patternTree, mayDeleteAudit);
237242

238243
// Target timeSeries has no data
239244
if (relatedDataRegionGroup.isEmpty()) {
@@ -359,6 +364,7 @@ public void serialize(final DataOutputStream stream) throws IOException {
359364
super.serialize(stream);
360365
ReadWriteIOUtils.write(queryId, stream);
361366
patternTree.serialize(stream);
367+
ReadWriteIOUtils.write(mayDeleteAudit, stream);
362368
}
363369

364370
@Override
@@ -370,6 +376,9 @@ public void deserialize(final ByteBuffer byteBuffer) {
370376
|| getCurrentState() == DeleteTimeSeriesState.DELETE_DATA) {
371377
LOGGER.info("Successfully restored, will set mods to the data regions anyway");
372378
}
379+
if (byteBuffer.hasRemaining()) {
380+
mayDeleteAudit = ReadWriteIOUtils.readBoolean(byteBuffer);
381+
}
373382
}
374383

375384
@Override

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/receiver/PipeEnrichedProcedureTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void deleteTimeseriesTest() throws IllegalPathException, IOException {
117117
patternTree.appendPathPattern(new PartialPath("root.sg2.*.s1"));
118118
patternTree.constructTree();
119119
DeleteTimeSeriesProcedure deleteTimeSeriesProcedure =
120-
new DeleteTimeSeriesProcedure(queryId, patternTree, true);
120+
new DeleteTimeSeriesProcedure(queryId, patternTree, true, false);
121121

122122
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
123123
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);

iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedureTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public void serializeDeserializeTest() throws IllegalPathException, IOException
4343
patternTree.appendPathPattern(new PartialPath("root.sg2.*.s1"));
4444
patternTree.constructTree();
4545
DeleteTimeSeriesProcedure deleteTimeSeriesProcedure =
46-
new DeleteTimeSeriesProcedure(queryId, patternTree, false);
46+
new DeleteTimeSeriesProcedure(queryId, patternTree, false, false);
4747

4848
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
4949
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ public BasicOpenSessionResp login(
223223
openSessionResp.getMessage(),
224224
username,
225225
session);
226+
updateIdleTime();
226227
if (enableLoginLock) {
227228
loginLockManager.clearFailure(userId, session.getClientAddress());
228229
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2775,6 +2775,7 @@ public SettableFuture<ConfigTaskResult> deleteTimeSeries(
27752775
new TDeleteTimeSeriesReq(
27762776
queryId,
27772777
serializePatternListToByteBuffer(deleteTimeSeriesStatement.getPathPatternList()));
2778+
req.setMayDeleteAudit(deleteTimeSeriesStatement.isMayDeleteAudit());
27782779
try (ConfigNodeClient client =
27792780
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
27802781
TSStatus tsStatus;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/DeleteTimeSeriesStatement.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
public class DeleteTimeSeriesStatement extends Statement implements IConfigStatement {
3232

33+
private boolean mayDeleteAudit = false;
3334
List<PartialPath> pathPatternList;
3435

3536
public DeleteTimeSeriesStatement() {
@@ -64,4 +65,12 @@ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
6465
public QueryType getQueryType() {
6566
return QueryType.WRITE;
6667
}
68+
69+
public void setMayDeleteAudit(boolean mayDeleteAudit) {
70+
this.mayDeleteAudit = mayDeleteAudit;
71+
}
72+
73+
public boolean isMayDeleteAudit() {
74+
return mayDeleteAudit;
75+
}
6776
}

0 commit comments

Comments
 (0)