Skip to content

Commit 88e2aa2

Browse files
committed
[core] Fix data evolution mode with limit manifest push down
1 parent f95454b commit 88e2aa2

File tree

6 files changed

+84
-51
lines changed

6 files changed

+84
-51
lines changed

paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
package org.apache.paimon.utils;
2020

21+
import java.util.ArrayList;
2122
import java.util.Collection;
23+
import java.util.Iterator;
2224
import java.util.List;
2325
import java.util.concurrent.ThreadLocalRandom;
2426

@@ -36,4 +38,12 @@ public static <T> T pickRandomly(List<T> list) {
3638
public static <T> boolean isNullOrEmpty(Collection<T> list) {
3739
return list == null || list.isEmpty();
3840
}
41+
42+
public static <T> List<T> toList(Iterator<T> iterator) {
43+
List<T> result = new ArrayList<>();
44+
while (iterator.hasNext()) {
45+
result.add(iterator.next());
46+
}
47+
return result;
48+
}
3949
}

paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ public AppendOnlyFileStoreScan newScan() {
177177
schemaManager,
178178
schema,
179179
manifestFileFactory(),
180-
options.scanManifestParallelism());
180+
options.scanManifestParallelism(),
181+
options.deletionVectorsEnabled());
181182
}
182183

183184
return new AppendOnlyFileStoreScan(
@@ -188,7 +189,8 @@ public AppendOnlyFileStoreScan newScan() {
188189
schema,
189190
manifestFileFactory(),
190191
options.scanManifestParallelism(),
191-
options.fileIndexReadEnabled());
192+
options.fileIndexReadEnabled(),
193+
options.deletionVectorsEnabled());
192194
}
193195

194196
@Override

paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java

Lines changed: 15 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,16 @@
4141
import org.apache.paimon.types.RowType;
4242
import org.apache.paimon.utils.BiFilter;
4343
import org.apache.paimon.utils.Filter;
44+
import org.apache.paimon.utils.ListUtils;
4445
import org.apache.paimon.utils.Pair;
4546
import org.apache.paimon.utils.Range;
4647
import org.apache.paimon.utils.SnapshotManager;
4748

4849
import javax.annotation.Nullable;
4950

5051
import java.util.ArrayList;
51-
import java.util.Collection;
5252
import java.util.Collections;
5353
import java.util.Iterator;
54-
import java.util.LinkedHashMap;
5554
import java.util.List;
5655
import java.util.Map;
5756
import java.util.Set;
@@ -278,31 +277,13 @@ public Plan plan() {
278277
manifests = postFilterManifests(manifests);
279278

280279
Iterator<ManifestEntry> iterator = readManifestEntries(manifests, false);
281-
List<ManifestEntry> files = new ArrayList<>();
282-
while (iterator.hasNext()) {
283-
files.add(iterator.next());
280+
if (supportsLimitPushManifestEntries()) {
281+
iterator = limitPushManifestEntries(iterator);
284282
}
285283

286-
files = postFilterManifestEntries(files);
287-
288-
if (wholeBucketFilterEnabled()) {
289-
// We group files by bucket here, and filter them by the whole bucket filter.
290-
// Why do this: because in primary key table, we can't just filter the value
291-
// by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
292-
// but we can do this by filter the whole bucket files
293-
files =
294-
files.stream()
295-
.collect(
296-
Collectors.groupingBy(
297-
// we use LinkedHashMap to avoid disorder
298-
file -> Pair.of(file.partition(), file.bucket()),
299-
LinkedHashMap::new,
300-
Collectors.toList()))
301-
.values()
302-
.stream()
303-
.map(this::filterWholeBucketByStats)
304-
.flatMap(Collection::stream)
305-
.collect(Collectors.toList());
284+
List<ManifestEntry> files = ListUtils.toList(iterator);
285+
if (postFilterManifestEntriesEnabled()) {
286+
files = postFilterManifestEntries(files);
306287
}
307288

308289
List<ManifestEntry> result = files;
@@ -457,16 +438,20 @@ protected List<ManifestFileMeta> postFilterManifests(List<ManifestFileMeta> mani
457438
return manifests;
458439
}
459440

460-
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
461-
return entries;
441+
protected boolean postFilterManifestEntriesEnabled() {
442+
return false;
462443
}
463444

464-
protected boolean wholeBucketFilterEnabled() {
445+
protected boolean supportsLimitPushManifestEntries() {
465446
return false;
466447
}
467448

468-
protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry> entries) {
469-
return entries;
449+
protected Iterator<ManifestEntry> limitPushManifestEntries(Iterator<ManifestEntry> entries) {
450+
throw new UnsupportedOperationException();
451+
}
452+
453+
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
454+
throw new UnsupportedOperationException();
470455
}
471456

472457
/** Note: Keep this thread-safe. */

paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.paimon.fileindex.FileIndexPredicate;
2323
import org.apache.paimon.manifest.ManifestEntry;
2424
import org.apache.paimon.manifest.ManifestFile;
25-
import org.apache.paimon.manifest.ManifestFileMeta;
2625
import org.apache.paimon.predicate.Predicate;
2726
import org.apache.paimon.schema.SchemaManager;
2827
import org.apache.paimon.schema.TableSchema;
@@ -35,17 +34,19 @@
3534

3635
import java.io.IOException;
3736
import java.util.Iterator;
38-
import java.util.List;
3937
import java.util.Map;
4038
import java.util.concurrent.ConcurrentHashMap;
4139

40+
import static org.apache.paimon.utils.Preconditions.checkArgument;
41+
4242
/** {@link FileStoreScan} for {@link AppendOnlyFileStore}. */
4343
public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
4444

4545
private final BucketSelectConverter bucketSelectConverter;
4646
private final SimpleStatsEvolutions simpleStatsEvolutions;
4747

4848
private final boolean fileIndexReadEnabled;
49+
private final boolean deletionVectorsEnabled;
4950

5051
protected Predicate inputFilter;
5152

@@ -63,7 +64,8 @@ public AppendOnlyFileStoreScan(
6364
TableSchema schema,
6465
ManifestFile.Factory manifestFileFactory,
6566
Integer scanManifestParallelism,
66-
boolean fileIndexReadEnabled) {
67+
boolean fileIndexReadEnabled,
68+
boolean deletionVectorsEnabled) {
6769
super(
6870
manifestsReader,
6971
snapshotManager,
@@ -75,6 +77,7 @@ public AppendOnlyFileStoreScan(
7577
this.simpleStatsEvolutions =
7678
new SimpleStatsEvolutions(sid -> scanTableSchema(sid).fields(), schema.id());
7779
this.fileIndexReadEnabled = fileIndexReadEnabled;
80+
this.deletionVectorsEnabled = deletionVectorsEnabled;
7881
}
7982

8083
public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
@@ -84,15 +87,14 @@ public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
8487
}
8588

8689
@Override
87-
protected Iterator<ManifestEntry> readManifestEntries(
88-
List<ManifestFileMeta> manifests, boolean useSequential) {
89-
Iterator<ManifestEntry> baseIterator = super.readManifestEntries(manifests, useSequential);
90-
91-
if (limit != null && limit > 0) {
92-
return new LimitAwareManifestEntryIterator(baseIterator, limit);
93-
}
90+
public boolean supportsLimitPushManifestEntries() {
91+
return limit != null && limit > 0 && !deletionVectorsEnabled;
92+
}
9493

95-
return baseIterator;
94+
@Override
95+
protected Iterator<ManifestEntry> limitPushManifestEntries(Iterator<ManifestEntry> entries) {
96+
checkArgument(limit != null && limit > 0 && !deletionVectorsEnabled);
97+
return new LimitAwareManifestEntryIterator(entries, limit);
9698
}
9799

98100
/** Note: Keep this thread-safe. */

paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.util.stream.Collectors;
5151

5252
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
53+
import static org.apache.paimon.utils.Preconditions.checkNotNull;
5354

5455
/** {@link FileStoreScan} for data-evolution enabled table. */
5556
public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
@@ -64,7 +65,8 @@ public DataEvolutionFileStoreScan(
6465
SchemaManager schemaManager,
6566
TableSchema schema,
6667
ManifestFile.Factory manifestFileFactory,
67-
Integer scanManifestParallelism) {
68+
Integer scanManifestParallelism,
69+
boolean deletionVectorsEnabled) {
6870
super(
6971
manifestsReader,
7072
bucketSelectConverter,
@@ -73,7 +75,8 @@ public DataEvolutionFileStoreScan(
7375
schema,
7476
manifestFileFactory,
7577
scanManifestParallelism,
76-
false);
78+
false,
79+
deletionVectorsEnabled);
7780
}
7881

7982
@Override
@@ -144,11 +147,19 @@ public FileStoreScan withReadType(RowType readType) {
144147
return this;
145148
}
146149

150+
@Override
151+
public boolean supportsLimitPushManifestEntries() {
152+
return false;
153+
}
154+
155+
@Override
156+
protected boolean postFilterManifestEntriesEnabled() {
157+
return inputFilter != null;
158+
}
159+
147160
@Override
148161
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
149-
if (inputFilter == null) {
150-
return entries;
151-
}
162+
checkNotNull(inputFilter);
152163

153164
// group by row id range
154165
RangeHelper<ManifestEntry> rangeHelper =

paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,20 @@
3434
import org.apache.paimon.stats.SimpleStatsEvolutions;
3535
import org.apache.paimon.table.source.ScanMode;
3636
import org.apache.paimon.types.RowType;
37+
import org.apache.paimon.utils.Pair;
3738
import org.apache.paimon.utils.SnapshotManager;
3839

3940
import javax.annotation.Nullable;
4041

4142
import java.io.IOException;
4243
import java.util.ArrayList;
44+
import java.util.Collection;
4345
import java.util.Collections;
46+
import java.util.LinkedHashMap;
4447
import java.util.List;
4548
import java.util.Map;
4649
import java.util.concurrent.ConcurrentHashMap;
50+
import java.util.stream.Collectors;
4751

4852
import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
4953
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
@@ -155,7 +159,7 @@ protected boolean filterByStats(ManifestEntry entry) {
155159

156160
@Override
157161
protected ManifestEntry dropStats(ManifestEntry entry) {
158-
if (!isValueFilterEnabled() && wholeBucketFilterEnabled()) {
162+
if (!isValueFilterEnabled() && postFilterManifestEntriesEnabled()) {
159163
return new FilteredManifestEntry(entry.copyWithoutStats(), filterByValueFilter(entry));
160164
}
161165
return entry.copyWithoutStats();
@@ -200,12 +204,31 @@ private boolean isValueFilterEnabled() {
200204
}
201205

202206
@Override
203-
protected boolean wholeBucketFilterEnabled() {
207+
protected boolean postFilterManifestEntriesEnabled() {
204208
return valueFilter != null && scanMode == ScanMode.ALL;
205209
}
206210

207211
@Override
208-
protected List<ManifestEntry> filterWholeBucketByStats(List<ManifestEntry> entries) {
212+
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> files) {
213+
// We group files by bucket here and filter each group with the whole-bucket filter.
214+
// Why: in a primary key table, we can't simply filter on value using the stats
215+
// of individual files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
216+
// but we can do so by filtering all the files of a bucket together.
217+
return files.stream()
218+
.collect(
219+
Collectors.groupingBy(
220+
// use LinkedHashMap to preserve the original order of the entries
221+
file -> Pair.of(file.partition(), file.bucket()),
222+
LinkedHashMap::new,
223+
Collectors.toList()))
224+
.values()
225+
.stream()
226+
.map(this::doFilterWholeBucketByStats)
227+
.flatMap(Collection::stream)
228+
.collect(Collectors.toList());
229+
}
230+
231+
private List<ManifestEntry> doFilterWholeBucketByStats(List<ManifestEntry> entries) {
209232
return noOverlapping(entries)
210233
? filterWholeBucketPerFile(entries)
211234
: filterWholeBucketAllFiles(entries);

0 commit comments

Comments
 (0)