Skip to content

Commit 8472c71

Browse files
authored
Fixed the issue where devices in the cache may be repeatedly fetched
1 parent 77a91e0 commit 8472c71

File tree

2 files changed

+149
-13
lines changed

2 files changed

+149
-13
lines changed
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.relational.it.query.view.recent;
21+
22+
import org.apache.iotdb.commons.cluster.NodeStatus;
23+
import org.apache.iotdb.isession.ISession;
24+
import org.apache.iotdb.isession.ITableSession;
25+
import org.apache.iotdb.isession.SessionDataSet;
26+
import org.apache.iotdb.it.env.EnvFactory;
27+
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
28+
import org.apache.iotdb.rpc.IoTDBConnectionException;
29+
import org.apache.iotdb.rpc.StatementExecutionException;
30+
31+
import org.junit.After;
32+
import org.junit.Assert;
33+
import org.junit.Before;
34+
import org.junit.Test;
35+
36+
import java.util.Collections;
37+
38+
import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
39+
import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData;
40+
41+
public class IoTDBTableViewQueryWithCachedDeviceIT {
42+
43+
protected static final String DATABASE_NAME = "test";
44+
45+
protected static String[] createTreeNonAlignedDataSqls = {
46+
"CREATE TIMESERIES root.db.battery.b1.voltage INT32",
47+
"CREATE TIMESERIES root.db.battery.b1.current FLOAT",
48+
"INSERT INTO root.db.battery.b1(time, voltage, current) values (1, 1, 1)",
49+
"CREATE TIMESERIES root.db.battery.b2.voltage INT32",
50+
"CREATE TIMESERIES root.db.battery.b2.current FLOAT",
51+
"INSERT INTO root.db.battery.b2(time, voltage, current) values (1, 2, 2)",
52+
"CREATE TIMESERIES root.db.battery.b3.voltage INT32",
53+
"CREATE TIMESERIES root.db.battery.b3.current FLOAT",
54+
"INSERT INTO root.db.battery.b3(time, voltage, current) values (1, 3, 3)",
55+
"CREATE TIMESERIES root.db.battery.b4.voltage INT32",
56+
"CREATE TIMESERIES root.db.battery.b4.current FLOAT",
57+
"INSERT INTO root.db.battery.b4(time, voltage, current) values (1, 4, 4)",
58+
"FLUSH",
59+
};
60+
61+
protected static String[] createTableSqls = {
62+
"CREATE DATABASE " + DATABASE_NAME,
63+
"USE " + DATABASE_NAME,
64+
"CREATE VIEW view1 (battery TAG, voltage INT32 FIELD, current FLOAT FIELD) as root.db.battery.**",
65+
};
66+
67+
@Before
68+
public void setUp() throws Exception {
69+
EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(128 * 1024);
70+
EnvFactory.getEnv().getConfig().getCommonConfig().setMaxTsBlockSizeInByte(4 * 1024);
71+
EnvFactory.getEnv().initClusterEnvironment();
72+
prepareData(createTreeNonAlignedDataSqls);
73+
prepareTableData(createTableSqls);
74+
}
75+
76+
@After
77+
public void tearDown() throws Exception {
78+
EnvFactory.getEnv().cleanClusterEnvironment();
79+
}
80+
81+
@Test
82+
public void test1() throws IoTDBConnectionException, StatementExecutionException {
83+
int count1 = 0;
84+
int count2 = 0;
85+
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
86+
session.executeNonQueryStatement("use " + DATABASE_NAME);
87+
SessionDataSet sessionDataSet =
88+
session.executeQueryStatement(
89+
"select * from view1 where battery = 'b1' or battery = 'b2' or battery = 'b3'");
90+
while (sessionDataSet.hasNext()) {
91+
sessionDataSet.next();
92+
count1++;
93+
}
94+
}
95+
EnvFactory.getEnv().shutdownAllDataNodes();
96+
for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) {
97+
EnvFactory.getEnv()
98+
.ensureNodeStatus(
99+
Collections.singletonList(dataNodeWrapper),
100+
Collections.singletonList(NodeStatus.Unknown));
101+
}
102+
EnvFactory.getEnv().startAllDataNodes();
103+
for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) {
104+
EnvFactory.getEnv()
105+
.ensureNodeStatus(
106+
Collections.singletonList(dataNodeWrapper),
107+
Collections.singletonList(NodeStatus.Running));
108+
}
109+
110+
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
111+
session.executeNonQueryStatement(
112+
"INSERT INTO root.db.battery.b3(time, voltage, current) aligned values (2, 1, 1)");
113+
}
114+
115+
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
116+
session.executeNonQueryStatement("use " + DATABASE_NAME);
117+
SessionDataSet sessionDataSet =
118+
session.executeQueryStatement(
119+
"select * from view1 where (battery = 'b1' or battery = 'b2' or battery = 'b3') and time = 1");
120+
while (sessionDataSet.hasNext()) {
121+
sessionDataSet.next();
122+
count2++;
123+
}
124+
}
125+
Assert.assertEquals(count1, count2);
126+
}
127+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -307,25 +307,34 @@ public boolean parseFilter4TraverseDevice(
307307
index2FilterMapList.size()
308308
- tagSingleMatchIndexList.size()
309309
+ tagSingleMatchPredicateNotInCache.size());
310-
int idx1 = 0;
311-
int idx2 = 0;
312-
for (int i = 0; i < index2FilterMapList.size(); i++) {
313-
if (idx1 >= tagSingleMatchIndexList.size() || i != tagSingleMatchIndexList.get(idx1)) {
314-
tagPredicateForFetch.add(
315-
index2FilterMapList.get(i).values().stream()
316-
.flatMap(Collection::stream)
317-
.collect(Collectors.toList()));
318-
} else {
319-
idx1++;
320-
if (idx2 >= tagSingleMatchPredicateNotInCache.size()
321-
|| i == tagSingleMatchPredicateNotInCache.get(idx2)) {
310+
if (!isDirectDeviceQuery) {
311+
int idx1 = 0;
312+
int idx2 = 0;
313+
for (int i = 0; i < index2FilterMapList.size(); i++) {
314+
if (idx1 >= tagSingleMatchIndexList.size() || i != tagSingleMatchIndexList.get(idx1)) {
322315
tagPredicateForFetch.add(
323316
index2FilterMapList.get(i).values().stream()
324317
.flatMap(Collection::stream)
325318
.collect(Collectors.toList()));
326-
idx2++;
319+
} else {
320+
idx1++;
321+
if (idx2 < tagSingleMatchPredicateNotInCache.size()
322+
&& i == tagSingleMatchPredicateNotInCache.get(idx2)) {
323+
tagPredicateForFetch.add(
324+
index2FilterMapList.get(i).values().stream()
325+
.flatMap(Collection::stream)
326+
.collect(Collectors.toList()));
327+
idx2++;
328+
}
327329
}
328330
}
331+
} else {
332+
for (Map<Integer, List<SchemaFilter>> integerListMap : index2FilterMapList) {
333+
tagPredicateForFetch.add(
334+
integerListMap.values().stream()
335+
.flatMap(Collection::stream)
336+
.collect(Collectors.toList()));
337+
}
329338
}
330339
statement.setTagDeterminedFilterList(tagPredicateForFetch);
331340
statement.setTagFuzzyPredicate(compactedTagFuzzyPredicate);

0 commit comments

Comments
 (0)