Skip to content

Commit 73cf2aa

Browse files
authored
[core] Incremental query should return empty result for some corner cases (#5339)
1 parent 410c912 commit 73cf2aa

File tree

5 files changed

+113
-61
lines changed

5 files changed

+113
-61
lines changed

paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -257,13 +257,28 @@ private StartingScanner createIncrementalStartingScanner(SnapshotManager snapsho
257257
incrementalBetween.getLeft(), incrementalBetween.getRight()));
258258
}
259259

260+
checkArgument(
261+
endId >= startId,
262+
"Ending snapshotId should >= starting snapshotId %s.",
263+
endId,
264+
startId);
265+
266+
if (snapshotManager.earliestSnapshot() == null) {
267+
LOG.warn("There is currently no snapshot. Waiting for snapshot generation.");
268+
return new EmptyResultStartingScanner(snapshotManager);
269+
}
270+
271+
if (startId == endId) {
272+
return new EmptyResultStartingScanner(snapshotManager);
273+
}
274+
260275
CoreOptions.IncrementalBetweenScanMode scanMode =
261276
options.incrementalBetweenScanMode();
262277
return scanMode == DIFF
263278
? IncrementalDiffStartingScanner.betweenSnapshotIds(
264279
startId, endId, snapshotManager)
265-
: new IncrementalDeltaStartingScanner(
266-
snapshotManager, startId, endId, toSnapshotScanMode(scanMode));
280+
: IncrementalDeltaStartingScanner.betweenSnapshotIds(
281+
startId, endId, snapshotManager, toSnapshotScanMode(scanMode));
267282
}
268283
} else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) {
269284
Pair<Long, Long> incrementalBetween = options.incrementalBetweenTimestamp();
@@ -277,11 +292,13 @@ private StartingScanner createIncrementalStartingScanner(SnapshotManager snapsho
277292
long startTimestamp = incrementalBetween.getLeft();
278293
long endTimestamp = incrementalBetween.getRight();
279294
checkArgument(
280-
endTimestamp > startTimestamp,
281-
"Ending timestamp %s should be larger than starting timestamp %s.",
295+
endTimestamp >= startTimestamp,
296+
"Ending timestamp %s should be >= starting timestamp %s.",
282297
endTimestamp,
283298
startTimestamp);
284-
if (startTimestamp > latestSnapshot.timeMillis()
299+
300+
if (startTimestamp == endTimestamp
301+
|| startTimestamp > latestSnapshot.timeMillis()
285302
|| endTimestamp < earliestSnapshot.timeMillis()) {
286303
return new EmptyResultStartingScanner(snapshotManager);
287304
}

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.Iterator;
4444
import java.util.List;
4545
import java.util.Map;
46-
import java.util.Optional;
4746
import java.util.concurrent.ConcurrentHashMap;
4847
import java.util.stream.Collectors;
4948
import java.util.stream.LongStream;
@@ -72,11 +71,6 @@ public IncrementalDeltaStartingScanner(
7271

7372
@Override
7473
public Result scan(SnapshotReader reader) {
75-
// Check the validity of scan staring snapshotId.
76-
Optional<Result> checkResult = checkScanSnapshotIdValidity();
77-
if (checkResult.isPresent()) {
78-
return checkResult.get();
79-
}
8074
Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new ConcurrentHashMap<>();
8175
ManifestsReader manifestsReader = reader.manifestsReader();
8276

@@ -153,39 +147,23 @@ public Result scan(SnapshotReader reader) {
153147
return StartingScanner.fromPlan(new PlanImpl(null, endingSnapshotId, result));
154148
}
155149

156-
/**
157-
* Check the validity of staring snapshotId early.
158-
*
159-
* @return If the check passes return empty.
160-
*/
161-
private Optional<Result> checkScanSnapshotIdValidity() {
162-
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
163-
Long latestSnapshotId = snapshotManager.latestSnapshotId();
164-
165-
if (earliestSnapshotId == null || latestSnapshotId == null) {
166-
LOG.warn("There is currently no snapshot. Waiting for snapshot generation.");
167-
return Optional.of(new NoSnapshot());
168-
}
169-
170-
checkArgument(
171-
startingSnapshotId <= endingSnapshotId,
172-
"Starting snapshotId %s must less than ending snapshotId %s.",
173-
startingSnapshotId,
174-
endingSnapshotId);
150+
public static StartingScanner betweenSnapshotIds(
151+
long startId, long endId, SnapshotManager snapshotManager, ScanMode scanMode) {
152+
long earliestSnapshotId = snapshotManager.earliestSnapshotId();
153+
long latestSnapshotId = snapshotManager.latestSnapshotId();
175154

176155
// because of the left open right closed rule of IncrementalStartingScanner that is
177156
// different from StaticFromStartingScanner, so we should allow starting snapshotId to be
178157
// equal to the earliestSnapshotId - 1.
179158
checkArgument(
180-
startingSnapshotId >= earliestSnapshotId - 1
181-
&& endingSnapshotId <= latestSnapshotId,
159+
startId >= earliestSnapshotId - 1 && endId <= latestSnapshotId,
182160
"The specified scan snapshotId range [%s, %s] is out of available snapshotId range [%s, %s].",
183-
startingSnapshotId,
184-
endingSnapshotId,
161+
startId,
162+
endId,
185163
earliestSnapshotId,
186164
latestSnapshotId);
187165

188-
return Optional.empty();
166+
return new IncrementalDeltaStartingScanner(snapshotManager, startId, endId, scanMode);
189167
}
190168

191169
public static IncrementalDeltaStartingScanner betweenTimestamps(

paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public Result scan(SnapshotReader reader) {
6767
return StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start));
6868
}
6969

70-
public static IncrementalDiffStartingScanner betweenTags(
70+
public static StartingScanner betweenTags(
7171
Tag startTag,
7272
Tag endTag,
7373
SnapshotManager snapshotManager,
@@ -82,24 +82,22 @@ public static IncrementalDiffStartingScanner betweenTags(
8282
end.id());
8383

8484
checkArgument(
85-
end.id() > start.id(),
86-
"Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
85+
end.id() >= start.id(),
86+
"Tag end %s with snapshot id %s should be >= tag start %s with snapshot id %s",
8787
incrementalBetween.getRight(),
8888
end.id(),
8989
incrementalBetween.getLeft(),
9090
start.id());
9191

92+
if (start.id() == end.id()) {
93+
return new EmptyResultStartingScanner(snapshotManager);
94+
}
95+
9296
return new IncrementalDiffStartingScanner(snapshotManager, start, end);
9397
}
9498

95-
public static IncrementalDiffStartingScanner betweenSnapshotIds(
99+
public static StartingScanner betweenSnapshotIds(
96100
long startId, long endId, SnapshotManager snapshotManager) {
97-
checkArgument(
98-
endId > startId,
99-
"Ending snapshotId should be larger than starting snapshotId %s.",
100-
endId,
101-
startId);
102-
103101
Snapshot start = snapshotManager.snapshot(startId);
104102
Snapshot end = snapshotManager.snapshot(endId);
105103
return new IncrementalDiffStartingScanner(snapshotManager, start, end);

paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.paimon.types.DataTypes;
3434
import org.apache.paimon.types.RowKind;
3535
import org.apache.paimon.utils.Pair;
36+
import org.apache.paimon.utils.SnapshotManager;
3637
import org.apache.paimon.utils.TagManager;
3738

3839
import org.junit.jupiter.api.Test;
@@ -42,6 +43,7 @@
4243
import java.util.List;
4344

4445
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
46+
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
4547
import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
4648
import static org.apache.paimon.data.BinaryString.fromString;
4749
import static org.apache.paimon.io.DataFileTestUtils.row;
@@ -287,7 +289,7 @@ public void testTagIncremental() throws Exception {
287289

288290
assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG2,TAG1")))
289291
.hasMessageContaining(
290-
"Tag end TAG1 with snapshot id 1 should be larger than tag start TAG2 with snapshot id 2");
292+
"Tag end TAG1 with snapshot id 1 should be >= tag start TAG2 with snapshot id 2");
291293
}
292294

293295
@Test
@@ -406,6 +408,74 @@ public void testIncrementalToAutoTag() throws Exception {
406408
.containsExactly(GenericRow.of(3, BinaryString.fromString("c")));
407409
}
408410

411+
@Test
412+
public void testIncrementalEmptyResult() throws Exception {
413+
Identifier identifier = identifier("T");
414+
Schema schema =
415+
Schema.newBuilder()
416+
.column("a", DataTypes.INT())
417+
.column("b", DataTypes.STRING())
418+
.primaryKey("a")
419+
.option("bucket", "1")
420+
.build();
421+
catalog.createTable(identifier, schema, false);
422+
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
423+
424+
// no snapshot
425+
assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "1,2"))).isEmpty();
426+
assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN_TIMESTAMP, "2025-01-01,2025-01-02")))
427+
.isEmpty();
428+
429+
TableWriteImpl<?> write = table.newWrite(commitUser);
430+
TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
431+
SnapshotManager snapshotManager = table.snapshotManager();
432+
433+
write.write(GenericRow.of(1, BinaryString.fromString("a")));
434+
List<CommitMessage> commitMessages = write.prepareCommit(false, 0);
435+
commit.commit(0, commitMessages);
436+
437+
write.write(GenericRow.of(2, BinaryString.fromString("b")));
438+
commitMessages = write.prepareCommit(false, 1);
439+
commit.commit(1, commitMessages);
440+
441+
table.createTag("tag1", 1);
442+
443+
long earliestTimestamp = snapshotManager.earliestSnapshot().timeMillis();
444+
long latestTimestamp = snapshotManager.latestSnapshot().timeMillis();
445+
446+
// same tag
447+
assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "tag1,tag1"))).isEmpty();
448+
449+
// same snapshot
450+
assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "1,1"))).isEmpty();
451+
452+
// same timestamp
453+
assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN_TIMESTAMP, "2025-01-01,2025-01-01")))
454+
.isEmpty();
455+
456+
// startTimestamp > latestSnapshot.timeMillis()
457+
assertThat(
458+
read(
459+
table,
460+
Pair.of(
461+
INCREMENTAL_BETWEEN_TIMESTAMP,
462+
String.format(
463+
"%s,%s",
464+
latestTimestamp + 1, latestTimestamp + 2))))
465+
.isEmpty();
466+
467+
// endTimestamp < earliestSnapshot.timeMillis()
468+
assertThat(
469+
read(
470+
table,
471+
Pair.of(
472+
INCREMENTAL_BETWEEN_TIMESTAMP,
473+
String.format(
474+
"%s,%s",
475+
earliestTimestamp - 2, earliestTimestamp - 1))))
476+
.isEmpty();
477+
}
478+
409479
private static long utcMills(String timestamp) {
410480
return Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond();
411481
}

paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -110,25 +110,14 @@ public void testIllegalScanSnapshotId() throws Exception {
110110
assertThatNoException()
111111
.isThrownBy(
112112
() ->
113-
new IncrementalDeltaStartingScanner(
114-
snapshotManager, 0, 4, ScanMode.DELTA)
113+
IncrementalDeltaStartingScanner.betweenSnapshotIds(
114+
0, 4, snapshotManager, ScanMode.DELTA)
115115
.scan(snapshotReader));
116116

117-
// Starting snapshotId must less than ending snapshotId.
118117
assertThatThrownBy(
119118
() ->
120-
new IncrementalDeltaStartingScanner(
121-
snapshotManager, 4, 3, ScanMode.DELTA)
122-
.scan(snapshotReader))
123-
.satisfies(
124-
anyCauseMatches(
125-
IllegalArgumentException.class,
126-
"Starting snapshotId 4 must less than ending snapshotId 3."));
127-
128-
assertThatThrownBy(
129-
() ->
130-
new IncrementalDeltaStartingScanner(
131-
snapshotManager, 1, 5, ScanMode.DELTA)
119+
IncrementalDeltaStartingScanner.betweenSnapshotIds(
120+
1, 5, snapshotManager, ScanMode.DELTA)
132121
.scan(snapshotReader))
133122
.satisfies(
134123
anyCauseMatches(

0 commit comments

Comments
 (0)