Skip to content

Commit f95454b

Browse files
authored
[core] optimize read append table with limit (#6848)
1 parent e3e3320 commit f95454b

File tree

9 files changed

+344
-1
lines changed

9 files changed

+344
-1
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
9292
private ScanMetrics scanMetrics = null;
9393
private boolean dropStats;
9494
@Nullable protected List<Range> rowRanges;
95+
@Nullable protected Long limit;
9596

9697
public AbstractFileStoreScan(
9798
ManifestsReader manifestsReader,
@@ -251,6 +252,12 @@ public FileStoreScan withReadType(RowType readType) {
251252
return this;
252253
}
253254

255+
@Override
256+
public FileStoreScan withLimit(long limit) {
257+
this.limit = limit;
258+
return this;
259+
}
260+
254261
@Nullable
255262
@Override
256263
public Integer parallelism() {
@@ -379,7 +386,7 @@ public Iterator<ManifestEntry> readFileIterator() {
379386
return readManifestEntries(readManifests().filteredManifests, true);
380387
}
381388

382-
private Iterator<ManifestEntry> readManifestEntries(
389+
protected Iterator<ManifestEntry> readManifestEntries(
383390
List<ManifestFileMeta> manifests, boolean useSequential) {
384391
return scanMode == ScanMode.ALL
385392
? readAndMergeFileEntries(manifests, Function.identity(), useSequential)

paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.paimon.fileindex.FileIndexPredicate;
2323
import org.apache.paimon.manifest.ManifestEntry;
2424
import org.apache.paimon.manifest.ManifestFile;
25+
import org.apache.paimon.manifest.ManifestFileMeta;
2526
import org.apache.paimon.predicate.Predicate;
2627
import org.apache.paimon.schema.SchemaManager;
2728
import org.apache.paimon.schema.TableSchema;
@@ -33,6 +34,8 @@
3334
import javax.annotation.Nullable;
3435

3536
import java.io.IOException;
37+
import java.util.Iterator;
38+
import java.util.List;
3639
import java.util.Map;
3740
import java.util.concurrent.ConcurrentHashMap;
3841

@@ -80,6 +83,18 @@ public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
8083
return this;
8184
}
8285

86+
@Override
87+
protected Iterator<ManifestEntry> readManifestEntries(
88+
List<ManifestFileMeta> manifests, boolean useSequential) {
89+
Iterator<ManifestEntry> baseIterator = super.readManifestEntries(manifests, useSequential);
90+
91+
if (limit != null && limit > 0) {
92+
return new LimitAwareManifestEntryIterator(baseIterator, limit);
93+
}
94+
95+
return baseIterator;
96+
}
97+
8398
/** Note: Keep this thread-safe. */
8499
@Override
85100
protected boolean filterByStats(ManifestEntry entry) {
@@ -142,4 +157,64 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry
142157
throw new RuntimeException("Exception happens while checking predicate.", e);
143158
}
144159
}
160+
161+
/**
162+
* Iterator that applies limit pushdown by stopping early when enough rows have been
163+
* accumulated.
164+
*/
165+
private static class LimitAwareManifestEntryIterator implements Iterator<ManifestEntry> {
166+
private final Iterator<ManifestEntry> baseIterator;
167+
private final long limit;
168+
169+
private long accumulatedRowCount = 0;
170+
private ManifestEntry nextEntry = null;
171+
private boolean hasNext = false;
172+
173+
LimitAwareManifestEntryIterator(Iterator<ManifestEntry> baseIterator, long limit) {
174+
this.baseIterator = baseIterator;
175+
this.limit = limit;
176+
advance();
177+
}
178+
179+
private void advance() {
180+
// If we've already accumulated enough rows, stop reading more entries
181+
if (accumulatedRowCount >= limit) {
182+
hasNext = false;
183+
nextEntry = null;
184+
return;
185+
}
186+
187+
if (baseIterator.hasNext()) {
188+
nextEntry = baseIterator.next();
189+
hasNext = true;
190+
191+
long fileRowCount = nextEntry.file().rowCount();
192+
if (fileRowCount > 0) {
193+
accumulatedRowCount += fileRowCount;
194+
}
195+
196+
return;
197+
}
198+
199+
// No more base entries
200+
hasNext = false;
201+
nextEntry = null;
202+
}
203+
204+
@Override
205+
public boolean hasNext() {
206+
return hasNext;
207+
}
208+
209+
@Override
210+
public ManifestEntry next() {
211+
// This exception is only thrown if next() is called when hasNext() returns false.
212+
if (!hasNext) {
213+
throw new java.util.NoSuchElementException();
214+
}
215+
ManifestEntry current = nextEntry;
216+
advance();
217+
return current;
218+
}
219+
}
145220
}

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ public interface FileStoreScan {
9595

9696
FileStoreScan withReadType(RowType readType);
9797

98+
FileStoreScan withLimit(long limit);
99+
98100
@Nullable
99101
Integer parallelism();
100102

paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public InnerTableScan withFilter(Predicate predicate) {
7979
@Override
8080
public InnerTableScan withLimit(int limit) {
8181
this.pushDownLimit = limit;
82+
snapshotReader.withLimit(limit);
8283
return this;
8384
}
8485

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ public interface SnapshotReader {
116116

117117
SnapshotReader withReadType(RowType readType);
118118

119+
SnapshotReader withLimit(int limit);
120+
119121
/** Get splits plan from snapshot. */
120122
Plan read();
121123

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,12 @@ public SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter) {
322322
return this;
323323
}
324324

325+
@Override
326+
public SnapshotReader withLimit(int limit) {
327+
scan.withLimit(limit);
328+
return this;
329+
}
330+
325331
@Override
326332
public SnapshotReader dropStats() {
327333
scan.dropStats();

paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,12 @@ public SnapshotReader withReadType(RowType readType) {
438438
return this;
439439
}
440440

441+
@Override
442+
public SnapshotReader withLimit(int limit) {
443+
wrapped.withLimit(limit);
444+
return this;
445+
}
446+
441447
@Override
442448
public Plan read() {
443449
return wrapped.read();

paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.paimon.table.source;
2020

2121
import org.apache.paimon.predicate.FieldRef;
22+
import org.apache.paimon.predicate.Predicate;
23+
import org.apache.paimon.predicate.PredicateBuilder;
2224
import org.apache.paimon.predicate.TopN;
2325
import org.apache.paimon.stats.SimpleStatsEvolutions;
2426
import org.apache.paimon.table.FileStoreTable;
@@ -111,6 +113,167 @@ public void testPushDownLimit() throws Exception {
111113
commit.close();
112114
}
113115

116+
@Test
117+
public void testLimitPushdownWithFilter() throws Exception {
118+
createAppendOnlyTable();
119+
120+
StreamTableWrite write = table.newWrite(commitUser);
121+
StreamTableCommit commit = table.newCommit(commitUser);
122+
123+
// Write 50 files, each with 1 row. Rows 0-24 have 'a' = 10, rows 25-49 have 'a' = 20.
124+
for (int i = 0; i < 25; i++) {
125+
write.write(rowData(i, 10, (long) i * 100));
126+
commit.commit(i, write.prepareCommit(true, i));
127+
}
128+
for (int i = 25; i < 50; i++) {
129+
write.write(rowData(i, 20, (long) i * 100));
130+
commit.commit(i, write.prepareCommit(true, i));
131+
}
132+
133+
// Without limit, should read all 50 files
134+
TableScan.Plan planWithoutLimit = table.newScan().plan();
135+
int totalSplits = planWithoutLimit.splits().size();
136+
assertThat(totalSplits).isEqualTo(50);
137+
138+
// With filter (a = 20) and limit (10)
139+
// filterByStats has already been applied in baseIterator, so only files 25-49 will be
140+
// returned
141+
// To get 10 rows, it should read 10 files (from index 25 to 34)
142+
Predicate filter =
143+
new PredicateBuilder(table.schema().logicalRowType())
144+
.equal(1, 20); // Filter on 'a' = 20
145+
TableScan.Plan planWithFilterAndLimit =
146+
table.newScan().withFilter(filter).withLimit(10).plan();
147+
int splitsWithFilterAndLimit = planWithFilterAndLimit.splits().size();
148+
149+
// Should read exactly 10 files (from index 25 to 34) to get 10 rows
150+
assertThat(splitsWithFilterAndLimit).isLessThanOrEqualTo(10);
151+
assertThat(splitsWithFilterAndLimit).isGreaterThan(0);
152+
assertThat(splitsWithFilterAndLimit).isLessThan(totalSplits);
153+
154+
write.close();
155+
commit.close();
156+
}
157+
158+
@Test
159+
public void testLimitPushdownWhenDataLessThanLimit() throws Exception {
160+
createAppendOnlyTable();
161+
162+
StreamTableWrite write = table.newWrite(commitUser);
163+
StreamTableCommit commit = table.newCommit(commitUser);
164+
165+
// Write only 3 rows
166+
write.write(rowData(1, 10, 100L));
167+
write.write(rowData(2, 20, 200L));
168+
write.write(rowData(3, 30, 300L));
169+
commit.commit(0, write.prepareCommit(true, 0));
170+
171+
// With limit 10, but only 3 rows exist
172+
// Should return all 3 rows without throwing exception
173+
TableScan.Plan plan = table.newScan().withLimit(10).plan();
174+
assertThat(plan.splits().size()).isEqualTo(3);
175+
176+
// Verify we can read all data correctly
177+
List<String> result = getResult(table.newRead(), plan.splits());
178+
assertThat(result.size()).isEqualTo(3);
179+
assertThat(result).containsExactlyInAnyOrder("+I 1|10|100", "+I 2|20|200", "+I 3|30|300");
180+
181+
write.close();
182+
commit.close();
183+
}
184+
185+
@Test
186+
public void testLimitPushdownWithFilterWhenDataLessThanLimit() throws Exception {
187+
createAppendOnlyTable();
188+
189+
StreamTableWrite write = table.newWrite(commitUser);
190+
StreamTableCommit commit = table.newCommit(commitUser);
191+
192+
// Write 10 rows, but only 3 rows have 'a' = 20
193+
for (int i = 0; i < 7; i++) {
194+
write.write(rowData(i, 10, (long) i * 100));
195+
commit.commit(i, write.prepareCommit(true, i));
196+
}
197+
for (int i = 7; i < 10; i++) {
198+
write.write(rowData(i, 20, (long) i * 100));
199+
commit.commit(i, write.prepareCommit(true, i));
200+
}
201+
202+
// With filter (a = 20) and limit (10), but only 3 rows match
203+
// Should return all 3 matching rows without throwing exception
204+
Predicate filter = new PredicateBuilder(table.schema().logicalRowType()).equal(1, 20);
205+
TableScan.Plan plan = table.newScan().withFilter(filter).withLimit(10).plan();
206+
207+
// Should read all 3 files that match the filter
208+
assertThat(plan.splits().size()).isEqualTo(3);
209+
210+
// Verify we can read all matching data correctly
211+
List<String> result = getResult(table.newRead(), plan.splits());
212+
assertThat(result.size()).isEqualTo(3);
213+
assertThat(result).containsExactlyInAnyOrder("+I 7|20|700", "+I 8|20|800", "+I 9|20|900");
214+
215+
write.close();
216+
commit.close();
217+
}
218+
219+
@Test
220+
public void testLimitPushdownEarlyStop() throws Exception {
221+
createAppendOnlyTable();
222+
223+
StreamTableWrite write = table.newWrite(commitUser);
224+
StreamTableCommit commit = table.newCommit(commitUser);
225+
226+
// Write 100 files, each with 1 row
227+
for (int i = 0; i < 100; i++) {
228+
write.write(rowData(i, i, (long) i * 100));
229+
commit.commit(i, write.prepareCommit(true, i));
230+
}
231+
232+
// Without limit, should read all 100 files
233+
TableScan.Plan planWithoutLimit = table.newScan().plan();
234+
assertThat(planWithoutLimit.splits().size()).isEqualTo(100);
235+
236+
// With limit 10, should only read 10 files (early stop optimization)
237+
TableScan.Plan planWithLimit = table.newScan().withLimit(10).plan();
238+
assertThat(planWithLimit.splits().size()).isEqualTo(10);
239+
240+
// Verify the returned data is correct
241+
List<String> result = getResult(table.newRead(), planWithLimit.splits());
242+
assertThat(result.size()).isEqualTo(10);
243+
244+
write.close();
245+
commit.close();
246+
}
247+
248+
@Test
249+
public void testLimitPushdownBoundaryCases() throws Exception {
250+
createAppendOnlyTable();
251+
252+
StreamTableWrite write = table.newWrite(commitUser);
253+
StreamTableCommit commit = table.newCommit(commitUser);
254+
255+
// Write 5 rows
256+
for (int i = 0; i < 5; i++) {
257+
write.write(rowData(i, i, (long) i * 100));
258+
commit.commit(i, write.prepareCommit(true, i));
259+
}
260+
261+
// Test limit = 1
262+
TableScan.Plan plan1 = table.newScan().withLimit(1).plan();
263+
assertThat(plan1.splits().size()).isEqualTo(1);
264+
265+
// Test limit = 5 (exactly the number of rows)
266+
TableScan.Plan plan5 = table.newScan().withLimit(5).plan();
267+
assertThat(plan5.splits().size()).isEqualTo(5);
268+
269+
// Test limit = 10 (more than the number of rows, should return all 5)
270+
TableScan.Plan plan10 = table.newScan().withLimit(10).plan();
271+
assertThat(plan10.splits().size()).isEqualTo(5);
272+
273+
write.close();
274+
commit.close();
275+
}
276+
114277
@Test
115278
public void testPushDownTopN() throws Exception {
116279
createAppendOnlyTable();

0 commit comments

Comments
 (0)