@@ -21,7 +21,6 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.format.blob.BlobFileFormat;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
@@ -36,12 +35,15 @@
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
+import java.util.TreeMap;
 import java.util.stream.Collectors;

-import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;

 /** Compact coordinator to compact data evolution table. */
 public class DataEvolutionCompactCoordinator {
@@ -107,12 +109,6 @@ List<ManifestEntry> scan() {
                         currentMetas.stream()
                                 .flatMap(meta -> snapshotReader.readManifest(meta).stream())
                                 .collect(Collectors.toList());
-                Comparator<ManifestEntry> comparator =
-                        Comparator.comparingLong((ManifestEntry a) -> a.file().nonNullFirstRowId())
-                                .thenComparingInt(
-                                        a -> BlobFileFormat.isBlobFile(a.fileName()) ? 1 : 0);
-                targetEntries.sort(comparator);
-
                 result.addAll(targetEntries);
             }
             return result;
@@ -126,14 +122,6 @@ static class CompactPlanner {
         private final long targetFileSize;
         private final long openFileCost;
         private final long compactMinFileNum;
-        private long lastRowIdStart = -1;
-        private long nextRowIdExpected = -1;
-        private long weightSum = 0L;
-        private BinaryRow lastPartition = null;
-        private boolean skipFile = false;
-        private List<DataEvolutionCompactTask> tasks = new ArrayList<>();
-        private List<DataFileMeta> groupFiles = new ArrayList<>();
-        private List<DataFileMeta> blobFiles = new ArrayList<>();

         CompactPlanner(
                 boolean compactBlob,
@@ -146,90 +134,121 @@ static class CompactPlanner {
             this.compactMinFileNum = compactMinFileNum;
         }

-        List<DataEvolutionCompactTask> compactPlan(Iterable<ManifestEntry> entries) {
-            for (ManifestEntry entry : entries) {
-                long rowId = entry.file().nonNullFirstRowId();
-                if (rowId < lastRowIdStart) {
-                    throw new IllegalStateException(
-                            "Files are not in order by rowId. Current file rowId: "
-                                    + rowId
-                                    + ", last file rowId: "
-                                    + lastRowIdStart);
-                } else if (rowId == lastRowIdStart) {
-                    checkArgument(
-                            lastPartition.equals(entry.partition()),
-                            "Inconsistent partition for the same rowId: " + rowId);
-                    if (!skipFile) {
-                        if (BlobFileFormat.isBlobFile(entry.fileName())) {
-                            blobFiles.add(entry.file());
+        List<DataEvolutionCompactTask> compactPlan(List<ManifestEntry> input) {
+            List<DataEvolutionCompactTask> tasks = new ArrayList<>();
+            Map<BinaryRow, List<DataFileMeta>> partitionedFiles = new LinkedHashMap<>();
+            for (ManifestEntry entry : input) {
+                partitionedFiles
+                        .computeIfAbsent(entry.partition(), k -> new ArrayList<>())
+                        .add(entry.file());
+            }
+
+            for (Map.Entry<BinaryRow, List<DataFileMeta>> partitionFiles :
+                    partitionedFiles.entrySet()) {
+                BinaryRow partition = partitionFiles.getKey();
+                List<DataFileMeta> files = partitionFiles.getValue();
+                RangeHelper<DataFileMeta> rangeHelper =
+                        new RangeHelper<>(
+                                DataFileMeta::nonNullFirstRowId,
+                                // merge adjacent files
+                                f -> f.nonNullFirstRowId() + f.rowCount());
+
+                List<List<DataFileMeta>> ranges = rangeHelper.mergeOverlappingRanges(files);
+
+                for (List<DataFileMeta> group : ranges) {
+                    List<DataFileMeta> dataFiles = new ArrayList<>();
+                    List<DataFileMeta> blobFiles = new ArrayList<>();
+                    TreeMap<Long, DataFileMeta> treeMap = new TreeMap<>();
+                    Map<DataFileMeta, List<DataFileMeta>> dataFileToBlobFiles = new HashMap<>();
+                    for (DataFileMeta f : group) {
+                        if (!isBlobFile(f.fileName())) {
+                            treeMap.put(f.nonNullFirstRowId(), f);
+                            dataFiles.add(f);
                         } else {
-                            groupFiles.add(entry.file());
-                            weightSum += Math.max(entry.file().fileSize(), openFileCost);
+                            blobFiles.add(f);
                         }
                     }
-                } else if (rowId < nextRowIdExpected) {
-                    checkArgument(
-                            lastPartition.equals(entry.partition()),
-                            "Inconsistent partition for the same rowId: " + rowId);
-                    checkArgument(
-                            BlobFileFormat.isBlobFile(entry.fileName()),
-                            "Data file found in the middle of blob files for rowId: " + rowId);
-                    if (!skipFile) {
-                        blobFiles.add(entry.file());
-                    }
-                } else {
-                    BinaryRow currentPartition = entry.partition();
-                    long currentWeight = Math.max(entry.file().fileSize(), openFileCost);
-                    // skip big file
-                    skipFile = currentWeight > targetFileSize;
-
-                    // If compaction condition meets, do compaction
-                    if (weightSum > targetFileSize
-                            || rowId > nextRowIdExpected
-                            || !currentPartition.equals(lastPartition)
-                            || skipFile) {
-                        flushAll();
+
+                    if (compactBlob) {
+                        // associate blob files to data files
+                        for (DataFileMeta blobFile : blobFiles) {
+                            Long key = treeMap.floorKey(blobFile.nonNullFirstRowId());
+                            if (key != null) {
+                                DataFileMeta dataFile = treeMap.get(key);
+                                if (blobFile.nonNullFirstRowId() >= dataFile.nonNullFirstRowId()
+                                        && blobFile.nonNullFirstRowId()
+                                                <= dataFile.nonNullFirstRowId()
+                                                        + dataFile.rowCount()
+                                                        - 1) {
+                                    dataFileToBlobFiles
+                                            .computeIfAbsent(dataFile, k -> new ArrayList<>())
+                                            .add(blobFile);
+                                }
+                            }
+                        }
                     }

-                    if (!skipFile) {
-                        weightSum += currentWeight;
-                        groupFiles.add(entry.file());
+                    RangeHelper<DataFileMeta> rangeHelper2 =
+                            new RangeHelper<>(
+                                    DataFileMeta::nonNullFirstRowId,
+                                    // files group
+                                    f -> f.nonNullFirstRowId() + f.rowCount() - 1);
+                    List<List<DataFileMeta>> groupedFiles =
+                            rangeHelper2.mergeOverlappingRanges(dataFiles);
+                    List<DataFileMeta> waitCompactFiles = new ArrayList<>();
+
+                    long weightSum = 0L;
+                    for (List<DataFileMeta> fileGroup : groupedFiles) {
+                        long currentGroupWeight =
+                                fileGroup.stream()
+                                        .mapToLong(d -> Math.max(d.fileSize(), openFileCost))
+                                        .sum();
+                        if (currentGroupWeight > targetFileSize) {
+                            // compact current file group to merge field files
+                            tasks.addAll(triggerTask(fileGroup, partition, dataFileToBlobFiles));
+                            // compact wait compact files
+                            tasks.addAll(
+                                    triggerTask(waitCompactFiles, partition, dataFileToBlobFiles));
+                            waitCompactFiles = new ArrayList<>();
+                            weightSum = 0;
+                        } else {
+                            weightSum += currentGroupWeight;
+                            waitCompactFiles.addAll(fileGroup);
+                            if (weightSum > targetFileSize) {
+                                tasks.addAll(
+                                        triggerTask(
+                                                waitCompactFiles, partition, dataFileToBlobFiles));
+                                waitCompactFiles = new ArrayList<>();
+                                weightSum = 0L;
+                            }
+                        }
                     }
-                    lastRowIdStart = rowId;
-                    nextRowIdExpected = rowId + entry.file().rowCount();
-                    lastPartition = currentPartition;
+                    tasks.addAll(triggerTask(waitCompactFiles, partition, dataFileToBlobFiles));
                 }
             }
-            // do compaction for the last group
-            flushAll();
-
-            List<DataEvolutionCompactTask> result = new ArrayList<>(tasks);
-            tasks = new ArrayList<>();
-            return result;
+            return tasks;
         }

-        private void flushAll() {
-            if (!groupFiles.isEmpty()) {
-                if (groupFiles.size() >= compactMinFileNum) {
-                    tasks.add(
-                            new DataEvolutionCompactTask(
-                                    lastPartition, new ArrayList<>(groupFiles), false));
-
-                    if (compactBlob && blobFiles.size() > 1) {
-                        tasks.add(
-                                new DataEvolutionCompactTask(
-                                        lastPartition, new ArrayList<>(blobFiles), true));
-                    }
-                }
-
-                weightSum = 0L;
-                groupFiles = new ArrayList<>();
-                blobFiles = new ArrayList<>();
+        private List<DataEvolutionCompactTask> triggerTask(
+                List<DataFileMeta> dataFiles,
+                BinaryRow partition,
+                Map<DataFileMeta, List<DataFileMeta>> dataFileToBlobFiles) {
+            List<DataEvolutionCompactTask> tasks = new ArrayList<>();
+            if (dataFiles.size() >= compactMinFileNum) {
+                tasks.add(new DataEvolutionCompactTask(partition, dataFiles, false));
             }

-            checkArgument(weightSum == 0L, "Weight sum should be zero after compaction.");
-            checkArgument(groupFiles.isEmpty(), "Group files should be empty.");
-            checkArgument(blobFiles.isEmpty(), "Blob files should be empty.");
+            if (compactBlob) {
+                List<DataFileMeta> blobFiles = new ArrayList<>();
+                for (DataFileMeta dataFile : dataFiles) {
+                    blobFiles.addAll(
+                            dataFileToBlobFiles.getOrDefault(dataFile, Collections.emptyList()));
+                }
+                if (blobFiles.size() >= compactMinFileNum) {
+                    tasks.add(new DataEvolutionCompactTask(partition, blobFiles, true));
+                }
+            }
+            return tasks;
         }
     }
 }
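
Note: the reworked compactPlan relies on a RangeHelper utility that is not part of this hunk. The sketch below is only a hypothetical reconstruction of the contract the planner appears to assume, namely two row-id extractors (range start and range end) and a mergeOverlappingRanges call that sorts the files and groups those whose row-id ranges touch or overlap; the actual class added elsewhere in this change may differ in sorting and boundary handling.

// Hypothetical sketch of the RangeHelper contract assumed by compactPlan();
// not the implementation shipped with this change.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToLongFunction;

class RangeHelper<T> {

    private final ToLongFunction<T> rangeStart;
    private final ToLongFunction<T> rangeEnd;

    RangeHelper(ToLongFunction<T> rangeStart, ToLongFunction<T> rangeEnd) {
        this.rangeStart = rangeStart;
        this.rangeEnd = rangeEnd;
    }

    /** Sorts items by range start and groups items whose ranges touch or overlap. */
    List<List<T>> mergeOverlappingRanges(List<T> items) {
        List<T> sorted = new ArrayList<>(items);
        sorted.sort(Comparator.comparingLong(rangeStart));

        List<List<T>> groups = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long currentEnd = Long.MIN_VALUE;
        for (T item : sorted) {
            long start = rangeStart.applyAsLong(item);
            long end = rangeEnd.applyAsLong(item);
            if (current.isEmpty() || start <= currentEnd) {
                // Overlaps or touches the current group: keep accumulating.
                current.add(item);
                currentEnd = Math.max(currentEnd, end);
            } else {
                // Gap in row ids: close the current group and start a new one.
                groups.add(current);
                current = new ArrayList<>();
                current.add(item);
                currentEnd = end;
            }
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }
}

Under this reading, the first extractor pair in the diff (firstRowId + rowCount, an exclusive end) lets row-id-adjacent files fall into one range per partition, while the second (with the minus one, an inclusive end) only groups data files whose row-id ranges actually overlap, i.e. field files written for the same rows.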
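
For orientation, a minimal usage illustration of the new planner follows. It assumes the CompactPlanner constructor takes the four fields shown above in declaration order; the wiring to scan() and the concrete size and count values are made up for the example, not defaults from CoreOptions.

// Illustrative only: constructor argument order, the scan() wiring and the values are assumptions.
List<ManifestEntry> entries = scanner.scan();   // entries produced by the scan() shown above
CompactPlanner planner =
        new CompactPlanner(
                /* compactBlob */ true,
                /* targetFileSize */ 128L << 20,      // 128 MB, illustrative
                /* openFileCost */ 4L << 20,          // 4 MB, illustrative
                /* compactMinFileNum */ 5);           // illustrative
List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);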