 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.MergeException;
-import org.apache.iotdb.db.utils.MergeUtils;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,13 +72,16 @@ public class RewriteCompactionFileSelector implements ICrossSpaceMergeFileSelect
   private boolean[] seqSelected;
   private int seqSelectedNum;

+  private AbstractCompactionEstimator compactionEstimator;
+
   public RewriteCompactionFileSelector(CrossSpaceCompactionResource resource, long memoryBudget) {
     this.resource = resource;
     this.memoryBudget = memoryBudget;
     this.maxCrossCompactionFileNum =
         IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileNum();
     this.maxCrossCompactionFileSize =
         IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileSize();
+    this.compactionEstimator = new RewriteCrossCompactionEstimator();
   }

   /**
@@ -113,10 +115,7 @@ public List[] select() throws MergeException {
           "Selecting merge candidates from {} seqFile, {} unseqFiles",
           resource.getSeqFiles().size(),
           resource.getUnseqFiles().size());
-      select(false);
-      if (selectedUnseqFiles.isEmpty()) {
-        select(true);
-      }
+      selectFiles();
       resource.setSeqFiles(selectedSeqFiles);
       resource.setUnseqFiles(selectedUnseqFiles);
       resource.removeOutdatedSeqReaders();
@@ -126,6 +125,12 @@ public List[] select() throws MergeException {
       }
     } catch (IOException e) {
       throw new MergeException(e);
+    } finally {
+      try {
+        compactionEstimator.close();
+      } catch (Exception e) {
+        throw new MergeException(e);
+      }
     }
     if (logger.isInfoEnabled()) {
       logger.info(
@@ -147,10 +152,9 @@ public List[] select() throws MergeException {
    * exceed the memory overhead preset by the system for the compaction thread, put them into the
    * selectedSeqFiles and selectedUnseqFiles.
    *
-   * @param useTightBound whether is tight estimate or loop estimate
    * @throws IOException
    */
-  void select(boolean useTightBound) throws IOException {
+  void selectFiles() throws IOException {
     tmpSelectedSeqFiles = new HashSet<>();
     seqSelected = new boolean[resource.getSeqFiles().size()];
     seqSelectedNum = 0;
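
The javadoc above describes a greedy, budget-bounded selection: keep accepting candidate file groups, oldest first, while the accumulated estimated memory cost stays within the preset budget. Below is a minimal, self-contained sketch of that pattern; the class name, the cost values, and the budget are made up for illustration, and this is not the IoTDB implementation.

    // Toy sketch of the greedy idea in the javadoc above (hypothetical names and numbers).
    import java.util.ArrayList;
    import java.util.List;

    public class GreedyBudgetSelectionSketch {
      public static void main(String[] args) {
        long memoryBudget = 100;                      // hypothetical budget, arbitrary units
        long[] estimatedCosts = {20, 35, 30, 40, 10}; // hypothetical per-candidate cost estimates, oldest first
        List<Integer> selected = new ArrayList<>();
        long totalCost = 0;
        for (int i = 0; i < estimatedCosts.length; i++) {
          if (totalCost + estimatedCosts[i] > memoryBudget) {
            break; // like the selector, stop at the first candidate that would exceed the budget
          }
          totalCost += estimatedCosts[i];
          selected.add(i);
        }
        System.out.println("selected: " + selected + ", total cost: " + totalCost); // [0, 1, 2], 85
      }
    }
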
@@ -189,16 +193,17 @@ void select(boolean useTightBound) throws IOException {
         }
       }

+      List<TsFileResource> tmpSelectedSeqFileResources = new ArrayList<>();
       for (int seqIndex : tmpSelectedSeqFiles) {
+        TsFileResource tsFileResource = resource.getSeqFiles().get(seqIndex);
+        tmpSelectedSeqFileResources.add(tsFileResource);
         totalSize += resource.getSeqFiles().get(seqIndex).getTsFileSize();
       }
       totalSize += unseqFile.getTsFileSize();

       tempMaxSeqFileCost = maxSeqFileCost;
       long newCost =
-          useTightBound
-              ? calculateTightMemoryCost(unseqFile, tmpSelectedSeqFiles, startTime, timeLimit)
-              : calculateLooseMemoryCost(unseqFile, tmpSelectedSeqFiles, startTime, timeLimit);
+          compactionEstimator.estimateCrossCompactionMemory(tmpSelectedSeqFileResources, unseqFile);
       if (!updateSelectedFiles(newCost, unseqFile)) {
         // older unseq files must be merged before newer ones
         break;
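
The new estimateCrossCompactionMemory call above replaces the in-selector loose/tight cost calculation with a pluggable estimator. The abstract class itself is not part of this diff; the stub below only restates the contract as this file uses it. The method names come from the call sites here, while everything else (AutoCloseable, the checked exceptions, the exact parameter names) is an assumption.

    import java.io.IOException;
    import java.util.List;
    import org.apache.iotdb.db.engine.storagegroup.TsFileResource;

    // Hypothetical stub inferred from the call sites in this diff; the real
    // AbstractCompactionEstimator in IoTDB may declare more than this.
    abstract class CrossCompactionEstimatorSketch implements AutoCloseable {
      // Estimated heap footprint of compacting one unseq file together with the
      // seq files it overlaps; used where calculateLoose/TightMemoryCost used to be called.
      abstract long estimateCrossCompactionMemory(
          List<TsFileResource> selectedSeqFiles, TsFileResource unseqFile) throws IOException;

      // Called from the finally block in select() so cached readers/metadata are
      // released even when selection fails; the catch there handles any Exception.
      @Override
      public abstract void close() throws Exception;
    }
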
@@ -342,110 +347,6 @@ private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
     }
   }

-  private long calculateMemoryCost(
-      TsFileResource tmpSelectedUnseqFile,
-      Collection<Integer> tmpSelectedSeqFiles,
-      IFileQueryMemMeasurement unseqMeasurement,
-      IFileQueryMemMeasurement seqMeasurement,
-      long startTime,
-      long timeLimit)
-      throws IOException {
-    long cost = 0;
-    Long fileCost = unseqMeasurement.measure(tmpSelectedUnseqFile);
-    cost += fileCost;
-
-    for (Integer seqFileIdx : tmpSelectedSeqFiles) {
-      TsFileResource seqFile = resource.getSeqFiles().get(seqFileIdx);
-      fileCost = seqMeasurement.measure(seqFile);
-      if (fileCost > tempMaxSeqFileCost) {
-        // only one file will be read at the same time, so only the largest one is recorded here
-        cost -= tempMaxSeqFileCost;
-        cost += fileCost;
-        tempMaxSeqFileCost = fileCost;
-      }
-      // but writing data into a new file may generate the same amount of metadata in memory
-      cost += calculateMetadataSize(seqFile);
-      long timeConsumption = System.currentTimeMillis() - startTime;
-      if (timeConsumption > timeLimit) {
-        return Long.MAX_VALUE;
-      }
-    }
-    return cost;
-  }
-
-  private long calculateLooseMemoryCost(
-      TsFileResource tmpSelectedUnseqFile,
-      Collection<Integer> tmpSelectedSeqFiles,
-      long startTime,
-      long timeLimit)
-      throws IOException {
-    return calculateMemoryCost(
-        tmpSelectedUnseqFile,
-        tmpSelectedSeqFiles,
-        TsFileResource::getTsFileSize,
-        this::calculateMetadataSize,
-        startTime,
-        timeLimit);
-  }
-
-  private long calculateTightMemoryCost(
-      TsFileResource tmpSelectedUnseqFile,
-      Collection<Integer> tmpSelectedSeqFiles,
-      long startTime,
-      long timeLimit)
-      throws IOException {
-    return calculateMemoryCost(
-        tmpSelectedUnseqFile,
-        tmpSelectedSeqFiles,
-        this::calculateTightUnseqMemoryCost,
-        this::calculateTightSeqMemoryCost,
-        startTime,
-        timeLimit);
-  }
-
-  private long calculateMetadataSize(TsFileResource seqFile) throws IOException {
-    Long cost = fileMetaSizeMap.get(seqFile);
-    if (cost == null) {
-      cost = MergeUtils.getFileMetaSize(seqFile, resource.getFileReader(seqFile));
-      fileMetaSizeMap.put(seqFile, cost);
-      logger.debug(LOG_FILE_COST, seqFile, cost);
-    }
-    return cost;
-  }
-
-  private long calculateTightFileMemoryCost(
-      TsFileResource seqFile, IFileQueryMemMeasurement measurement) throws IOException {
-    Long cost = maxSeriesQueryCostMap.get(seqFile);
-    if (cost == null) {
-      long[] chunkNums =
-          MergeUtils.findTotalAndLargestSeriesChunkNum(seqFile, resource.getFileReader(seqFile));
-      long totalChunkNum = chunkNums[0];
-      long maxChunkNum = chunkNums[1];
-      cost = measurement.measure(seqFile) * maxChunkNum / totalChunkNum;
-      maxSeriesQueryCostMap.put(seqFile, cost);
-      logger.debug(LOG_FILE_COST, seqFile, cost);
-    }
-    return cost;
-  }
-
-  // this method traverses all ChunkMetadata to find out which series has the most chunks and uses
-  // its proportion to all series to get a maximum estimation
-  private long calculateTightSeqMemoryCost(TsFileResource seqFile) throws IOException {
-    long singleSeriesCost = calculateTightFileMemoryCost(seqFile, this::calculateMetadataSize);
-    long multiSeriesCost = concurrentMergeNum * singleSeriesCost;
-    long maxCost = calculateMetadataSize(seqFile);
-    return Math.min(multiSeriesCost, maxCost);
-  }
-
-  // this method traverses all ChunkMetadata to find out which series has the most chunks and uses
-  // its proportion among all series to get a maximum estimation
-  private long calculateTightUnseqMemoryCost(TsFileResource unseqFile) throws IOException {
-    long singleSeriesCost = calculateTightFileMemoryCost(unseqFile, TsFileResource::getTsFileSize);
-    long multiSeriesCost = concurrentMergeNum * singleSeriesCost;
-    long maxCost = unseqFile.getTsFileSize();
-    return Math.min(multiSeriesCost, maxCost);
-  }
-
   @Override
   public int getConcurrentMergeNum() {
     return concurrentMergeNum;
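
For reference, the deleted calculateTight* methods above reduce to the arithmetic below: scale a whole-file measurement (file size for unseq files, metadata size for seq files) by the chunk share of the most chunk-heavy series, multiply by the number of concurrently merged series, and cap the result at the whole-file measurement. The sketch is a standalone paraphrase of that deleted code, not IoTDB code, and the sample numbers are made up.

    // Standalone paraphrase of the removed tight-estimation heuristic.
    public class TightEstimateSketch {
      static long tightCost(long fileMeasurement, long maxChunkNum, long totalChunkNum, int concurrentMergeNum) {
        long singleSeriesCost = fileMeasurement * maxChunkNum / totalChunkNum; // share of the most chunk-heavy series
        long multiSeriesCost = concurrentMergeNum * singleSeriesCost;          // series merged concurrently
        return Math.min(multiSeriesCost, fileMeasurement);                     // never exceed the whole-file measurement
      }

      public static void main(String[] args) {
        // made-up sample: 64 MB measurement, 10 000 chunks in total, 250 in the largest series, 4 concurrent series
        System.out.println(tightCost(64L * 1024 * 1024, 250, 10_000, 4)); // prints 6710884 (about 6.4 MB)
      }
    }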