Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSSDKforJavav2-03a897a.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS SDK for Java v2",
"contributor": "",
"description": "Fix an issue where `StackOverflowError` can occur when iterating over large pages from an async paginator. This can manifest as the publisher hanging/never reaching the end of the stream. Fixes [#6411](https://github.com/aws/aws-sdk-java-v2/issues/6411)."
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package software.amazon.awssdk.core.internal.pagination.async;

import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand All @@ -32,6 +34,8 @@
public final class ItemsSubscription<ResponseT, ItemT> extends PaginationSubscription<ResponseT> {
private final Function<ResponseT, Iterator<ItemT>> getIteratorFunction;
private volatile Iterator<ItemT> singlePageItemsIterator;
private final AtomicBoolean handlingRequests = new AtomicBoolean();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we're effectively using this as a "try-lock" (ie, return quickly if another thread is already in this method)?

Copy link
Contributor Author

@dagnir dagnir Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep basically

edit: not just another thread, but also prevent handleRequests from reentering, which could happen for example if the subscriber does something like

void onNext(String item) {
    subscription.request(1);
}

private volatile boolean awaitingNewPage = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using an AtomicBoolean for handlingRequests but a volatile for awaitingNewPage? I'm trying to work through different cases.... when handleRequests sets awaitingNewPage to true (line 78), we create a next page future and then the whenComplete on that future which may run in a different thread and so the awaitingNewPage getting set to false may only apply in that thread?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only reason awaitingNewPage is volatile is I didn't need to use a CAS for it

so the awaitingNewPage getting set to false may only apply in that thread?

Hmm I might not completely understand the question here, but it's volatile so reads on awaitingNewPage will never be stale. e.g. thread T1 setting awaitingNewPage = true (atomic), and T2 reading it after the write will always return true (never false).


private ItemsSubscription(BuilderImpl builder) {
super(builder);
Expand All @@ -47,61 +51,83 @@ public static Builder builder() {

@Override
protected void handleRequests() {
if (!hasMoreItems() && !hasNextPage()) {
completeSubscription();
// Prevent recursion if we already invoked handleRequests
if (!handlingRequests.compareAndSet(false, true)) {
return;
}

synchronized (this) {
if (outstandingRequests.get() <= 0) {
stopTask();
return;
}
}

if (!isTerminated()) {
/**
* Current page is null only the first time the method is called.
* Once initialized, current page will never be null
*/
if (currentPage == null || (!hasMoreItems() && hasNextPage())) {
fetchNextPage();

} else if (hasMoreItems()) {
sendNextElement();

// All valid cases are covered above. Throw an exception if any combination is missed
} else {
throw new IllegalStateException("Execution should have not reached here");
try {
while (true) {
if (!hasMoreItems() && !hasNextPage()) {
completeSubscription();
return;
}

synchronized (this) {
if (outstandingRequests.get() <= 0) {
stopTask();
return;
}
}

if (isTerminated()) {
return;
}

if (shouldFetchNextPage()) {
awaitingNewPage = true;
fetchNextPage().whenComplete((r, e) -> {
if (e == null) {
awaitingNewPage = false;
handleRequests();
}
// note: signaling onError if e != null is taken care of by fetchNextPage(). No need to do it here.
});
} else if (hasMoreItems()) {
synchronized (this) {
if (outstandingRequests.get() <= 0) {
continue;
}

subscriber.onNext(singlePageItemsIterator.next());
outstandingRequests.getAndDecrement();
}
} else {
// Outstanding demand AND no items in current page AND waiting for next page. Just return for now, and
// we'll handle demand when the new page arrives.
return;
}
}
} finally {
handlingRequests.set(false);
}
}

private void fetchNextPage() {
nextPageFetcher.nextPage(currentPage)
.whenComplete(((response, error) -> {
if (response != null) {
currentPage = response;
singlePageItemsIterator = getIteratorFunction.apply(response);
sendNextElement();
}
if (error != null) {
subscriber.onError(error);
cleanup();
}
}));
private CompletableFuture<ResponseT> fetchNextPage() {
return nextPageFetcher.nextPage(currentPage)
.whenComplete((response, error) -> {
if (response != null) {
currentPage = response;
singlePageItemsIterator = getIteratorFunction.apply(response);
} else if (error != null) {
subscriber.onError(error);
cleanup();
}
});
}

/**
* Calls onNext and calls the recursive method.
*/
private void sendNextElement() {
if (singlePageItemsIterator.hasNext()) {
subscriber.onNext(singlePageItemsIterator.next());
outstandingRequests.getAndDecrement();
// Conditions when to fetch the next page:
// - We're NOT already waiting for a new page AND either
// - We still need to fetch the first page OR
// - We've exhausted the current page AND there is a next page available
private boolean shouldFetchNextPage() {
if (awaitingNewPage) {
return false;
}

handleRequests();
// Current page is null only the first time the method is called.
// Once initialized, current page will never be null.
return currentPage == null || (!hasMoreItems() && hasNextPage());
}

private boolean hasMoreItems() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package software.amazon.awssdk.core.pagination.async;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import software.amazon.awssdk.core.SdkResponse;

public class PaginatedItemsPublisherTest {
@Test
@Timeout(value = 1, unit = TimeUnit.MINUTES)
void subscribe_largePage_doesNotFail() throws Exception {
int nItems = 100_000;

Function<SdkResponse, Iterator<String>> iteratorFn = resp ->
new Iterator<String>() {
private int count = 0;

@Override
public boolean hasNext() {
return count < nItems;
}

@Override
public String next() {
++count;
return "item";
}
};

AsyncPageFetcher<SdkResponse> pageFetcher = new AsyncPageFetcher<SdkResponse>() {
@Override
public boolean hasNextPage(SdkResponse oldPage) {
return false;
}

@Override
public CompletableFuture<SdkResponse> nextPage(SdkResponse oldPage) {
return CompletableFuture.completedFuture(mock(SdkResponse.class));
}
};

PaginatedItemsPublisher<SdkResponse, String> publisher = PaginatedItemsPublisher.builder()
.isLastPage(false)
.nextPageFetcher(pageFetcher)
.iteratorFunction(iteratorFn)
.build();

AtomicLong counter = new AtomicLong();
publisher.subscribe(i -> counter.incrementAndGet()).join();
assertThat(counter.get()).isEqualTo(nItems);
}

@Test
@Timeout(value = 1, unit = TimeUnit.MINUTES)
void subscribe_longStream_doesNotFail() throws Exception {
int nPages = 100_000;
int nItemsPerPage = 1;
Function<SdkResponse, Iterator<String>> iteratorFn = resp ->
new Iterator<String>() {
private int count = 0;

@Override
public boolean hasNext() {
return count < nItemsPerPage;
}

@Override
public String next() {
++count;
return "item";
}
};

AsyncPageFetcher<TestResponse> pageFetcher = new AsyncPageFetcher<TestResponse>() {
@Override
public boolean hasNextPage(TestResponse oldPage) {
return oldPage.pageNumber() < nPages - 1;
}

@Override
public CompletableFuture<TestResponse> nextPage(TestResponse oldPage) {
int nextPageNum;
if (oldPage == null) {
nextPageNum = 0;
} else {
nextPageNum = oldPage.pageNumber() + 1;
}
return CompletableFuture.completedFuture(createResponse(nextPageNum));
}
};

PaginatedItemsPublisher<SdkResponse, String> publisher = PaginatedItemsPublisher.builder()
.isLastPage(false)
.nextPageFetcher(pageFetcher)
.iteratorFunction(iteratorFn)
.build();

AtomicLong counter = new AtomicLong();
publisher.subscribe(i -> counter.incrementAndGet()).join();
assertThat(counter.get()).isEqualTo(nPages * nItemsPerPage);
}

private abstract class TestResponse extends SdkResponse {

protected TestResponse(Builder builder) {
super(builder);
}

abstract Integer pageNumber();
}

private static TestResponse createResponse(Integer pageNumber) {
TestResponse mock = mock(TestResponse.class);
when(mock.pageNumber()).thenReturn(pageNumber);
return mock;
}
}
Loading
Loading