Skip to content

Commit 1a0fdb0

Browse files
CRZbulabulaJackieTien97
authored andcommitted
Use reference time position for PartitionTableAutoCleaner
(cherry picked from commit d481812)
1 parent c94db16 commit 1a0fdb0

File tree

4 files changed

+210
-18
lines changed

4 files changed

+210
-18
lines changed

integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionTableAutoCleanIT.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
2323
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
24-
import org.apache.iotdb.commons.utils.TimePartitionUtils;
2524
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
2625
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
2726
import org.apache.iotdb.it.env.EnvFactory;
@@ -50,7 +49,11 @@ public class IoTDBPartitionTableAutoCleanIT {
5049
private static final long TEST_TTL_CHECK_INTERVAL = 5_000;
5150

5251
private static final TTimePartitionSlot TEST_CURRENT_TIME_SLOT =
53-
TimePartitionUtils.getCurrentTimePartitionSlot();
52+
new TTimePartitionSlot()
53+
.setStartTime(
54+
System.currentTimeMillis()
55+
/ TEST_TIME_PARTITION_INTERVAL
56+
* TEST_TIME_PARTITION_INTERVAL);
5457
private static final long TEST_TTL = 7 * TEST_TIME_PARTITION_INTERVAL;
5558

5659
@Before
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.it.partition;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
23+
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
24+
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
25+
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
26+
import org.apache.iotdb.it.env.EnvFactory;
27+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
28+
import org.apache.iotdb.itbase.category.ClusterIT;
29+
import org.apache.iotdb.itbase.env.BaseEnv;
30+
import org.apache.iotdb.rpc.TSStatusCode;
31+
32+
import org.junit.After;
33+
import org.junit.Assert;
34+
import org.junit.Before;
35+
import org.junit.Test;
36+
import org.junit.experimental.categories.Category;
37+
import org.junit.runner.RunWith;
38+
39+
import java.sql.Connection;
40+
import java.sql.Statement;
41+
import java.util.TreeMap;
42+
import java.util.concurrent.TimeUnit;
43+
44+
@RunWith(IoTDBTestRunner.class)
45+
@Category({ClusterIT.class})
46+
public class IoTDBPartitionTableAutoCleanUSIT {
47+
48+
private static final String TREE_DATABASE_PREFIX = "root.db.g_";
49+
private static final String TABLE_DATABASE_PREFIX = "database_";
50+
51+
private static final int TEST_REPLICATION_FACTOR = 1;
52+
private static final long TEST_TIME_PARTITION_INTERVAL_IN_MS = 604800_000;
53+
private static final long TEST_TTL_CHECK_INTERVAL = 5_000;
54+
55+
private static final TTimePartitionSlot TEST_CURRENT_TIME_SLOT =
56+
new TTimePartitionSlot()
57+
.setStartTime(
58+
System.currentTimeMillis()
59+
* 1000L
60+
/ TEST_TIME_PARTITION_INTERVAL_IN_MS
61+
* TEST_TIME_PARTITION_INTERVAL_IN_MS);
62+
private static final long TEST_TTL_IN_MS = 7 * TEST_TIME_PARTITION_INTERVAL_IN_MS;
63+
64+
@Before
65+
public void setUp() throws Exception {
66+
EnvFactory.getEnv()
67+
.getConfig()
68+
.getCommonConfig()
69+
.setSchemaReplicationFactor(TEST_REPLICATION_FACTOR)
70+
.setDataReplicationFactor(TEST_REPLICATION_FACTOR)
71+
.setTimePartitionInterval(TEST_TIME_PARTITION_INTERVAL_IN_MS)
72+
.setTTLCheckInterval(TEST_TTL_CHECK_INTERVAL)
73+
// Note that the time precision of IoTDB is us in this IT
74+
.setTimestampPrecision("us");
75+
76+
// Init 1C1D environment
77+
EnvFactory.getEnv().initClusterEnvironment(1, 1);
78+
}
79+
80+
@After
81+
public void tearDown() {
82+
EnvFactory.getEnv().cleanClusterEnvironment();
83+
}
84+
85+
@Test
86+
public void testAutoCleanPartitionTableForTreeModel() throws Exception {
87+
try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT);
88+
Statement statement = connection.createStatement()) {
89+
// Create databases and insert test data
90+
for (int i = 0; i < 3; i++) {
91+
String databaseName = String.format("%s%d", TREE_DATABASE_PREFIX, i);
92+
statement.execute(String.format("CREATE DATABASE %s", databaseName));
93+
statement.execute(
94+
String.format(
95+
"CREATE TIMESERIES %s.s WITH DATATYPE=INT64,ENCODING=PLAIN", databaseName));
96+
// Insert expired data
97+
statement.execute(
98+
String.format(
99+
"INSERT INTO %s(timestamp, s) VALUES (%d, %d)",
100+
databaseName, TEST_CURRENT_TIME_SLOT.getStartTime() - TEST_TTL_IN_MS * 2000, -1));
101+
// Insert existed data
102+
statement.execute(
103+
String.format(
104+
"INSERT INTO %s(timestamp, s) VALUES (%d, %d)",
105+
databaseName, TEST_CURRENT_TIME_SLOT.getStartTime(), 1));
106+
}
107+
// Let db0.TTL > device.TTL, the valid TTL should be the bigger one
108+
statement.execute(String.format("SET TTL TO %s0 %d", TREE_DATABASE_PREFIX, TEST_TTL_IN_MS));
109+
statement.execute(String.format("SET TTL TO %s0.s %d", TREE_DATABASE_PREFIX, 10));
110+
// Let db1.TTL < device.TTL, the valid TTL should be the bigger one
111+
statement.execute(String.format("SET TTL TO %s1 %d", TREE_DATABASE_PREFIX, 10));
112+
statement.execute(String.format("SET TTL TO %s1.s %d", TREE_DATABASE_PREFIX, TEST_TTL_IN_MS));
113+
// Set TTL to path db2.**
114+
statement.execute(
115+
String.format("SET TTL TO %s2.** %d", TREE_DATABASE_PREFIX, TEST_TTL_IN_MS));
116+
}
117+
TDataPartitionReq req = new TDataPartitionReq();
118+
for (int i = 0; i < 3; i++) {
119+
req.putToPartitionSlotsMap(String.format("%s%d", TREE_DATABASE_PREFIX, i), new TreeMap<>());
120+
}
121+
try (SyncConfigNodeIServiceClient client =
122+
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
123+
for (int retry = 0; retry < 120; retry++) {
124+
boolean partitionTableAutoCleaned = true;
125+
TDataPartitionTableResp resp = client.getDataPartitionTable(req);
126+
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode()) {
127+
partitionTableAutoCleaned =
128+
resp.getDataPartitionTable().entrySet().stream()
129+
.flatMap(e1 -> e1.getValue().entrySet().stream())
130+
.allMatch(e2 -> e2.getValue().size() == 1);
131+
}
132+
if (partitionTableAutoCleaned) {
133+
return;
134+
}
135+
TimeUnit.SECONDS.sleep(1);
136+
}
137+
}
138+
Assert.fail("The PartitionTable in the ConfigNode is not auto cleaned!");
139+
}
140+
141+
@Test
142+
public void testAutoCleanPartitionTableForTableModel() throws Exception {
143+
try (final Connection connection =
144+
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
145+
final Statement statement = connection.createStatement()) {
146+
// Create databases and insert test data
147+
String databaseName = TABLE_DATABASE_PREFIX;
148+
statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", databaseName));
149+
statement.execute(String.format("USE %s", databaseName));
150+
statement.execute("CREATE TABLE tb (time TIMESTAMP TIME, s int64 FIELD)");
151+
// Insert expired data
152+
statement.execute(
153+
String.format(
154+
"INSERT INTO tb(time, s) VALUES (%d, %d)",
155+
TEST_CURRENT_TIME_SLOT.getStartTime() - TEST_TTL_IN_MS * 2000, -1));
156+
// Insert existed data
157+
statement.execute(
158+
String.format(
159+
"INSERT INTO tb(time, s) VALUES (%d, %d)", TEST_CURRENT_TIME_SLOT.getStartTime(), 1));
160+
statement.execute(String.format("USE %s", TABLE_DATABASE_PREFIX));
161+
statement.execute(String.format("ALTER TABLE tb SET PROPERTIES TTL=%d", TEST_TTL_IN_MS));
162+
}
163+
164+
TDataPartitionReq req = new TDataPartitionReq();
165+
req.putToPartitionSlotsMap(TABLE_DATABASE_PREFIX, new TreeMap<>());
166+
try (SyncConfigNodeIServiceClient client =
167+
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
168+
for (int retry = 0; retry < 120; retry++) {
169+
boolean partitionTableAutoCleaned = true;
170+
TDataPartitionTableResp resp = client.getDataPartitionTable(req);
171+
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == resp.getStatus().getCode()) {
172+
partitionTableAutoCleaned =
173+
resp.getDataPartitionTable().entrySet().stream()
174+
.flatMap(e1 -> e1.getValue().entrySet().stream())
175+
.allMatch(e2 -> e2.getValue().size() == 1);
176+
}
177+
if (partitionTableAutoCleaned) {
178+
return;
179+
}
180+
TimeUnit.SECONDS.sleep(1);
181+
}
182+
}
183+
Assert.fail("The PartitionTable in the ConfigNode is not auto cleaned!");
184+
}
185+
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.iotdb.commons.conf.CommonConfig;
2424
import org.apache.iotdb.commons.conf.CommonDescriptor;
2525
import org.apache.iotdb.commons.utils.PathUtils;
26-
import org.apache.iotdb.commons.utils.TimePartitionUtils;
2726
import org.apache.iotdb.confignode.consensus.request.write.partition.AutoCleanPartitionTablePlan;
2827
import org.apache.iotdb.confignode.manager.ConfigManager;
2928
import org.apache.iotdb.consensus.exception.ConsensusException;
@@ -43,6 +42,10 @@ public class PartitionTableAutoCleaner<Env> extends InternalProcedure<Env> {
4342
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionTableAutoCleaner.class);
4443

4544
private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
45+
46+
private static final String timestampPrecision =
47+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision();
48+
4649
private final ConfigManager configManager;
4750

4851
public PartitionTableAutoCleaner(ConfigManager configManager) {
@@ -75,8 +78,7 @@ protected void periodicExecute(Env env) {
7578
"[PartitionTableCleaner] Periodically activate PartitionTableAutoCleaner for: {}",
7679
databaseTTLMap);
7780
// Only clean the partition table when necessary
78-
TTimePartitionSlot currentTimePartitionSlot =
79-
TimePartitionUtils.getCurrentTimePartitionSlot();
81+
TTimePartitionSlot currentTimePartitionSlot = getCurrentTimePartitionSlot();
8082
try {
8183
configManager
8284
.getConsensusManager()
@@ -86,4 +88,19 @@ protected void periodicExecute(Env env) {
8688
}
8789
}
8890
}
91+
92+
/**
93+
* @return The time partition slot corresponding to current timestamp. Note that we do not shift
94+
* the start time to the correct starting point, since this interface only constructs a time
95+
* reference position for the partition table cleaner.
96+
*/
97+
private static TTimePartitionSlot getCurrentTimePartitionSlot() {
98+
if ("ms".equals(timestampPrecision)) {
99+
return new TTimePartitionSlot(System.currentTimeMillis());
100+
} else if ("us".equals(timestampPrecision)) {
101+
return new TTimePartitionSlot(System.currentTimeMillis() * 1000);
102+
} else {
103+
return new TTimePartitionSlot(System.currentTimeMillis() * 1000_000);
104+
}
105+
}
89106
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ public class TimePartitionUtils {
3434
private static long timePartitionOrigin =
3535
CommonDescriptor.getInstance().getConfig().getTimePartitionOrigin();
3636

37-
private static String timestampPrecision =
38-
CommonDescriptor.getInstance().getConfig().getTimestampPrecision();
39-
4037
/** Time range for dividing database, the time unit is the same with IoTDB's TimestampPrecision */
4138
private static long timePartitionInterval =
4239
CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
@@ -74,16 +71,6 @@ public class TimePartitionUtils {
7471
}
7572
}
7673

77-
public static TTimePartitionSlot getCurrentTimePartitionSlot() {
78-
if ("ms".equals(timestampPrecision)) {
79-
return getTimePartitionSlot(System.currentTimeMillis());
80-
} else if ("us".equals(timestampPrecision)) {
81-
return getTimePartitionSlot(System.currentTimeMillis() * 1000);
82-
} else {
83-
return getTimePartitionSlot(System.currentTimeMillis() * 1000_000);
84-
}
85-
}
86-
8774
public static TTimePartitionSlot getTimePartitionSlot(long time) {
8875
TTimePartitionSlot timePartitionSlot = new TTimePartitionSlot();
8976
timePartitionSlot.setStartTime(getTimePartitionLowerBound(time));

0 commit comments

Comments
 (0)