From 93a0d4cd049c2e4a0ca240fbdcae44a5d9d3a8c7 Mon Sep 17 00:00:00 2001 From: Elouan Appere Date: Thu, 21 Nov 2024 17:23:30 +0100 Subject: [PATCH] PrefetchingResultSetIterator: fix bug w/r/t intermediate zero-row pages Zero row pages (that aren't the last) can sometimes happen, for example when using "filtering". The iterator has a logic bug which makes it throw an exception when encountering this. Fix this issue with minimal code changes and add associated unit test. --- .../reader/PrefetchingResultSetIterator.scala | 15 +++- .../PrefetchingResultSetIteratorSpec.scala | 90 +++++++++++++++++++ 2 files changed, 103 insertions(+), 2 deletions(-) create mode 100644 connector/src/test/scala/com/datastax/spark/connector/rdd/reader/PrefetchingResultSetIteratorSpec.scala diff --git a/connector/src/main/scala/com/datastax/spark/connector/rdd/reader/PrefetchingResultSetIterator.scala b/connector/src/main/scala/com/datastax/spark/connector/rdd/reader/PrefetchingResultSetIterator.scala index 9096e4d5d..6006970c5 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/rdd/reader/PrefetchingResultSetIterator.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/rdd/reader/PrefetchingResultSetIterator.scala @@ -37,7 +37,10 @@ class PrefetchingResultSetIterator(resultSet: AsyncResultSet, timer: Option[Time } private def maybePrefetch(): Unit = { - if (!currentIterator.hasNext && currentResultSet.hasMorePages) { + // It is sometimes possible to see pages have zero elements that are + // not the last, hence iterating until we get at least one element is + // needed + while (!currentIterator.hasNext && currentResultSet.hasMorePages) { currentResultSet = Await.result(nextResultSet.get, Duration.Inf) currentIterator = currentResultSet.currentPage().iterator() nextResultSet = fetchNextPage() @@ -48,8 +51,16 @@ class PrefetchingResultSetIterator(resultSet: AsyncResultSet, timer: Option[Time currentIterator.hasNext || currentResultSet.hasMorePages override def next(): Row = { - val row = currentIterator.next() // let's try to exhaust the current iterator first + val row = currentIterator.next() + + // This must be called after the call to next() and not before, + // so that hasNext returns is a correct result maybePrefetch() row } + + // It is possible to have empty pages at the start of the query result, + // for example when using "filtering"; discard those and ensure hasNext + // always returns the correct result + maybePrefetch() } diff --git a/connector/src/test/scala/com/datastax/spark/connector/rdd/reader/PrefetchingResultSetIteratorSpec.scala b/connector/src/test/scala/com/datastax/spark/connector/rdd/reader/PrefetchingResultSetIteratorSpec.scala new file mode 100644 index 000000000..2a0bbece9 --- /dev/null +++ b/connector/src/test/scala/com/datastax/spark/connector/rdd/reader/PrefetchingResultSetIteratorSpec.scala @@ -0,0 +1,90 @@ +package com.datastax.spark.connector.rdd.reader + +import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, Row} +import org.mockito.Mockito._ +import org.scalatest.{FlatSpec, Matchers} +import org.scalatestplus.mockito.MockitoSugar + +import java.util.concurrent.CompletableFuture +import scala.jdk.CollectionConverters._ + +class PrefetchingResultSetIteratorSpec extends FlatSpec with Matchers with MockitoSugar { + + "PrefetchingResultSetIterator" should "handle empty pages that are not the last" in { + val row1 = mock[Row] + val row2 = mock[Row] + val row3 = mock[Row] + + val asyncResultSet1 = mock[AsyncResultSet] + val asyncResultSet2 = mock[AsyncResultSet] + val asyncResultSet3 = mock[AsyncResultSet] + val asyncResultSet4 = mock[AsyncResultSet] + val asyncResultSet5 = mock[AsyncResultSet] + + // First page is empty + when(asyncResultSet1.currentPage()).thenReturn(Seq.empty[Row].asJava) + when(asyncResultSet1.hasMorePages()).thenReturn(true) + when(asyncResultSet1.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet2)) + + // Second page has data + when(asyncResultSet2.currentPage()).thenReturn(Seq(row1).asJava) + when(asyncResultSet2.hasMorePages()).thenReturn(true) + when(asyncResultSet2.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet3)) + + // Third page is empty + when(asyncResultSet3.currentPage()).thenReturn(Seq.empty[Row].asJava) + when(asyncResultSet3.hasMorePages()).thenReturn(true) + when(asyncResultSet3.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet4)) + + // Fourth page has data + when(asyncResultSet4.currentPage()).thenReturn(Seq(row2, row3).asJava) + when(asyncResultSet4.hasMorePages()).thenReturn(true) + when(asyncResultSet4.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet5)) + + // Last page is empty + when(asyncResultSet5.currentPage()).thenReturn(Seq.empty[Row].asJava) + when(asyncResultSet5.hasMorePages()).thenReturn(false) + + val iterator = new PrefetchingResultSetIterator(asyncResultSet1) + + val rows = iterator.toList + + rows should contain theSameElementsInOrderAs Seq(row1, row2, row3) + + verify(asyncResultSet1).fetchNextPage() + verify(asyncResultSet2).fetchNextPage() + verify(asyncResultSet3).fetchNextPage() + verify(asyncResultSet4).fetchNextPage() + verify(asyncResultSet5, never()).fetchNextPage() + } + + it should "handle a result made of empty pages only" in { + val asyncResultSet1 = mock[AsyncResultSet] + val asyncResultSet2 = mock[AsyncResultSet] + val asyncResultSet3 = mock[AsyncResultSet] + + // First page is empty + when(asyncResultSet1.currentPage()).thenReturn(Seq.empty[Row].asJava) + when(asyncResultSet1.hasMorePages()).thenReturn(true) + when(asyncResultSet1.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet2)) + + // Second page is empty + when(asyncResultSet2.currentPage()).thenReturn(Seq.empty[Row].asJava) + when(asyncResultSet2.hasMorePages()).thenReturn(true) + when(asyncResultSet2.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet3)) + + // Last page is empty + when(asyncResultSet3.currentPage()).thenReturn(Seq.empty[Row].asJava) + when(asyncResultSet3.hasMorePages()).thenReturn(false) + + val iterator = new PrefetchingResultSetIterator(asyncResultSet1) + + val rows = iterator.toList + + rows should contain theSameElementsInOrderAs Seq.empty[Row] + + verify(asyncResultSet1).fetchNextPage() + verify(asyncResultSet2).fetchNextPage() + verify(asyncResultSet3, never()).fetchNextPage() + } +} \ No newline at end of file