Skip to content

Commit 313905d

Browse files
authored
Replace the key in FileReaderManager to TsFileID
1 parent 854f1dc commit 313905d

31 files changed

+120
-89
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,12 @@ public BloomFilter apply(BloomFilterCacheKey bloomFilterCacheKey) {
210210
try {
211211
cacheMiss = true;
212212
TsFileSequenceReader reader =
213-
FileReaderManager.getInstance().get(bloomFilterCacheKey.filePath, true, ioSizeRecorder);
213+
FileReaderManager.getInstance()
214+
.get(
215+
bloomFilterCacheKey.filePath,
216+
bloomFilterCacheKey.tsFileID,
217+
true,
218+
ioSizeRecorder);
214219
return reader.readBloomFilter(ioSizeRecorder);
215220
} catch (IOException e) {
216221
throw new IoTDBIORuntimeException(e);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,8 @@ public Chunk apply(ChunkCacheKey key) {
307307
try {
308308
cacheMiss = true;
309309
TsFileSequenceReader reader =
310-
FileReaderManager.getInstance().get(key.getFilePath(), key.closed, ioSizeRecorder);
310+
FileReaderManager.getInstance()
311+
.get(key.getFilePath(), key.tsFileID, key.closed, ioSizeRecorder);
311312
Chunk chunk = reader.readMemChunk(key.offsetOfChunkHeader, ioSizeRecorder);
312313
// to save memory footprint, we don't save measurementId in ChunkHeader of Chunk
313314
chunk.getHeader().setMeasurementID(null);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ public TimeseriesMetadata get(
132132

133133
// bloom filter part
134134
TsFileSequenceReader reader =
135-
FileReaderManager.getInstance().get(filePath, true, bloomFilterIoSizeRecorder);
135+
FileReaderManager.getInstance()
136+
.get(filePath, key.tsFileID, true, bloomFilterIoSizeRecorder);
136137
BloomFilter bloomFilter = reader.readBloomFilter(bloomFilterIoSizeRecorder);
137138
queryContext.getQueryStatistics().getLoadBloomFilterFromDiskCount().incrementAndGet();
138139
if (bloomFilter != null
@@ -193,7 +194,7 @@ public TimeseriesMetadata get(
193194
loadBloomFilterTime = System.nanoTime() - loadBloomFilterStartTime;
194195
TsFileSequenceReader reader =
195196
FileReaderManager.getInstance()
196-
.get(filePath, true, timeSeriesMetadataIoSizeRecorder);
197+
.get(filePath, key.tsFileID, true, timeSeriesMetadataIoSizeRecorder);
197198
List<TimeseriesMetadata> timeSeriesMetadataList =
198199
reader.readTimeseriesMetadata(
199200
key.device,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2998,7 +2998,7 @@ private void settleTsFileCallBack(
29982998
if (!oldTsFileResource.getTsFile().exists()) {
29992999
tsFileManager.remove(oldTsFileResource, oldTsFileResource.isSeq());
30003000
}
3001-
FileReaderManager.getInstance().closeFileAndRemoveReader(oldTsFileResource.getTsFilePath());
3001+
FileReaderManager.getInstance().closeFileAndRemoveReader(oldTsFileResource.getTsFileID());
30023002
oldTsFileResource.setSettleTsFileCallBack(null);
30033003
SettleService.getINSTANCE().getFilesToBeSettledCount().addAndGet(-1);
30043004
} catch (IOException e) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ public MultiTsFileDeviceIterator(
121121
this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc);
122122
for (TsFileResource tsFileResource : tsFileResourcesSortedByDesc) {
123123
TsFileSequenceReader reader =
124-
FileReaderManager.getInstance().get(tsFileResource.getTsFilePath(), true);
124+
FileReaderManager.getInstance()
125+
.get(tsFileResource.getTsFilePath(), tsFileResource.getTsFileID(), true);
125126
readerMap.put(tsFileResource, reader);
126127
deviceIteratorMap.put(tsFileResource, reader.getAllDevicesIteratorWithIsAligned());
127128
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ public void write(TsBlock tsBlock, int subTaskId) throws IOException {
7373

7474
@Override
7575
protected TsFileSequenceReader getFileReader(TsFileResource resource) throws IOException {
76-
return FileReaderManager.getInstance().get(resource.getTsFilePath(), true);
76+
return FileReaderManager.getInstance()
77+
.get(resource.getTsFilePath(), resource.getTsFileID(), true);
7778
}
7879

7980
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1825,6 +1825,7 @@ private void processAlignedChunkMetaDataFromFlushedMemTable(
18251825
deviceID,
18261826
measurement,
18271827
filePath,
1828+
tsFileResource.getTsFileID(),
18281829
false,
18291830
valueChunkMetaData.getOffsetOfChunkHeader(),
18301831
valueChunkMetaData.getStatistics(),
@@ -1849,6 +1850,7 @@ private void processChunkMetaDataFromFlushedMemTable(
18491850
deviceID,
18501851
measurement,
18511852
filePath,
1853+
tsFileResource.getTsFileID(),
18521854
false,
18531855
chunkMetadata.getOffsetOfChunkHeader(),
18541856
chunkMetadata.getStatistics()));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/control/FileReaderManager.java

Lines changed: 45 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.storageengine.dataregion.read.control;
2121

2222
import org.apache.iotdb.commons.utils.TestOnly;
23+
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
2324
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
2425

2526
import org.apache.tsfile.read.TsFileSequenceReader;
@@ -57,25 +58,25 @@ public class FileReaderManager {
5758
* the key of closedFileReaderMap is the file path and the value of closedFileReaderMap is the
5859
* corresponding reader.
5960
*/
60-
private Map<String, TsFileSequenceReader> closedFileReaderMap;
61+
private Map<TsFileID, TsFileSequenceReader> closedFileReaderMap;
6162

6263
/**
6364
* the key of unclosedFileReaderMap is the file path and the value of unclosedFileReaderMap is the
6465
* corresponding reader.
6566
*/
66-
private Map<String, TsFileSequenceReader> unclosedFileReaderMap;
67+
private Map<TsFileID, TsFileSequenceReader> unclosedFileReaderMap;
6768

6869
/**
6970
* the key of closedFileReaderMap is the file path and the value of closedFileReaderMap is the
7071
* file's reference count.
7172
*/
72-
private Map<String, AtomicInteger> closedReferenceMap;
73+
private Map<TsFileID, AtomicInteger> closedReferenceMap;
7374

7475
/**
7576
* the key of unclosedFileReaderMap is the file path and the value of unclosedFileReaderMap is the
7677
* file's reference count.
7778
*/
78-
private Map<String, AtomicInteger> unclosedReferenceMap;
79+
private Map<TsFileID, AtomicInteger> unclosedReferenceMap;
7980

8081
private FileReaderManager() {
8182
closedFileReaderMap = new ConcurrentHashMap<>();
@@ -88,14 +89,14 @@ public static FileReaderManager getInstance() {
8889
return FileReaderManagerHelper.INSTANCE;
8990
}
9091

91-
public synchronized void closeFileAndRemoveReader(String filePath) throws IOException {
92-
closedReferenceMap.remove(filePath);
93-
TsFileSequenceReader reader = closedFileReaderMap.remove(filePath);
92+
public synchronized void closeFileAndRemoveReader(TsFileID tsFileID) throws IOException {
93+
closedReferenceMap.remove(tsFileID);
94+
TsFileSequenceReader reader = closedFileReaderMap.remove(tsFileID);
9495
if (reader != null) {
9596
reader.close();
9697
}
97-
unclosedReferenceMap.remove(filePath);
98-
reader = unclosedFileReaderMap.remove(filePath);
98+
unclosedReferenceMap.remove(tsFileID);
99+
reader = unclosedFileReaderMap.remove(tsFileID);
99100
if (reader != null) {
100101
reader.close();
101102
}
@@ -106,35 +107,38 @@ public synchronized void closeFileAndRemoveReader(String filePath) throws IOExce
106107
* exists, just get it from closedFileReaderMap or unclosedFileReaderMap depending on isClosing .
107108
* Otherwise a new reader will be created and cached.
108109
*
109-
* @param filePath the path of the file, of which the reader is desired.
110+
* @param filePath the path of the tsfile
111+
* @param tsFileID the id of the tsfile, of which the reader is desired.
110112
* @param isClosed whether the corresponding file still receives insertions or not.
111113
* @return the reader of the file specified by filePath.
112114
* @throws IOException when reader cannot be created.
113115
*/
114116
@SuppressWarnings("squid:S2095")
115-
public synchronized TsFileSequenceReader get(String filePath, boolean isClosed)
117+
public synchronized TsFileSequenceReader get(String filePath, TsFileID tsFileID, boolean isClosed)
116118
throws IOException {
117-
return get(filePath, isClosed, null);
119+
return get(filePath, tsFileID, isClosed, null);
118120
}
119121

120122
/**
121123
* Get the reader of the file(tsfile or unseq tsfile) indicated by filePath. If the reader already
122124
* exists, just get it from closedFileReaderMap or unclosedFileReaderMap depending on isClosing .
123125
* Otherwise a new reader will be created and cached.
124126
*
125-
* @param filePath the path of the file, of which the reader is desired.
127+
* @param filePath the path of the tsfile
128+
* @param tsFileID the id of the tsfile, of which the reader is desired.
126129
* @param isClosed whether the corresponding file still receives insertions or not.
127130
* @param ioSizeRecorder can be null
128131
* @return the reader of the file specified by filePath.
129132
* @throws IOException when reader cannot be created.
130133
*/
131134
@SuppressWarnings("squid:S2095")
132135
public synchronized TsFileSequenceReader get(
133-
String filePath, boolean isClosed, LongConsumer ioSizeRecorder) throws IOException {
136+
String filePath, TsFileID tsFileID, boolean isClosed, LongConsumer ioSizeRecorder)
137+
throws IOException {
134138

135-
Map<String, TsFileSequenceReader> readerMap =
139+
Map<TsFileID, TsFileSequenceReader> readerMap =
136140
!isClosed ? unclosedFileReaderMap : closedFileReaderMap;
137-
if (!readerMap.containsKey(filePath)) {
141+
if (!readerMap.containsKey(tsFileID)) {
138142
int currentOpenedReaderCount = readerMap.size();
139143
if (currentOpenedReaderCount >= MAX_CACHED_FILE_SIZE
140144
&& (currentOpenedReaderCount % PRINT_INTERVAL == 0)) {
@@ -149,11 +153,11 @@ public synchronized TsFileSequenceReader get(
149153
// already do the version check in TsFileSequenceReader's constructor
150154
tsFileReader = new TsFileSequenceReader(filePath, ioSizeRecorder);
151155
}
152-
readerMap.put(filePath, tsFileReader);
156+
readerMap.put(tsFileID, tsFileReader);
153157
return tsFileReader;
154158
}
155159

156-
return readerMap.get(filePath);
160+
return readerMap.get(tsFileID);
157161
}
158162

159163
/**
@@ -165,11 +169,11 @@ public void increaseFileReaderReference(TsFileResource tsFile, boolean isClosed)
165169
synchronized (this) {
166170
if (!isClosed) {
167171
unclosedReferenceMap
168-
.computeIfAbsent(tsFile.getTsFilePath(), k -> new AtomicInteger())
172+
.computeIfAbsent(tsFile.getTsFileID(), k -> new AtomicInteger())
169173
.getAndIncrement();
170174
} else {
171175
closedReferenceMap
172-
.computeIfAbsent(tsFile.getTsFilePath(), k -> new AtomicInteger())
176+
.computeIfAbsent(tsFile.getTsFileID(), k -> new AtomicInteger())
173177
.getAndIncrement();
174178
}
175179
}
@@ -181,38 +185,39 @@ public void increaseFileReaderReference(TsFileResource tsFile, boolean isClosed)
181185
*/
182186
public void decreaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
183187
synchronized (this) {
184-
if (!isClosed && unclosedReferenceMap.containsKey(tsFile.getTsFilePath())) {
185-
if (unclosedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet() == 0) {
186-
closeUnUsedReaderAndRemoveRef(tsFile.getTsFilePath(), false);
188+
if (!isClosed && unclosedReferenceMap.containsKey(tsFile.getTsFileID())) {
189+
if (unclosedReferenceMap.get(tsFile.getTsFileID()).decrementAndGet() == 0) {
190+
closeUnUsedReaderAndRemoveRef(tsFile.getTsFilePath(), tsFile.getTsFileID(), false);
187191
}
188-
} else if (closedReferenceMap.containsKey(tsFile.getTsFilePath())
189-
&& (closedReferenceMap.get(tsFile.getTsFilePath()).decrementAndGet() == 0)) {
190-
closeUnUsedReaderAndRemoveRef(tsFile.getTsFilePath(), true);
192+
} else if (closedReferenceMap.containsKey(tsFile.getTsFileID())
193+
&& (closedReferenceMap.get(tsFile.getTsFileID()).decrementAndGet() == 0)) {
194+
closeUnUsedReaderAndRemoveRef(tsFile.getTsFilePath(), tsFile.getTsFileID(), true);
191195
}
192196
}
193197
tsFile.readUnlock();
194198
}
195199

196-
private void closeUnUsedReaderAndRemoveRef(String tsFilePath, boolean isClosed) {
197-
Map<String, TsFileSequenceReader> readerMap =
200+
private void closeUnUsedReaderAndRemoveRef(
201+
String tsFilePath, TsFileID tsFileID, boolean isClosed) {
202+
Map<TsFileID, TsFileSequenceReader> readerMap =
198203
isClosed ? closedFileReaderMap : unclosedFileReaderMap;
199-
Map<String, AtomicInteger> refMap = isClosed ? closedReferenceMap : unclosedReferenceMap;
204+
Map<TsFileID, AtomicInteger> refMap = isClosed ? closedReferenceMap : unclosedReferenceMap;
200205
synchronized (this) {
201206
// check ref num again
202-
if (refMap.get(tsFilePath).get() != 0) {
207+
if (refMap.get(tsFileID).get() != 0) {
203208
return;
204209
}
205210

206-
TsFileSequenceReader reader = readerMap.get(tsFilePath);
211+
TsFileSequenceReader reader = readerMap.get(tsFileID);
207212
if (reader != null) {
208213
try {
209214
reader.close();
210215
} catch (IOException e) {
211216
logger.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
212217
}
213218
}
214-
readerMap.remove(tsFilePath);
215-
refMap.remove(tsFilePath);
219+
readerMap.remove(tsFileID);
220+
refMap.remove(tsFileID);
216221
if (resourceLogger.isDebugEnabled()) {
217222
resourceLogger.debug("{} TsFileReader is closed because of no reference.", tsFilePath);
218223
}
@@ -226,10 +231,10 @@ private void closeUnUsedReaderAndRemoveRef(String tsFilePath, boolean isClosed)
226231
* @throws IOException if failed to close file handlers, IOException will be thrown
227232
*/
228233
public synchronized void closeAndRemoveAllOpenedReaders() throws IOException {
229-
Iterator<Map.Entry<String, TsFileSequenceReader>> iterator =
234+
Iterator<Map.Entry<TsFileID, TsFileSequenceReader>> iterator =
230235
closedFileReaderMap.entrySet().iterator();
231236
while (iterator.hasNext()) {
232-
Map.Entry<String, TsFileSequenceReader> entry = iterator.next();
237+
Map.Entry<TsFileID, TsFileSequenceReader> entry = iterator.next();
233238
entry.getValue().close();
234239
if (resourceLogger.isDebugEnabled()) {
235240
resourceLogger.debug("{} closedTsFileReader is closed.", entry.getKey());
@@ -239,7 +244,7 @@ public synchronized void closeAndRemoveAllOpenedReaders() throws IOException {
239244
}
240245
iterator = unclosedFileReaderMap.entrySet().iterator();
241246
while (iterator.hasNext()) {
242-
Map.Entry<String, TsFileSequenceReader> entry = iterator.next();
247+
Map.Entry<TsFileID, TsFileSequenceReader> entry = iterator.next();
243248
entry.getValue().close();
244249
if (resourceLogger.isDebugEnabled()) {
245250
resourceLogger.debug("{} unclosedTsFileReader is closed.", entry.getKey());
@@ -251,17 +256,17 @@ public synchronized void closeAndRemoveAllOpenedReaders() throws IOException {
251256

252257
/** This method is only for unit tests. */
253258
public synchronized boolean contains(TsFileResource tsFile, boolean isClosed) {
254-
return (isClosed && closedFileReaderMap.containsKey(tsFile.getTsFilePath()))
255-
|| (!isClosed && unclosedFileReaderMap.containsKey(tsFile.getTsFilePath()));
259+
return (isClosed && closedFileReaderMap.containsKey(tsFile.getTsFileID()))
260+
|| (!isClosed && unclosedFileReaderMap.containsKey(tsFile.getTsFileID()));
256261
}
257262

258263
@TestOnly
259-
public Map<String, TsFileSequenceReader> getClosedFileReaderMap() {
264+
public Map<TsFileID, TsFileSequenceReader> getClosedFileReaderMap() {
260265
return closedFileReaderMap;
261266
}
262267

263268
@TestOnly
264-
public Map<String, TsFileSequenceReader> getUnclosedFileReaderMap() {
269+
public Map<TsFileID, TsFileSequenceReader> getUnclosedFileReaderMap() {
265270
return unclosedFileReaderMap;
266271
}
267272

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AbstractDeviceChunkMetaData;
3030
import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.AlignedDeviceChunkMetaData;
3131
import org.apache.iotdb.db.storageengine.dataregion.read.filescan.model.DeviceChunkMetaData;
32+
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
3233
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
3334
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
3435
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
@@ -108,7 +109,8 @@ public boolean isTimeSeriesTimeDeleted(IDeviceID deviceID, String timeSeriesName
108109
@Override
109110
public Iterator<AbstractDeviceChunkMetaData> getAllDeviceChunkMetaData() throws IOException {
110111

111-
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance().get(getFilePath(), true);
112+
TsFileSequenceReader tsFileReader =
113+
FileReaderManager.getInstance().get(getFilePath(), tsFileResource.getTsFileID(), true);
112114
TsFileDeviceIterator deviceIterator = tsFileReader.getAllDevicesIteratorWithIsAligned();
113115

114116
List<AbstractDeviceChunkMetaData> deviceChunkMetaDataList = new LinkedList<>();
@@ -161,10 +163,12 @@ public Iterator<IChunkHandle> getChunkHandles(
161163
List<Statistics<? extends Serializable>> statisticsList,
162164
List<Integer> orderedIndexList) {
163165
String filePath = tsFileResource.getTsFilePath();
166+
TsFileID tsFileID = tsFileResource.getTsFileID();
164167
List<IChunkHandle> chunkHandleList = new ArrayList<>();
165168
for (int i : orderedIndexList) {
166169
AbstractChunkOffset chunkOffset = chunkInfoList.get(i);
167-
chunkHandleList.add(chunkOffset.generateChunkHandle(filePath, statisticsList.get(i)));
170+
chunkHandleList.add(
171+
chunkOffset.generateChunkHandle(filePath, tsFileID, statisticsList.get(i)));
168172
}
169173
return chunkHandleList.iterator();
170174
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskAlignedChunkHandleImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl;
2121

22+
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
2223
import org.apache.iotdb.db.storageengine.dataregion.utils.SharedTimeDataBuffer;
2324

2425
import org.apache.tsfile.encrypt.EncryptParameter;
@@ -45,11 +46,12 @@ public DiskAlignedChunkHandleImpl(
4546
IDeviceID deviceID,
4647
String measurement,
4748
String filePath,
49+
TsFileID tsFileID,
4850
boolean isTsFileClosed,
4951
long offset,
5052
Statistics<? extends Serializable> chunkStatistic,
5153
SharedTimeDataBuffer sharedTimeDataBuffer) {
52-
super(deviceID, measurement, filePath, isTsFileClosed, offset, chunkStatistic);
54+
super(deviceID, measurement, filePath, tsFileID, isTsFileClosed, offset, chunkStatistic);
5355
this.sharedTimeDataBuffer = sharedTimeDataBuffer;
5456
}
5557

0 commit comments

Comments
 (0)