Skip to content

Commit 205e373

Browse files
authored
Refactored the schema cache (apache#16446)
* partial * Update pom.xml
1 parent 21c1893 commit 205e373

File tree

7 files changed

+112
-94
lines changed

7 files changed

+112
-94
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/DeviceSchemaInfo.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,12 @@
3030
import java.util.Set;
3131
import java.util.stream.Collectors;
3232

33-
import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;
34-
3533
public class DeviceSchemaInfo {
3634

37-
private PartialPath devicePath;
38-
private boolean isAligned;
39-
private List<IMeasurementSchemaInfo> measurementSchemaInfoList;
40-
private int templateId = NON_TEMPLATE;
41-
42-
private DeviceSchemaInfo() {}
35+
private final PartialPath devicePath;
36+
private final boolean isAligned;
37+
private final List<IMeasurementSchemaInfo> measurementSchemaInfoList;
38+
private final int templateId;
4339

4440
public DeviceSchemaInfo(
4541
PartialPath devicePath,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ private long evictOneCacheEntry() {
224224
final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
225225
firstKeyMap.get(belongedGroup.getFirstKey());
226226
if (Objects.nonNull(cacheEntryGroup) && cacheEntryGroup.isEmpty()) {
227+
// The removal is non-atomic, but it's ok because it's just a cache and does not affect the
228+
// consistency if you evicts some entries being added
227229
if (Objects.nonNull(firstKeyMap.remove(belongedGroup.getFirstKey()))) {
228230
memory +=
229231
sizeComputer.computeFirstKeySize(belongedGroup.getFirstKey())

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public ClusterSchemaTree fetchSchema(
125125
Set<String> storageGroupSet = new HashSet<>();
126126
if (!explicitDevicePatternList.isEmpty()) {
127127
for (PartialPath explicitDevicePattern : explicitDevicePatternList) {
128-
cachedSchema = schemaCache.getMatchedSchemaWithTemplate(explicitDevicePattern);
128+
cachedSchema = schemaCache.getMatchedTemplateSchema(explicitDevicePattern);
129129
if (cachedSchema.isEmpty()) {
130130
isAllCached = false;
131131
break;
@@ -144,7 +144,7 @@ public ClusterSchemaTree fetchSchema(
144144
continue;
145145
}
146146
cachedSchema =
147-
schemaCache.getMatchedSchemaWithoutTemplate(new MeasurementPath(fullPath.getNodes()));
147+
schemaCache.getMatchedNormalSchema(new MeasurementPath(fullPath.getNodes()));
148148
if (cachedSchema.isEmpty()) {
149149
isAllCached = false;
150150
break;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2800,12 +2800,11 @@ private UpdateLastCacheOperator createUpdateLastCacheOperator(
28002800
final boolean isNeedUpdateLastCache = context.isNeedUpdateLastCache();
28012801
if (isNeedUpdateLastCache) {
28022802
TreeDeviceSchemaCacheManager.getInstance()
2803-
.updateLastCache(
2803+
.declareLastCache(
28042804
((DataDriverContext) operatorContext.getDriverContext())
28052805
.getDataRegion()
28062806
.getDatabaseName(),
2807-
fullPath,
2808-
false);
2807+
fullPath);
28092808
}
28102809

28112810
return Objects.isNull(node.getOutputViewPath())
@@ -2857,13 +2856,12 @@ private AlignedUpdateLastCacheOperator createAlignedUpdateLastCacheOperator(
28572856

28582857
for (int i = 0; i < size; ++i) {
28592858
TreeDeviceSchemaCacheManager.getInstance()
2860-
.updateLastCache(
2859+
.declareLastCache(
28612860
databaseName,
28622861
new MeasurementPath(
28632862
devicePath.concatNode(unCachedPath.getMeasurementList().get(i)),
28642863
unCachedPath.getSchemaList().get(i),
2865-
true),
2866-
false);
2864+
true));
28672865
}
28682866
}
28692867

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,12 @@ public TSDataType getDataType() {
8282

8383
private static final Optional<Pair<OptionalLong, TsPrimitiveType[]>> HIT_AND_ALL_NULL =
8484
Optional.of(new Pair<>(OptionalLong.empty(), null));
85+
86+
/** This means that the tv pair has been put, and the value is null */
8587
public static final TimeValuePair EMPTY_TIME_VALUE_PAIR =
8688
new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE);
89+
90+
/** This means that the tv pair has been declared, and is ready for the next put. */
8791
private static final TimeValuePair PLACEHOLDER_TIME_VALUE_PAIR =
8892
new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE);
8993

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceSchemaCacheManager.java

Lines changed: 87 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,14 @@ public ClusterSchemaTree get(final PartialPath devicePath, final String[] measur
128128
* @param devicePath full path of the device
129129
* @return empty if cache miss or the device path is not a template activated path
130130
*/
131-
public ClusterSchemaTree getMatchedSchemaWithTemplate(final PartialPath devicePath) {
131+
public ClusterSchemaTree getMatchedTemplateSchema(final PartialPath devicePath) {
132132
final ClusterSchemaTree tree = new ClusterSchemaTree();
133133
final IDeviceSchema schema = tableDeviceSchemaCache.getDeviceSchema(devicePath.getNodes());
134134
if (!(schema instanceof TreeDeviceTemplateSchema)) {
135135
return tree;
136136
}
137137
final TreeDeviceTemplateSchema treeSchema = (TreeDeviceTemplateSchema) schema;
138-
Template template = templateManager.getTemplate(treeSchema.getTemplateId());
138+
final Template template = templateManager.getTemplate(treeSchema.getTemplateId());
139139
tree.appendTemplateDevice(devicePath, template.isDirectAligned(), template.getId(), template);
140140
tree.setDatabases(Collections.singleton(treeSchema.getDatabase()));
141141
return tree;
@@ -147,7 +147,7 @@ public ClusterSchemaTree getMatchedSchemaWithTemplate(final PartialPath devicePa
147147
* @param fullPath full path
148148
* @return empty if cache miss
149149
*/
150-
public ClusterSchemaTree getMatchedSchemaWithoutTemplate(final PartialPath fullPath) {
150+
public ClusterSchemaTree getMatchedNormalSchema(final PartialPath fullPath) {
151151
final ClusterSchemaTree tree = new ClusterSchemaTree();
152152
final IDeviceSchema schema =
153153
tableDeviceSchemaCache.getDeviceSchema(
@@ -283,60 +283,7 @@ public List<Integer> computeWithTemplate(final ISchemaComputation computation) {
283283
continue;
284284
}
285285
final IMeasurementSchema schema = templateSchema.get(measurements[i]);
286-
computation.computeMeasurement(
287-
i,
288-
new IMeasurementSchemaInfo() {
289-
@Override
290-
public String getName() {
291-
return schema.getMeasurementName();
292-
}
293-
294-
@Override
295-
public IMeasurementSchema getSchema() {
296-
if (isLogicalView()) {
297-
return new LogicalViewSchema(
298-
schema.getMeasurementName(), ((LogicalViewSchema) schema).getExpression());
299-
} else {
300-
return this.getSchemaAsMeasurementSchema();
301-
}
302-
}
303-
304-
@Override
305-
public MeasurementSchema getSchemaAsMeasurementSchema() {
306-
return new MeasurementSchema(
307-
schema.getMeasurementName(),
308-
schema.getType(),
309-
schema.getEncodingType(),
310-
schema.getCompressor());
311-
}
312-
313-
@Override
314-
public LogicalViewSchema getSchemaAsLogicalViewSchema() {
315-
throw new RuntimeException(
316-
new UnsupportedOperationException(
317-
"Function getSchemaAsLogicalViewSchema is not supported in DeviceUsingTemplateSchemaCache."));
318-
}
319-
320-
@Override
321-
public Map<String, String> getTagMap() {
322-
return null;
323-
}
324-
325-
@Override
326-
public Map<String, String> getAttributeMap() {
327-
return null;
328-
}
329-
330-
@Override
331-
public String getAlias() {
332-
return null;
333-
}
334-
335-
@Override
336-
public boolean isLogicalView() {
337-
return schema.isLogicalView();
338-
}
339-
});
286+
computation.computeMeasurement(i, new WrappedSchemaInfo(schema));
340287
}
341288
return indexOfMissingMeasurements;
342289
}
@@ -399,33 +346,46 @@ public void updateLastCacheIfExists(
399346
*
400347
* <p>Note: The query shall put the {@link TableDeviceLastCache} twice:
401348
*
402-
* <p>- First time set the "isCommit" to {@code false} before the query accesses data. It is just
403-
* to allow the writing to update the cache, then avoid that the query put a stale value to cache
404-
* and break the consistency. WARNING: The writing may temporarily put a stale value in cache if a
405-
* stale value is written, but it won't affect the eventual consistency.
349+
* <p>- First time call this before the query accesses data. It is just to allow the writing to
350+
* update the cache, then avoid that the query put a stale value to cache and break the
351+
* consistency. WARNING: The writing may temporarily put a stale value in cache if a stale value
352+
* is written, but it won't affect the eventual consistency.
406353
*
407354
* <p>- Second time put the calculated {@link TimeValuePair}, and use {@link
408355
* #updateLastCacheIfExists(String, IDeviceID, String[], TimeValuePair[], boolean,
409356
* IMeasurementSchema[])}. The input {@link TimeValuePair} shall never be or contain {@code null},
410357
* if the measurement is with all {@code null}s, its {@link TimeValuePair} shall be {@link
411358
* TableDeviceLastCache#EMPTY_TIME_VALUE_PAIR}. This method is not supposed to update time column.
412359
*
360+
* @param database the device's database, WITH "root"
361+
* @param measurementPath the fetched {@link MeasurementPath}
362+
*/
363+
public void declareLastCache(final String database, final MeasurementPath measurementPath) {
364+
tableDeviceSchemaCache.updateLastCache(
365+
database,
366+
measurementPath.getIDeviceID(),
367+
new String[] {measurementPath.getMeasurement()},
368+
null,
369+
measurementPath.isUnderAlignedEntity(),
370+
new IMeasurementSchema[] {measurementPath.getMeasurementSchema()},
371+
true);
372+
}
373+
374+
/**
375+
* Update the {@link TableDeviceLastCache} on query in tree model.
376+
*
413377
* <p>If the query has ended abnormally, it shall call this to invalidate the entry it has pushed
414-
* in the first time, to avoid the stale writing damaging the eventual consistency. In this case
415-
* and the "isInvalidate" shall be {@code true}.
378+
* in the first time, to avoid the stale writing damaging the eventual consistency.
416379
*
417380
* @param database the device's database, WITH "root"
418381
* @param measurementPath the fetched {@link MeasurementPath}
419-
* @param isInvalidate {@code true} if invalidate the first pushed cache, or {@code null} for the
420-
* first fetch.
421382
*/
422-
public void updateLastCache(
423-
final String database, final MeasurementPath measurementPath, final boolean isInvalidate) {
383+
public void invalidateLastCache(final String database, final MeasurementPath measurementPath) {
424384
tableDeviceSchemaCache.updateLastCache(
425385
database,
426386
measurementPath.getIDeviceID(),
427387
new String[] {measurementPath.getMeasurement()},
428-
isInvalidate ? new TimeValuePair[] {null} : null,
388+
new TimeValuePair[] {null},
429389
measurementPath.isUnderAlignedEntity(),
430390
new IMeasurementSchema[] {measurementPath.getMeasurementSchema()},
431391
true);
@@ -446,4 +406,63 @@ public void invalidate(final List<MeasurementPath> partialPathList) {
446406
public void cleanUp() {
447407
tableDeviceSchemaCache.invalidateAll();
448408
}
409+
410+
private static class WrappedSchemaInfo implements IMeasurementSchemaInfo {
411+
private final IMeasurementSchema schema;
412+
413+
public WrappedSchemaInfo(final IMeasurementSchema schema) {
414+
this.schema = schema;
415+
}
416+
417+
@Override
418+
public String getName() {
419+
return schema.getMeasurementName();
420+
}
421+
422+
@Override
423+
public IMeasurementSchema getSchema() {
424+
if (isLogicalView()) {
425+
return new LogicalViewSchema(
426+
schema.getMeasurementName(), ((LogicalViewSchema) schema).getExpression());
427+
} else {
428+
return this.getSchemaAsMeasurementSchema();
429+
}
430+
}
431+
432+
@Override
433+
public MeasurementSchema getSchemaAsMeasurementSchema() {
434+
return new MeasurementSchema(
435+
schema.getMeasurementName(),
436+
schema.getType(),
437+
schema.getEncodingType(),
438+
schema.getCompressor());
439+
}
440+
441+
@Override
442+
public LogicalViewSchema getSchemaAsLogicalViewSchema() {
443+
throw new RuntimeException(
444+
new UnsupportedOperationException(
445+
"Function getSchemaAsLogicalViewSchema is not supported in DeviceUsingTemplateSchemaCache."));
446+
}
447+
448+
@Override
449+
public Map<String, String> getTagMap() {
450+
return null;
451+
}
452+
453+
@Override
454+
public Map<String, String> getAttributeMap() {
455+
return null;
456+
}
457+
458+
@Override
459+
public String getAlias() {
460+
return null;
461+
}
462+
463+
@Override
464+
public boolean isLogicalView() {
465+
return schema.isLogicalView();
466+
}
467+
}
449468
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -197,10 +197,10 @@ public void testUpdateLastCache() throws IllegalPathException {
197197

198198
final TimeValuePair tv1 = new TimeValuePair(1, new TsPrimitiveType.TsInt(1));
199199

200-
treeDeviceSchemaCacheManager.updateLastCache(
201-
database, new MeasurementPath(device.concatNode("s1"), s1), false);
202-
treeDeviceSchemaCacheManager.updateLastCache(
203-
database, new MeasurementPath(device.concatNode("s3"), s3), false);
200+
treeDeviceSchemaCacheManager.declareLastCache(
201+
database, new MeasurementPath(device.concatNode("s1"), s1));
202+
treeDeviceSchemaCacheManager.declareLastCache(
203+
database, new MeasurementPath(device.concatNode("s3"), s3));
204204

205205
// Simulate "s1" revert when the query has failed in calculation
206206
treeDeviceSchemaCacheManager.updateLastCacheIfExists(
@@ -213,8 +213,8 @@ public void testUpdateLastCache() throws IllegalPathException {
213213
},
214214
false,
215215
new MeasurementSchema[] {s1});
216-
treeDeviceSchemaCacheManager.updateLastCache(
217-
database, new MeasurementPath(device.concatNode("s1"), s1), true);
216+
treeDeviceSchemaCacheManager.invalidateLastCache(
217+
database, new MeasurementPath(device.concatNode("s1"), s1));
218218

219219
// "s2" shall be null since the "null" timeValuePair has not been put
220220
treeDeviceSchemaCacheManager.updateLastCacheIfExists(
@@ -294,12 +294,11 @@ public void testPut() throws Exception {
294294
new MeasurementPath("root.sg1.d3.s1", TSDataType.FLOAT));
295295
treeDeviceSchemaCacheManager.put(clusterSchemaTree);
296296
final ClusterSchemaTree d1Tree =
297-
treeDeviceSchemaCacheManager.getMatchedSchemaWithTemplate(new PartialPath("root.sg1.d1"));
297+
treeDeviceSchemaCacheManager.getMatchedTemplateSchema(new PartialPath("root.sg1.d1"));
298298
final ClusterSchemaTree d2Tree =
299-
treeDeviceSchemaCacheManager.getMatchedSchemaWithTemplate(new PartialPath("root.sg1.d2"));
299+
treeDeviceSchemaCacheManager.getMatchedTemplateSchema(new PartialPath("root.sg1.d2"));
300300
final ClusterSchemaTree d3Tree =
301-
treeDeviceSchemaCacheManager.getMatchedSchemaWithoutTemplate(
302-
new MeasurementPath("root.sg1.d3.s1"));
301+
treeDeviceSchemaCacheManager.getMatchedNormalSchema(new MeasurementPath("root.sg1.d3.s1"));
303302
List<MeasurementPath> measurementPaths = d1Tree.searchMeasurementPaths(ALL_MATCH_PATTERN).left;
304303
Assert.assertEquals(2, measurementPaths.size());
305304
for (final MeasurementPath measurementPath : measurementPaths) {

0 commit comments

Comments
 (0)