Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public class DataNodeMemoryConfig {
private long maxBytesPerFragmentInstance =
Runtime.getRuntime().maxMemory() * 3 / 10 * 200 / 1001 / queryThreadCount;

/** Max bytes of cached device metadata index entry bytes per FragmentInstance */
private long maxCachedDeviceMetadataIndexEntryBytesPerFI = 32 * 1024 * 1024;

/** The memory manager of on heap */
private MemoryManager onHeapMemoryManager;

Expand Down Expand Up @@ -410,6 +413,11 @@ private void initQueryEngineMemoryAllocate(
setQueryThreadCount(
Integer.parseInt(
properties.getProperty("query_thread_count", Integer.toString(getQueryThreadCount()))));
setMaxCachedDeviceMetadataIndexEntryBytesPerFI(
Long.parseLong(
properties.getProperty(
"max_cached_device_metadata_index_entry_bytes_per_fi",
Long.toString(getMaxCachedDeviceMetadataIndexEntryBytesPerFI()))));

if (getQueryThreadCount() <= 0) {
setQueryThreadCount(Runtime.getRuntime().availableProcessors());
Expand Down Expand Up @@ -536,6 +544,15 @@ public void setBufferedArraysMemoryProportion(double bufferedArraysMemoryProport
this.bufferedArraysMemoryProportion = bufferedArraysMemoryProportion;
}

public long getMaxCachedDeviceMetadataIndexEntryBytesPerFI() {
return maxCachedDeviceMetadataIndexEntryBytesPerFI;
}

public void setMaxCachedDeviceMetadataIndexEntryBytesPerFI(
long maxCachedDeviceMetadataIndexEntryBytesPerFI) {
this.maxCachedDeviceMetadataIndexEntryBytesPerFI = maxCachedDeviceMetadataIndexEntryBytesPerFI;
}

public double getDevicePathCacheProportion() {
return devicePathCacheProportion;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.fragment;

import org.apache.iotdb.db.conf.DataNodeMemoryConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.exception.MemoryNotEnoughException;
import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;

import org.apache.tsfile.exception.TsFileRuntimeException;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.query.DeviceMetadataIndexEntriesQueryResult;
import org.apache.tsfile.utils.Pair;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

public class DeviceMetadataIndexEntryCache {
private static final DataNodeMemoryConfig config =
IoTDBDescriptor.getInstance().getMemoryConfig();
private final FragmentInstanceContext context;
private TreeMap<IDeviceID, Integer> deviceIndexMap;
private final Map<String, DeviceMetadataIndexEntriesQueryResult>
deviceMetadataIndexNodeOffsetsCache = new ConcurrentHashMap<>();
private List<IDeviceID> sortedDevices;
private int[] deviceIdxArr;
private final AtomicLong reservedMemory;
private final LongConsumer ioSizeRecorder;

public DeviceMetadataIndexEntryCache(FragmentInstanceContext context) {
this.context = context;
this.reservedMemory = new AtomicLong(0);
this.ioSizeRecorder =
context.getQueryStatistics().getLoadTimeSeriesMetadataActualIOSize()::addAndGet;
}

public void addDevices(AbstractDataSourceOperator operator, List<DeviceEntry> deviceEntries) {
deviceIndexMap = deviceIndexMap == null ? new TreeMap<>(IDeviceID::compareTo) : deviceIndexMap;
int[] operatorDeviceIndexArr = new int[deviceEntries.size()];
for (int i = 0; i < deviceEntries.size(); i++) {
int idx =
deviceIndexMap.computeIfAbsent(
deviceEntries.get(i).getDeviceID(), k -> deviceIndexMap.size());
operatorDeviceIndexArr[i] = idx;
}
operator.setDeviceIndexArr(operatorDeviceIndexArr);
}

public void addDevice(AbstractDataSourceOperator operator, IDeviceID deviceID) {
deviceIndexMap = deviceIndexMap == null ? new TreeMap<>() : deviceIndexMap;
int idx = deviceIndexMap.computeIfAbsent(deviceID, k -> deviceIndexMap.size());
operator.setDeviceIndexArr(new int[] {idx});
}

// Pair.right represents whether the device may exist in the file
public Pair<long[], Boolean> getCachedDeviceMetadataIndexNodeOffset(
IDeviceID device, int deviceIndex, String filePath, boolean ignoreNotExists)
throws IOException {
// cache is disabled
if (deviceIndex < 0) {
return new Pair<>(null, true);
}
if (deviceIndexMap != null && deviceIndexMap.size() == 1) {
return new Pair<>(null, true);
}
// not cached
DeviceMetadataIndexEntriesQueryResult resourceCache = loadOffsetsToCache(filePath);
if (resourceCache == null) {
return new Pair<>(null, true);
}
int indexAfterSort = deviceIdxArr[deviceIndex];
long[] result = resourceCache.getDeviceMetadataIndexNodeOffset(indexAfterSort);
// the device does not exist in the file
if (result == null) {
if (!ignoreNotExists) {
throw new IOException("Device {" + device + "} is not in tsFileMetaData of " + filePath);
}
return new Pair<>(null, false);
}
return new Pair<>(result, true);
}

private DeviceMetadataIndexEntriesQueryResult loadOffsetsToCache(String filePath)
throws IOException {
TsFileSequenceReader reader = FileReaderManager.getInstance().get(filePath, true);
IDeviceID firstDevice = getSortedDevices().get(0);
return deviceMetadataIndexNodeOffsetsCache.computeIfAbsent(
filePath,
k -> {
if (reservedMemory.get() >= config.getMaxCachedDeviceMetadataIndexEntryBytesPerFI()) {
return null;
}
DeviceMetadataIndexEntriesQueryResult result;
try {
result =
reader.getDeviceMetadataIndexNodeOffsets(
firstDevice.isTableModel() ? firstDevice.getTableName() : null,
sortedDevices,
ioSizeRecorder);
long memCost = result.ramBytesUsed();
context.getMemoryReservationContext().reserveMemoryCumulatively(memCost);
reservedMemory.addAndGet(memCost);
} catch (IOException e) {
throw new TsFileRuntimeException(e);
} catch (MemoryNotEnoughException ignored) {
return null;
}
return result;
});
}

private synchronized List<IDeviceID> getSortedDevices() {
if (deviceIdxArr == null) {
sort();
}
return sortedDevices;
}

private void sort() {
deviceIdxArr = new int[deviceIndexMap.size()];
sortedDevices = new ArrayList<>(deviceIndexMap.size());
int i = 0;
for (Map.Entry<IDeviceID, Integer> entry : deviceIndexMap.entrySet()) {
sortedDevices.add(entry.getKey());
deviceIdxArr[entry.getValue()] = i++;
}
deviceIndexMap = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ public class FragmentInstanceContext extends QueryContext {
private long closedSeqFileNum = 0;
private long closedUnseqFileNum = 0;

private DeviceMetadataIndexEntryCache metadataIndexEntryCache =
new DeviceMetadataIndexEntryCache(this);

public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
FragmentInstanceContext instanceContext =
Expand Down Expand Up @@ -327,6 +330,10 @@ private void initialize() {
stateMachine.addStateChangeListener(this::updateStatsIfDone);
}

public DeviceMetadataIndexEntryCache getMetadataIndexEntryCache() {
return metadataIndexEntryCache;
}

private void updateStatsIfDone(FragmentInstanceState newState) {
if (newState.isDone()) {
long now = System.currentTimeMillis();
Expand Down Expand Up @@ -794,6 +801,7 @@ public synchronized void releaseResource() {
dataRegion = null;
globalTimeFilter = null;
sharedQueryDataSource = null;
metadataIndexEntryCache = null;

// record fragment instance execution time and metadata get time to metrics
long durationTime = System.currentTimeMillis() - executionStartTime.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,22 @@ public abstract class AbstractDataSourceOperator extends AbstractSourceOperator
// Using for building result tsBlock
protected TsBlockBuilder resultTsBlockBuilder;

protected int[] deviceIndexInFI;

@Override
public void initQueryDataSource(IQueryDataSource dataSource) {
seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource);
seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource, getCurrentDeviceIndex());
resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
}

public int getCurrentDeviceIndex() {
return deviceIndexInFI[0];
}

public void setDeviceIndexArr(int[] arr) {
this.deviceIndexInFI = arr;
}

@Override
public void close() throws Exception {
// do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private static void appendOneColumn(

@Override
public void initQueryDataSource(IQueryDataSource dataSource) {
seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource);
seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource, getCurrentDeviceIndex());
resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ protected AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata(
return FileLoaderUtils.loadAlignedTimeSeriesMetadata(
resource,
(AlignedFullPath) seriesPath,
deviceIndexInFI,
context,
scanOptions.getGlobalTimeFilter(),
isSeq,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ private FileLoaderUtils() {
public static TimeseriesMetadata loadTimeSeriesMetadata(
TsFileResource resource,
NonAlignedFullPath seriesPath,
int deviceIndexInFI,
FragmentInstanceContext context,
Filter globalTimeFilter,
Set<String> allSensors,
Expand All @@ -101,6 +102,7 @@ public static TimeseriesMetadata loadTimeSeriesMetadata(
resource.getTsFileID(),
seriesPath.getDeviceId(),
seriesPath.getMeasurement()),
deviceIndexInFI,
allSensors,
context.ignoreNotExistsDevice()
|| resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE,
Expand Down Expand Up @@ -180,6 +182,7 @@ public static TimeseriesMetadata loadTimeSeriesMetadata(
public static AbstractAlignedTimeSeriesMetadata loadAlignedTimeSeriesMetadata(
TsFileResource resource,
AlignedFullPath alignedPath,
int deviceIndexInFI,
FragmentInstanceContext context,
Filter globalTimeFilter,
boolean isSeq,
Expand All @@ -193,7 +196,12 @@ public static AbstractAlignedTimeSeriesMetadata loadAlignedTimeSeriesMetadata(
if (resource.isClosed()) {
alignedTimeSeriesMetadata =
loadAlignedTimeSeriesMetadataFromDisk(
resource, alignedPath, context, globalTimeFilter, ignoreAllNullRows);
resource,
alignedPath,
deviceIndexInFI,
context,
globalTimeFilter,
ignoreAllNullRows);
} else { // if the tsfile is unclosed, we just get it directly from TsFileResource
loadFromMem = true;
alignedTimeSeriesMetadata =
Expand Down Expand Up @@ -256,6 +264,7 @@ public static AbstractAlignedTimeSeriesMetadata loadAlignedTimeSeriesMetadata(
private static AbstractAlignedTimeSeriesMetadata loadAlignedTimeSeriesMetadataFromDisk(
TsFileResource resource,
AlignedFullPath alignedPath,
int deviceIndexInFI,
FragmentInstanceContext context,
Filter globalTimeFilter,
boolean ignoreAllNullRows)
Expand All @@ -277,6 +286,7 @@ private static AbstractAlignedTimeSeriesMetadata loadAlignedTimeSeriesMetadataFr
cache.get(
filePath,
new TimeSeriesMetadataCacheKey(resource.getTsFileID(), deviceId, ""),
deviceIndexInFI,
allSensors,
context.ignoreNotExistsDevice()
|| resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE,
Expand Down Expand Up @@ -307,6 +317,7 @@ private static AbstractAlignedTimeSeriesMetadata loadAlignedTimeSeriesMetadataFr
filePath,
new TimeSeriesMetadataCacheKey(
resource.getTsFileID(), deviceId, valueMeasurement),
deviceIndexInFI,
allSensors,
context.ignoreNotExistsDevice()
|| resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class SeriesScanUtil implements Accountable {
protected final IFullPath seriesPath;

private final IDeviceID deviceID;
protected int deviceIndexInFI = -1;
protected boolean isAligned = false;
private final TSDataType dataType;

Expand Down Expand Up @@ -171,12 +172,17 @@ public SeriesScanUtil(
versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
}

public void initQueryDataSource(QueryDataSource dataSource) {
initQueryDataSource(dataSource, -1);
}

/**
* Initialize the query data source. This method should be called <b>before any other methods</b>.
*
* @param dataSource the query data source
*/
public void initQueryDataSource(QueryDataSource dataSource) {
public void initQueryDataSource(QueryDataSource dataSource, int currentDeviceIndexInFI) {
this.deviceIndexInFI = currentDeviceIndexInFI;
dataSource.fillOrderIndexes(deviceID, orderUtils.getAscending());
this.dataSource = dataSource;

Expand Down Expand Up @@ -1235,6 +1241,7 @@ protected ITimeSeriesMetadata loadTimeSeriesMetadata(TsFileResource resource, bo
return FileLoaderUtils.loadTimeSeriesMetadata(
resource,
(NonAlignedFullPath) seriesPath,
deviceIndexInFI,
context,
scanOptions.getGlobalTimeFilter(),
scanOptions.getAllSensors(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() {
// construct AlignedSeriesScanUtil for next device
constructAlignedSeriesScanUtil();
queryDataSource.reset();
this.seriesScanUtil.initQueryDataSource(queryDataSource);
this.seriesScanUtil.initQueryDataSource(queryDataSource, getCurrentDeviceIndex());
}

if (currentDeviceIndex >= deviceCount) {
Expand Down Expand Up @@ -717,7 +717,7 @@ private void checkIfAllAggregatorHasFinalResult() {
// construct AlignedSeriesScanUtil for next device
constructAlignedSeriesScanUtil();
queryDataSource.reset();
this.seriesScanUtil.initQueryDataSource(queryDataSource);
this.seriesScanUtil.initQueryDataSource(queryDataSource, getCurrentDeviceIndex());
}

if (currentDeviceIndex >= deviceCount) {
Expand Down Expand Up @@ -762,7 +762,7 @@ public List<TSDataType> getResultDataTypes() {
@Override
public void initQueryDataSource(IQueryDataSource dataSource) {
this.queryDataSource = (QueryDataSource) dataSource;
this.seriesScanUtil.initQueryDataSource(queryDataSource);
this.seriesScanUtil.initQueryDataSource(queryDataSource, getCurrentDeviceIndex());
this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
}

Expand All @@ -783,6 +783,11 @@ public long calculateRetainedSizeAfterCallingNext() {
: 0;
}

@Override
public int getCurrentDeviceIndex() {
return deviceIndexInFI == null ? -1 : deviceIndexInFI[currentDeviceIndex];
}

@Override
public void close() throws Exception {
super.close();
Expand Down
Loading