File tree Expand file tree Collapse file tree 8 files changed +51
-6
lines changed
main/java/org/apache/paimon
test/java/org/apache/paimon/table/source/snapshot Expand file tree Collapse file tree 8 files changed +51
-6
lines changed Original file line number Diff line number Diff line change @@ -83,6 +83,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
8383 private Filter <Integer > bucketFilter = null ;
8484 private BiFilter <Integer , Integer > totalAwareBucketFilter = null ;
8585 protected ScanMode scanMode = ScanMode .ALL ;
86+ private Integer specifiedLevel = null ;
8687 private Filter <Integer > levelFilter = null ;
8788 private Filter <ManifestEntry > manifestEntryFilter = null ;
8889 private Filter <String > fileNameFilter = null ;
@@ -191,6 +192,13 @@ public FileStoreScan withKind(ScanMode scanMode) {
191192 return this ;
192193 }
193194
195+ @ Override
196+ public FileStoreScan withLevel (int level ) {
197+ manifestsReader .withLevel (level );
198+ this .specifiedLevel = level ;
199+ return this ;
200+ }
201+
194202 @ Override
195203 public FileStoreScan withLevelFilter (Filter <Integer > levelFilter ) {
196204 this .levelFilter = levelFilter ;
@@ -524,7 +532,12 @@ private Filter<InternalRow> createEntryRowFilter() {
524532 return false ;
525533 }
526534
527- if (levelFilter != null && !levelFilter .test (levelGetter .apply (row ))) {
535+ int level = levelGetter .apply (row );
536+ if (specifiedLevel != null && level != specifiedLevel ) {
537+ return false ;
538+ }
539+
540+ if (levelFilter != null && !levelFilter .test (level )) {
528541 return false ;
529542 }
530543
Original file line number Diff line number Diff line change @@ -72,6 +72,8 @@ public interface FileStoreScan {
7272
7373 FileStoreScan withKind (ScanMode scanMode );
7474
75+ FileStoreScan withLevel (int level );
76+
7577 FileStoreScan withLevelFilter (Filter <Integer > levelFilter );
7678
7779 FileStoreScan enableValueFilter ();
Original file line number Diff line number Diff line change @@ -50,6 +50,7 @@ public class ManifestsReader {
5050
5151 private boolean onlyReadRealBuckets = false ;
5252 @ Nullable private Integer specifiedBucket = null ;
53+ @ Nullable private Integer specifiedLevel = null ;
5354 @ Nullable private PartitionPredicate partitionFilter = null ;
5455
5556 public ManifestsReader (
@@ -73,6 +74,11 @@ public ManifestsReader withBucket(int bucket) {
7374 return this ;
7475 }
7576
77+ public ManifestsReader withLevel (int level ) {
78+ this .specifiedLevel = level ;
79+ return this ;
80+ }
81+
7682 public ManifestsReader withPartitionFilter (Predicate predicate ) {
7783 this .partitionFilter = PartitionPredicate .fromPredicate (partitionType , predicate );
7884 return this ;
@@ -147,6 +153,15 @@ private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
147153 }
148154 }
149155
156+ Integer minLevel = manifest .minLevel ();
157+ Integer maxLevel = manifest .maxLevel ();
158+ if (minLevel != null && maxLevel != null ) {
159+ if (specifiedLevel != null
160+ && (specifiedLevel < minLevel || specifiedLevel > maxLevel )) {
161+ return false ;
162+ }
163+ }
164+
150165 if (partitionFilter == null ) {
151166 return true ;
152167 }
Original file line number Diff line number Diff line change @@ -80,6 +80,8 @@ public interface SnapshotReader {
8080
8181 SnapshotReader withMode (ScanMode scanMode );
8282
83+ SnapshotReader withLevel (int level );
84+
8385 SnapshotReader withLevelFilter (Filter <Integer > levelFilter );
8486
8587 SnapshotReader enableValueFilter ();
@@ -131,6 +133,7 @@ interface Plan extends TableScan.Plan {
131133 /** Result splits. */
132134 List <Split > splits ();
133135
136+ @ SuppressWarnings ({"unchecked" , "rawtypes" })
134137 default List <DataSplit > dataSplits () {
135138 return (List ) splits ();
136139 }
Original file line number Diff line number Diff line change @@ -245,6 +245,12 @@ public SnapshotReader withMode(ScanMode scanMode) {
245245 return this ;
246246 }
247247
248+ @ Override
249+ public SnapshotReader withLevel (int level ) {
250+ scan .withLevel (level );
251+ return this ;
252+ }
253+
248254 @ Override
249255 public SnapshotReader withLevelFilter (Filter <Integer > levelFilter ) {
250256 scan .withLevelFilter (levelFilter );
Original file line number Diff line number Diff line change @@ -335,6 +335,12 @@ public SnapshotReader withMode(ScanMode scanMode) {
335335 return this ;
336336 }
337337
338+ @ Override
339+ public SnapshotReader withLevel (int level ) {
340+ wrapped .withLevel (level );
341+ return this ;
342+ }
343+
338344 @ Override
339345 public SnapshotReader withLevelFilter (Filter <Integer > levelFilter ) {
340346 wrapped .withLevelFilter (levelFilter );
Original file line number Diff line number Diff line change @@ -120,9 +120,9 @@ public List<String> primaryKeys() {
120120
121121 @ Override
122122 public SnapshotReader newSnapshotReader () {
123- if (wrapped .schema ().primaryKeys ().size () > 0 ) {
123+ if (! wrapped .schema ().primaryKeys ().isEmpty () ) {
124124 return wrapped .newSnapshotReader ()
125- .withLevelFilter ( level -> level == coreOptions ().numLevels () - 1 )
125+ .withLevel ( coreOptions ().numLevels () - 1 )
126126 .enableValueFilter ();
127127 } else {
128128 return wrapped .newSnapshotReader ();
@@ -132,15 +132,15 @@ public SnapshotReader newSnapshotReader() {
132132 @ Override
133133 public DataTableBatchScan newScan () {
134134 return new DataTableBatchScan (
135- wrapped .schema ().primaryKeys ().size () > 0 ,
135+ ! wrapped .schema ().primaryKeys ().isEmpty () ,
136136 coreOptions (),
137137 newSnapshotReader (),
138138 DefaultValueAssigner .create (wrapped .schema ()));
139139 }
140140
141141 @ Override
142142 public StreamDataTableScan newStreamScan () {
143- if (wrapped .schema ().primaryKeys ().size () > 0 ) {
143+ if (! wrapped .schema ().primaryKeys ().isEmpty () ) {
144144 throw new UnsupportedOperationException (
145145 "Unsupported streaming scan for read optimized table" );
146146 }
Original file line number Diff line number Diff line change @@ -67,7 +67,7 @@ public void testScan() throws Exception {
6767
6868 assertThat (snapshotManager .latestSnapshotId ()).isEqualTo (5 );
6969
70- snapshotReader .withLevelFilter ( level -> level == table .coreOptions ().numLevels () - 1 );
70+ snapshotReader .withLevel ( table .coreOptions ().numLevels () - 1 );
7171 TableRead read = table .newRead ();
7272 ChangelogFollowUpScanner scanner = new ChangelogFollowUpScanner ();
7373
You can’t perform that action at this time.
0 commit comments