Skip to content

Commit 6ecb88d

Browse files
authored
[To rel/0.13][IOTDB-4615] TTL supports timestamp precision (apache#7580)
1 parent 504b543 commit 6ecb88d

File tree

28 files changed

+178
-161
lines changed

28 files changed

+178
-161
lines changed

cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.db.exception.metadata.MetadataException;
2626
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
2727
import org.apache.iotdb.db.metadata.path.PartialPath;
28+
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
2829
import org.apache.iotdb.db.service.IoTDB;
2930

3031
import org.apache.commons.collections4.map.MultiKeyMap;
@@ -139,8 +140,8 @@ default MultiKeyMap<Long, PartitionGroup> partitionByPathRangeTime(
139140

140141
MultiKeyMap<Long, PartitionGroup> timeRangeMapRaftGroup = new MultiKeyMap<>();
141142
PartialPath storageGroup = IoTDB.metaManager.getBelongedStorageGroup(path);
142-
startTime = StorageEngine.convertMilliWithPrecision(startTime);
143-
endTime = StorageEngine.convertMilliWithPrecision(endTime);
143+
startTime = DateTimeUtils.convertMilliTimeWithPrecision(startTime);
144+
endTime = DateTimeUtils.convertMilliTimeWithPrecision(endTime);
144145
while (startTime <= endTime) {
145146
long nextTime = (startTime / partitionInterval + 1) * partitionInterval;
146147
timeRangeMapRaftGroup.put(

server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ private IoTDBConstant() {}
9090

9191
public static final String COLUMN_STORAGE_GROUP = "storage group";
9292
public static final String COLUMN_LOCK_INFO = "lock holder";
93-
public static final String COLUMN_TTL = "ttl";
93+
public static final String COLUMN_TTL = "ttl(ms)";
9494

9595
public static final String COLUMN_TASK_ID = "task id";
9696
public static final String COLUMN_SUBMIT_TIME = "submit time";

server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.iotdb.db.engine.compaction.cross.CrossCompactionStrategy;
2525
import org.apache.iotdb.db.engine.compaction.inner.InnerCompactionStrategy;
2626
import org.apache.iotdb.db.exception.query.QueryProcessException;
27-
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
27+
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
2828
import org.apache.iotdb.db.service.metrics.MetricService;
2929
import org.apache.iotdb.external.api.IPropertiesLoader;
3030
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
@@ -1517,7 +1517,7 @@ private void loadCQProps(Properties properties) {
15171517
}
15181518

15191519
conf.setContinuousQueryMinimumEveryInterval(
1520-
DatetimeUtils.convertDurationStrToLong(
1520+
DateTimeUtils.convertDurationStrToLong(
15211521
properties.getProperty("continuous_query_minimum_every_interval", "1s"),
15221522
conf.getTimestampPrecision()));
15231523

server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
6565
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
6666
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
67+
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
6768
import org.apache.iotdb.db.rescon.SystemInfo;
6869
import org.apache.iotdb.db.service.IService;
6970
import org.apache.iotdb.db.service.IoTDB;
@@ -254,7 +255,7 @@ public static StorageEngine getInstance() {
254255

255256
private static void initTimePartition() {
256257
timePartitionInterval =
257-
convertMilliWithPrecision(
258+
DateTimeUtils.convertMilliTimeWithPrecision(
258259
IoTDBDescriptor.getInstance().getConfig().getPartitionInterval() * 1000L);
259260
}
260261

@@ -297,22 +298,6 @@ public static void transmitOperationSync(PhysicalPlan physicalPlan) {
297298
}
298299
}
299300

300-
public static long convertMilliWithPrecision(long milliTime) {
301-
long result = milliTime;
302-
String timePrecision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
303-
switch (timePrecision) {
304-
case "ns":
305-
result = milliTime * 1000_000L;
306-
break;
307-
case "us":
308-
result = milliTime * 1000L;
309-
break;
310-
default:
311-
break;
312-
}
313-
return result;
314-
}
315-
316301
public static String getDeviceNameByPlan(PhysicalPlan plan) {
317302
if (plan instanceof InsertPlan) {
318303
InsertPlan physicalPlan = (InsertPlan) plan;
@@ -342,6 +327,9 @@ public static long getTimePartitionInterval() {
342327
@TestOnly
343328
public static void setTimePartitionInterval(long timePartitionInterval) {
344329
StorageEngine.timePartitionInterval = timePartitionInterval;
330+
if (timePartitionInterval == -1) {
331+
initTimePartition();
332+
}
345333
}
346334

347335
public static long getTimePartition(long time) {
@@ -787,7 +775,7 @@ public VirtualStorageGroupProcessor buildNewStorageGroupProcessor(
787775
virtualStorageGroupId,
788776
fileFlushPolicy,
789777
storageGroupMNode.getFullPath());
790-
processor.setDataTTL(storageGroupMNode.getDataTTL());
778+
processor.setDataTTLWithTimePrecisionCheck(storageGroupMNode.getDataTTL());
791779
processor.setCustomFlushListeners(customFlushListeners);
792780
processor.setCustomCloseFileListeners(customCloseFileListeners);
793781
return processor;

server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.iotdb.db.engine.StorageEngine;
2727
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
2828
import org.apache.iotdb.db.metadata.path.PartialPath;
29-
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
29+
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
3030
import org.apache.iotdb.tsfile.utils.FilePathUtils;
3131

3232
import org.slf4j.Logger;
@@ -373,7 +373,7 @@ public void checkArchivingTasks() {
373373

374374
for (ArchivingTask task : archivingTasks) {
375375

376-
if (task.getStartTime() - DatetimeUtils.currentTime() <= 0
376+
if (task.getStartTime() - DateTimeUtils.currentTime() <= 0
377377
&& task.getStatus() == ArchivingTask.ArchivingTaskStatus.READY) {
378378

379379
// storage group has no data

server/src/main/java/org/apache/iotdb/db/engine/archiving/ArchivingTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2424
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
2525
import org.apache.iotdb.db.metadata.path.PartialPath;
26-
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
26+
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
2727
import org.apache.iotdb.tsfile.utils.FilePathUtils;
2828
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
2929

@@ -76,7 +76,7 @@ public ArchivingTask(
7676
this.targetDir = targetDir;
7777
this.ttl = ttl;
7878
this.startTime = startTime;
79-
this.submitTime = DatetimeUtils.currentTime();
79+
this.submitTime = DateTimeUtils.currentTime();
8080
}
8181

8282
public ArchivingTask(

server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
2828
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
2929
import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
30-
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
30+
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
3131
import org.apache.iotdb.db.query.dataset.ShowContinuousQueriesResult;
3232
import org.apache.iotdb.db.service.IService;
3333
import org.apache.iotdb.db.service.ServiceType;
@@ -49,7 +49,7 @@ public class ContinuousQueryService implements IService {
4949

5050
private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryService.class);
5151

52-
private static final long SYSTEM_STARTUP_TIME = DatetimeUtils.currentTime();
52+
private static final long SYSTEM_STARTUP_TIME = DateTimeUtils.currentTime();
5353

5454
private static final ContinuousQueryTaskPoolManager TASK_POOL_MANAGER =
5555
ContinuousQueryTaskPoolManager.getInstance();
@@ -130,7 +130,7 @@ public void start() throws StartupException {
130130
this::checkAndSubmitTasks,
131131
0,
132132
TASK_SUBMIT_CHECK_INTERVAL,
133-
DatetimeUtils.timestampPrecisionStringToTimeUnit(
133+
DateTimeUtils.timestampPrecisionStringToTimeUnit(
134134
IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision()));
135135

136136
LOGGER.info("Continuous query service started.");
@@ -157,7 +157,7 @@ private long calculateNextExecutionTimestamp(
157157
}
158158

159159
private void checkAndSubmitTasks() {
160-
long currentTimestamp = DatetimeUtils.currentTime();
160+
long currentTimestamp = DateTimeUtils.currentTime();
161161
for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
162162
long nextExecutionTimestamp = nextExecutionTimestamps.get(plan.getContinuousQueryName());
163163
while (currentTimestamp >= nextExecutionTimestamp) {
@@ -242,7 +242,7 @@ private void doRegister(CreateContinuousQueryPlan plan) {
242242
continuousQueryPlans.put(plan.getContinuousQueryName(), plan);
243243
nextExecutionTimestamps.put(
244244
plan.getContinuousQueryName(),
245-
calculateNextExecutionTimestamp(plan, DatetimeUtils.currentTime()));
245+
calculateNextExecutionTimestamp(plan, DateTimeUtils.currentTime()));
246246
}
247247

248248
@TestOnly

server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.engine.querycontext;
2121

2222
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
23+
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
2324
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
2425
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
2526
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
@@ -77,9 +78,9 @@ public void setDataTTL(long dataTTL) {
7778
public Filter updateFilterUsingTTL(Filter filter) {
7879
if (dataTTL != Long.MAX_VALUE) {
7980
if (filter != null) {
80-
filter = new AndFilter(filter, TimeFilter.gtEq(System.currentTimeMillis() - dataTTL));
81+
filter = new AndFilter(filter, TimeFilter.gtEq(DateTimeUtils.currentTime() - dataTTL));
8182
} else {
82-
filter = TimeFilter.gtEq(System.currentTimeMillis() - dataTTL);
83+
filter = TimeFilter.gtEq(DateTimeUtils.currentTime() - dataTTL);
8384
}
8485
}
8586
return filter;

server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
3333
import org.apache.iotdb.db.exception.PartitionViolationException;
3434
import org.apache.iotdb.db.metadata.path.PartialPath;
35+
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
3536
import org.apache.iotdb.db.query.filter.TsFileFilter;
3637
import org.apache.iotdb.db.service.UpgradeSevice;
3738
import org.apache.iotdb.db.utils.TestOnly;
@@ -769,7 +770,7 @@ public boolean isSatisfied(
769770

770771
/** @return whether the given time falls in ttl */
771772
private boolean isAlive(long time, long dataTTL) {
772-
return dataTTL == Long.MAX_VALUE || (System.currentTimeMillis() - time) <= dataTTL;
773+
return dataTTL == Long.MAX_VALUE || (DateTimeUtils.currentTime() - time) <= dataTTL;
773774
}
774775

775776
public void setProcessor(TsFileProcessor processor) {

server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
6464
import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
6565
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
66+
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
6667
import org.apache.iotdb.db.query.context.QueryContext;
6768
import org.apache.iotdb.db.query.control.FileReaderManager;
6869
import org.apache.iotdb.db.query.control.QueryFileManager;
@@ -876,7 +877,7 @@ public void insert(InsertRowPlan insertRowPlan)
876877
throws WriteProcessException, TriggerExecutionException {
877878
// reject insertions that are out of ttl
878879
if (!isAlive(insertRowPlan.getTime())) {
879-
throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
880+
throw new OutOfTTLException(insertRowPlan.getTime(), (DateTimeUtils.currentTime() - dataTTL));
880881
}
881882
writeLock("InsertRow");
882883
try {
@@ -1030,7 +1031,7 @@ public void insertTablet(InsertTabletPlan insertTabletPlan)
10301031

10311032
/** @return whether the given time falls in ttl */
10321033
private boolean isAlive(long time) {
1033-
return dataTTL == Long.MAX_VALUE || (System.currentTimeMillis() - time) <= dataTTL;
1034+
return dataTTL == Long.MAX_VALUE || (DateTimeUtils.currentTime() - time) <= dataTTL;
10341035
}
10351036

10361037
/**
@@ -1485,7 +1486,7 @@ public synchronized void checkFilesTTL() {
14851486
logicalStorageGroupName + "-" + virtualStorageGroupId);
14861487
return;
14871488
}
1488-
long ttlLowerBound = System.currentTimeMillis() - dataTTL;
1489+
long ttlLowerBound = DateTimeUtils.currentTime() - dataTTL;
14891490
logger.debug(
14901491
"{}: TTL removing files before {}",
14911492
logicalStorageGroupName + "-" + virtualStorageGroupId,
@@ -1559,11 +1560,13 @@ public void checkArchivingTask(ArchivingTask task) {
15591560
logicalStorageGroupName + "-" + virtualStorageGroupId);
15601561
return;
15611562
}
1562-
long ttlLowerBound = System.currentTimeMillis() - task.getTTL();
1563+
1564+
long ttlLowerBound =
1565+
DateTimeUtils.currentTime() - DateTimeUtils.convertMilliTimeWithPrecision(task.getTTL());
15631566
logger.debug(
15641567
"{}: Archiving files before {}",
15651568
logicalStorageGroupName + "-" + virtualStorageGroupId,
1566-
new Date(ttlLowerBound));
1569+
DateTimeUtils.convertMillsecondToZonedDateTime(ttlLowerBound));
15671570

15681571
// copy to avoid concurrent modification of deletion
15691572
List<TsFileResource> seqFiles = new ArrayList<>(tsFileManager.getTsFileList(true));
@@ -1864,7 +1867,7 @@ private List<TsFileResource> getFileResourceListForQuery(
18641867
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
18651868

18661869
long timeLowerBound =
1867-
dataTTL != Long.MAX_VALUE ? System.currentTimeMillis() - dataTTL : Long.MIN_VALUE;
1870+
dataTTL != Long.MAX_VALUE ? DateTimeUtils.currentTime() - dataTTL : Long.MIN_VALUE;
18681871
context.setQueryTimeLowerBound(timeLowerBound);
18691872

18701873
// for upgrade files and old files must be closed
@@ -2997,6 +3000,13 @@ public Collection<TsFileProcessor> getWorkUnsequenceTsFileProcessors() {
29973000
return workUnsequenceTsFileProcessors.values();
29983001
}
29993002

3003+
public void setDataTTLWithTimePrecisionCheck(long dataTTL) {
3004+
if (dataTTL != Long.MAX_VALUE) {
3005+
dataTTL = DateTimeUtils.convertMilliTimeWithPrecision(dataTTL);
3006+
}
3007+
this.dataTTL = dataTTL;
3008+
}
3009+
30003010
public void setDataTTL(long dataTTL) {
30013011
this.dataTTL = dataTTL;
30023012
}

0 commit comments

Comments
 (0)