Skip to content

Commit 9f699e1

Browse files
authored
Implement DataNode cache TableSchema function to prevent OOM (apache#16412)
* Implement DataNode cache TableSchema function to prevent OOM * add config * update * update
1 parent ff98e83 commit 9f699e1

File tree

4 files changed

+46
-1
lines changed

4 files changed

+46
-1
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,8 @@ public class IoTDBConfig {
193193
/** Minimum ratio of effective information in wal files */
194194
private volatile double walMinEffectiveInfoRatio = 0.1;
195195

196+
private volatile long dataNodeTableSchemaCacheSize = 1 << 20;
197+
196198
/**
197199
* MemTable size threshold for triggering MemTable snapshot in wal. When a memTable's size exceeds
198200
* this, wal can flush this memtable to disk, otherwise wal will snapshot this memtable in wal.
@@ -1924,6 +1926,17 @@ void setWalMinEffectiveInfoRatio(double walMinEffectiveInfoRatio) {
19241926
this.walMinEffectiveInfoRatio = walMinEffectiveInfoRatio;
19251927
}
19261928

1929+
public long getDataNodeTableSchemaCacheSize() {
1930+
return dataNodeTableSchemaCacheSize;
1931+
}
1932+
1933+
public void setDataNodeTableSchemaCacheSize(long dataNodeTableSchemaCacheSize) {
1934+
if (dataNodeTableSchemaCacheSize < 0) {
1935+
return;
1936+
}
1937+
this.dataNodeTableSchemaCacheSize = dataNodeTableSchemaCacheSize;
1938+
}
1939+
19271940
public long getWalMemTableSnapshotThreshold() {
19281941
return walMemTableSnapshotThreshold;
19291942
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,12 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
982982
"coordinator_write_executor_size",
983983
Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
984984

985+
conf.setDataNodeTableSchemaCacheSize(
986+
Long.parseLong(
987+
properties.getProperty(
988+
"data_node_table_schema_cache_max_size_in_bytes",
989+
String.valueOf(conf.getDataNodeTableSchemaCacheSize()))));
990+
985991
// Commons
986992
commonDescriptor.loadCommonProps(properties);
987993
commonDescriptor.initCommonConfigDir(conf.getSystemDir());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource.Status;
6464
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
6565
import org.apache.iotdb.db.pipe.consensus.deletion.persist.PageCacheDeletionBuffer;
66+
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
6667
import org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener;
6768
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
6869
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
@@ -157,6 +158,8 @@
157158
import org.apache.iotdb.rpc.RpcUtils;
158159
import org.apache.iotdb.rpc.TSStatusCode;
159160

161+
import com.github.benmanes.caffeine.cache.Cache;
162+
import com.github.benmanes.caffeine.cache.Caffeine;
160163
import org.apache.commons.io.FileUtils;
161164
import org.apache.thrift.TException;
162165
import org.apache.tsfile.file.metadata.ChunkMetadata;
@@ -249,6 +252,15 @@ public class DataRegion implements IDataRegionForQuery {
249252

250253
private static final Logger logger = LoggerFactory.getLogger(DataRegion.class);
251254

255+
// Cache TableSchema to prevent OOM
256+
private static final Cache<String, org.apache.tsfile.file.metadata.TableSchema> SCHEMA_CACHE =
257+
Caffeine.newBuilder()
258+
.maximumWeight(config.getDataNodeTableSchemaCacheSize())
259+
.weigher(
260+
(String k, org.apache.tsfile.file.metadata.TableSchema v) ->
261+
(int) PipeMemoryWeightUtil.calculateTableSchemaBytesUsed(v))
262+
.build();
263+
252264
/**
253265
* A read write lock for guaranteeing concurrent safety when accessing all fields in this class
254266
* (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
@@ -1447,7 +1459,16 @@ private void registerToTsFile(InsertNode node, TsFileProcessor tsFileProcessor)
14471459
throw new TableLostRuntimeException(getDatabaseName(), tableName);
14481460
}
14491461
}
1450-
return TableSchema.of(tsTable).toTsFileTableSchemaNoAttribute();
1462+
1463+
org.apache.tsfile.file.metadata.TableSchema tableSchema =
1464+
TableSchema.of(tsTable).toTsFileTableSchemaNoAttribute();
1465+
org.apache.tsfile.file.metadata.TableSchema cachedSchema =
1466+
SCHEMA_CACHE.getIfPresent(tableName);
1467+
if (Objects.equals(cachedSchema, tableSchema)) {
1468+
return cachedSchema;
1469+
}
1470+
SCHEMA_CACHE.put(tableName, tableSchema);
1471+
return tableSchema;
14511472
});
14521473
}
14531474
}

iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -807,6 +807,11 @@ device_path_cache_proportion=0.05
807807
# Datatype: double
808808
write_memory_variation_report_proportion=0.001
809809

810+
# The maximum memory size in bytes for DataNode to table schema cache
811+
# effectiveMode: restart
812+
# Datatype: long
813+
data_node_table_schema_cache_max_size_in_bytes=1048576
814+
810815
# When an inserting is rejected, waiting period (in ms) to check system again, 50 by default.
811816
# If the insertion has been rejected and the read load is low, it can be set larger.
812817
# effectiveMode: restart

0 commit comments

Comments
 (0)