Skip to content

Commit a63795a

Browse files
committed
PrefetchingResultSetIterator: fix bug w/r/t intermediate zero-row pages
Zero row pages can sometimes happen, for example when using "filtering". The iterator has a logic bug which make it throw an exception when encountering this. Fix this issue with minimal code changes and add associated unit test.
1 parent 7ae4271 commit a63795a

File tree

2 files changed

+51
-2
lines changed

2 files changed

+51
-2
lines changed

connector/src/main/scala/com/datastax/spark/connector/rdd/reader/PrefetchingResultSetIterator.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ class PrefetchingResultSetIterator(resultSet: AsyncResultSet, timer: Option[Time
3737
}
3838

3939
private def maybePrefetch(): Unit = {
40-
if (!currentIterator.hasNext && currentResultSet.hasMorePages) {
40+
// It is sometimes possible to see pages in the middle have zero
41+
// elements, hence iterating until we get at least one element is
42+
// needed (and it needs to be done before calling .next(), too)
43+
while (!currentIterator.hasNext && currentResultSet.hasMorePages) {
4144
currentResultSet = Await.result(nextResultSet.get, Duration.Inf)
4245
currentIterator = currentResultSet.currentPage().iterator()
4346
nextResultSet = fetchNextPage()
@@ -48,8 +51,8 @@ class PrefetchingResultSetIterator(resultSet: AsyncResultSet, timer: Option[Time
4851
currentIterator.hasNext || currentResultSet.hasMorePages
4952

5053
override def next(): Row = {
51-
val row = currentIterator.next() // let's try to exhaust the current iterator first
5254
maybePrefetch()
55+
val row = currentIterator.next()
5356
row
5457
}
5558
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.datastax.spark.connector.rdd.reader
2+
3+
import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, Row}
4+
import org.mockito.Mockito._
5+
import org.scalatest.{FlatSpec, Matchers}
6+
import org.scalatestplus.mockito.MockitoSugar
7+
8+
import java.util.concurrent.CompletableFuture
9+
import scala.jdk.CollectionConverters._
10+
11+
class PrefetchingResultSetIteratorSpec extends FlatSpec with Matchers with MockitoSugar {
12+
13+
"PrefetchingResultSetIterator" should "handle empty pages that are not the last with AsyncResultSet" in {
14+
val row1 = mock[Row]
15+
val row2 = mock[Row]
16+
val row3 = mock[Row]
17+
18+
val asyncResultSet1 = mock[AsyncResultSet]
19+
val asyncResultSet2 = mock[AsyncResultSet]
20+
val asyncResultSet3 = mock[AsyncResultSet]
21+
22+
// First page has data
23+
when(asyncResultSet1.currentPage()).thenReturn(Seq(row1).asJava)
24+
when(asyncResultSet1.hasMorePages()).thenReturn(true)
25+
when(asyncResultSet1.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet2))
26+
27+
// Second page is empty
28+
when(asyncResultSet2.currentPage()).thenReturn(Seq.empty[Row].asJava)
29+
when(asyncResultSet2.hasMorePages()).thenReturn(true)
30+
when(asyncResultSet2.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet3))
31+
32+
// Third page has data
33+
when(asyncResultSet3.currentPage()).thenReturn(Seq(row2, row3).asJava)
34+
when(asyncResultSet3.hasMorePages()).thenReturn(false)
35+
36+
val iterator = new PrefetchingResultSetIterator(asyncResultSet1)
37+
38+
val rows = iterator.toList
39+
40+
rows should contain theSameElementsInOrderAs Seq(row1, row2, row3)
41+
42+
verify(asyncResultSet1).fetchNextPage()
43+
verify(asyncResultSet2).fetchNextPage()
44+
verify(asyncResultSet3, never()).fetchNextPage()
45+
}
46+
}

0 commit comments

Comments
 (0)