Skip to content

Commit 04489b7

Browse files
committed
Fix PrefetchingResultSetIterator regression introduced in latest commit
Latest commit introduces a regression & doesn't handle all corner cases. Fix this by calling maybePrefetch in constructor & update unit tests accordingly.
1 parent a63795a commit 04489b7

File tree

2 files changed

+62
-10
lines changed

2 files changed

+62
-10
lines changed

connector/src/main/scala/com/datastax/spark/connector/rdd/reader/PrefetchingResultSetIterator.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ class PrefetchingResultSetIterator(resultSet: AsyncResultSet, timer: Option[Time
3737
}
3838

3939
private def maybePrefetch(): Unit = {
40-
// It is sometimes possible to see pages in the middle have zero
41-
// elements, hence iterating until we get at least one element is
42-
// needed (and it needs to be done before calling .next(), too)
40+
// It is sometimes possible to see pages have zero elements that are
41+
// not the last, hence iterating until we get at least one element is
42+
// needed
4343
while (!currentIterator.hasNext && currentResultSet.hasMorePages) {
4444
currentResultSet = Await.result(nextResultSet.get, Duration.Inf)
4545
currentIterator = currentResultSet.currentPage().iterator()
@@ -51,8 +51,16 @@ class PrefetchingResultSetIterator(resultSet: AsyncResultSet, timer: Option[Time
5151
currentIterator.hasNext || currentResultSet.hasMorePages
5252

5353
override def next(): Row = {
54-
maybePrefetch()
5554
val row = currentIterator.next()
55+
56+
// This must be called after the call to next() and not before,
57+
// so that hasNext returns is a correct result
58+
maybePrefetch()
5659
row
5760
}
61+
62+
// It is possible to have empty pages at the start of the query result,
63+
// for example when using "filtering"; discard those and ensure hasNext
64+
// always returns the correct result
65+
maybePrefetch()
5866
}

connector/src/test/scala/com/datastax/spark/connector/rdd/reader/PrefetchingResultSetIteratorSpec.scala

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,61 @@ import scala.jdk.CollectionConverters._
1010

1111
class PrefetchingResultSetIteratorSpec extends FlatSpec with Matchers with MockitoSugar {
1212

13-
"PrefetchingResultSetIterator" should "handle empty pages that are not the last with AsyncResultSet" in {
13+
"PrefetchingResultSetIterator" should "handle empty pages that are not the last" in {
1414
val row1 = mock[Row]
1515
val row2 = mock[Row]
1616
val row3 = mock[Row]
1717

1818
val asyncResultSet1 = mock[AsyncResultSet]
1919
val asyncResultSet2 = mock[AsyncResultSet]
2020
val asyncResultSet3 = mock[AsyncResultSet]
21+
val asyncResultSet4 = mock[AsyncResultSet]
22+
val asyncResultSet5 = mock[AsyncResultSet]
2123

22-
// First page has data
23-
when(asyncResultSet1.currentPage()).thenReturn(Seq(row1).asJava)
24+
// First page is empty
25+
when(asyncResultSet1.currentPage()).thenReturn(Seq.empty[Row].asJava)
26+
when(asyncResultSet1.hasMorePages()).thenReturn(true)
27+
when(asyncResultSet1.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet2))
28+
29+
// Second page has data
30+
when(asyncResultSet2.currentPage()).thenReturn(Seq(row1).asJava)
31+
when(asyncResultSet2.hasMorePages()).thenReturn(true)
32+
when(asyncResultSet2.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet3))
33+
34+
// Third page is empty
35+
when(asyncResultSet3.currentPage()).thenReturn(Seq.empty[Row].asJava)
36+
when(asyncResultSet3.hasMorePages()).thenReturn(true)
37+
when(asyncResultSet3.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet4))
38+
39+
// Fourth page has data
40+
when(asyncResultSet4.currentPage()).thenReturn(Seq(row2, row3).asJava)
41+
when(asyncResultSet4.hasMorePages()).thenReturn(true)
42+
when(asyncResultSet4.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet5))
43+
44+
// Last page is empty
45+
when(asyncResultSet5.currentPage()).thenReturn(Seq.empty[Row].asJava)
46+
when(asyncResultSet5.hasMorePages()).thenReturn(false)
47+
48+
val iterator = new PrefetchingResultSetIterator(asyncResultSet1)
49+
50+
val rows = iterator.toList
51+
52+
rows should contain theSameElementsInOrderAs Seq(row1, row2, row3)
53+
54+
verify(asyncResultSet1).fetchNextPage()
55+
verify(asyncResultSet2).fetchNextPage()
56+
verify(asyncResultSet3).fetchNextPage()
57+
verify(asyncResultSet4).fetchNextPage()
58+
verify(asyncResultSet5, never()).fetchNextPage()
59+
}
60+
61+
it should "handle a result made of empty pages only" in {
62+
val asyncResultSet1 = mock[AsyncResultSet]
63+
val asyncResultSet2 = mock[AsyncResultSet]
64+
val asyncResultSet3 = mock[AsyncResultSet]
65+
66+
// First page is empty
67+
when(asyncResultSet1.currentPage()).thenReturn(Seq.empty[Row].asJava)
2468
when(asyncResultSet1.hasMorePages()).thenReturn(true)
2569
when(asyncResultSet1.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet2))
2670

@@ -29,15 +73,15 @@ class PrefetchingResultSetIteratorSpec extends FlatSpec with Matchers with Mocki
2973
when(asyncResultSet2.hasMorePages()).thenReturn(true)
3074
when(asyncResultSet2.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet3))
3175

32-
// Third page has data
33-
when(asyncResultSet3.currentPage()).thenReturn(Seq(row2, row3).asJava)
76+
// Last page is empty
77+
when(asyncResultSet3.currentPage()).thenReturn(Seq.empty[Row].asJava)
3478
when(asyncResultSet3.hasMorePages()).thenReturn(false)
3579

3680
val iterator = new PrefetchingResultSetIterator(asyncResultSet1)
3781

3882
val rows = iterator.toList
3983

40-
rows should contain theSameElementsInOrderAs Seq(row1, row2, row3)
84+
rows should contain theSameElementsInOrderAs Seq.empty[Row]
4185

4286
verify(asyncResultSet1).fetchNextPage()
4387
verify(asyncResultSet2).fetchNextPage()

0 commit comments

Comments
 (0)