Skip to content

Commit b2a1763

Browse files
feat(client)!: extract auto pagination to shared classes
refactor(client)!: refactor async auto-pagination refactor(client)!: rename `getNextPage{,Params}` to `nextPage{,Params}` refactor(client)!: swap `nextPage{,Params}` to return non-optional # Migration - If you were referencing the `AutoPager` class on a specific `*Page` or `*PageAsync` type, then you should instead reference the shared `AutoPager` and `AutoPagerAsync` types, under the `core` package - `AutoPagerAsync` now has different usage. You can call `.subscribe(...)` on the returned object instead to get called back each page item. You can also call `onCompleteFuture()` to get a future that completes when all items have been processed. Finally, you can call `.close()` on the returned object to stop auto-paginating early - If you were referencing `getNextPage` or `getNextPageParams`: - Swap to `nextPage()` and `nextPageParams()` - Note that these both now return non-optional types (use `hasNextPage()` before calling these, since they will throw if it's impossible to get another page) There are examples and further information about pagination in the readme.
1 parent 0da037e commit b2a1763

File tree

108 files changed

+2302
-2943
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

108 files changed

+2302
-2943
lines changed

README.md

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -216,53 +216,101 @@ The SDK throws custom unchecked exception types:
216216

217217
## Pagination
218218

219-
For methods that return a paginated list of results, this library provides convenient ways access the results either one page at a time, or item-by-item across all pages.
219+
The SDK defines methods that return a paginated lists of results. It provides convenient ways to access the results either one page at a time or item-by-item across all pages.
220220

221221
### Auto-pagination
222222

223-
To iterate through all results across all pages, you can use `autoPager`, which automatically handles fetching more pages for you:
223+
To iterate through all results across all pages, use the `autoPager()` method, which automatically fetches more pages as needed.
224224

225-
### Synchronous
225+
When using the synchronous client, the method returns an [`Iterable`](https://docs.oracle.com/javase/8/docs/api/java/lang/Iterable.html)
226226

227227
```java
228228
import com.lithic.api.models.CardListPage;
229229
import com.lithic.api.models.NonPciCard;
230230

231-
// As an Iterable:
232-
CardListPage page = client.cards().list(params);
231+
CardListPage page = client.cards().list();
232+
233+
// Process as an Iterable
233234
for (NonPciCard card : page.autoPager()) {
234235
System.out.println(card);
235-
};
236+
}
236237

237-
// As a Stream:
238-
client.cards().list(params).autoPager().stream()
238+
// Process as a Stream
239+
page.autoPager()
240+
.stream()
239241
.limit(50)
240242
.forEach(card -> System.out.println(card));
241243
```
242244

243-
### Asynchronous
245+
When using the asynchronous client, the method returns an [`AsyncStreamResponse`](lithic-java-core/src/main/kotlin/com/lithic/api/core/http/AsyncStreamResponse.kt):
244246

245247
```java
246-
// Using forEach, which returns CompletableFuture<Void>:
247-
asyncClient.cards().list(params).autoPager()
248-
.forEach(card -> System.out.println(card), executor);
248+
import com.lithic.api.core.http.AsyncStreamResponse;
249+
import com.lithic.api.models.CardListPageAsync;
250+
import com.lithic.api.models.NonPciCard;
251+
import java.util.Optional;
252+
import java.util.concurrent.CompletableFuture;
253+
254+
CompletableFuture<CardListPageAsync> pageFuture = client.async().cards().list();
255+
256+
pageFuture.thenRun(page -> page.autoPager().subscribe(card -> {
257+
System.out.println(card);
258+
}));
259+
260+
// If you need to handle errors or completion of the stream
261+
pageFuture.thenRun(page -> page.autoPager().subscribe(new AsyncStreamResponse.Handler<>() {
262+
@Override
263+
public void onNext(NonPciCard card) {
264+
System.out.println(card);
265+
}
266+
267+
@Override
268+
public void onComplete(Optional<Throwable> error) {
269+
if (error.isPresent()) {
270+
System.out.println("Something went wrong!");
271+
throw new RuntimeException(error.get());
272+
} else {
273+
System.out.println("No more!");
274+
}
275+
}
276+
}));
277+
278+
// Or use futures
279+
pageFuture.thenRun(page -> page.autoPager()
280+
.subscribe(card -> {
281+
System.out.println(card);
282+
})
283+
.onCompleteFuture()
284+
.whenComplete((unused, error) -> {
285+
if (error != null) {
286+
System.out.println("Something went wrong!");
287+
throw new RuntimeException(error);
288+
} else {
289+
System.out.println("No more!");
290+
}
291+
}));
249292
```
250293

251294
### Manual pagination
252295

253-
If none of the above helpers meet your needs, you can also manually request pages one-by-one. A page of results has a `data()` method to fetch the list of objects, as well as top-level `response` and other methods to fetch top-level data about the page. It also has methods `hasNextPage`, `getNextPage`, and `getNextPageParams` methods to help with pagination.
296+
To access individual page items and manually request the next page, use the `items()`,
297+
`hasNextPage()`, and `nextPage()` methods:
254298

255299
```java
256300
import com.lithic.api.models.CardListPage;
257301
import com.lithic.api.models.NonPciCard;
258302

259-
CardListPage page = client.cards().list(params);
260-
while (page != null) {
261-
for (NonPciCard card : page.data()) {
303+
CardListPage page = client.cards().list();
304+
while (true) {
305+
for (NonPciCard card : page.items()) {
262306
System.out.println(card);
263307
}
264308

265-
page = page.getNextPage().orElse(null);
309+
if (!page.hasNextPage()) {
310+
break;
311+
}
312+
313+
page = page.nextPage();
266314
}
267315
```
268316

@@ -339,7 +387,6 @@ To set a custom timeout, configure the method call using the `timeout` method:
339387

340388
```java
341389
import com.lithic.api.models.Card;
342-
import com.lithic.api.models.CardCreateParams;
343390

344391
Card card = client.cards().create(
345392
params, RequestOptions.builder().timeout(Duration.ofSeconds(30)).build()
@@ -586,7 +633,6 @@ Or configure the method call to validate the response using the `responseValidat
586633

587634
```java
588635
import com.lithic.api.models.Card;
589-
import com.lithic.api.models.CardCreateParams;
590636

591637
Card card = client.cards().create(
592638
params, RequestOptions.builder().responseValidation(true).build()

lithic-java-client-okhttp/src/main/kotlin/com/lithic/api/client/okhttp/LithicOkHttpClient.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import java.net.Proxy
1313
import java.time.Clock
1414
import java.time.Duration
1515
import java.util.Optional
16+
import java.util.concurrent.Executor
1617
import kotlin.jvm.optionals.getOrNull
1718

1819
class LithicOkHttpClient private constructor() {
@@ -49,6 +50,10 @@ class LithicOkHttpClient private constructor() {
4950

5051
fun jsonMapper(jsonMapper: JsonMapper) = apply { clientOptions.jsonMapper(jsonMapper) }
5152

53+
fun streamHandlerExecutor(streamHandlerExecutor: Executor) = apply {
54+
clientOptions.streamHandlerExecutor(streamHandlerExecutor)
55+
}
56+
5257
fun clock(clock: Clock) = apply { clientOptions.clock(clock) }
5358

5459
fun headers(headers: Headers) = apply { clientOptions.headers(headers) }

lithic-java-client-okhttp/src/main/kotlin/com/lithic/api/client/okhttp/LithicOkHttpClientAsync.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import java.net.Proxy
1313
import java.time.Clock
1414
import java.time.Duration
1515
import java.util.Optional
16+
import java.util.concurrent.Executor
1617
import kotlin.jvm.optionals.getOrNull
1718

1819
class LithicOkHttpClientAsync private constructor() {
@@ -49,6 +50,10 @@ class LithicOkHttpClientAsync private constructor() {
4950

5051
fun jsonMapper(jsonMapper: JsonMapper) = apply { clientOptions.jsonMapper(jsonMapper) }
5152

53+
fun streamHandlerExecutor(streamHandlerExecutor: Executor) = apply {
54+
clientOptions.streamHandlerExecutor(streamHandlerExecutor)
55+
}
56+
5257
fun clock(clock: Clock) = apply { clientOptions.clock(clock) }
5358

5459
fun headers(headers: Headers) = apply { clientOptions.headers(headers) }
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// File generated from our OpenAPI spec by Stainless.
2+
3+
package com.lithic.api.core
4+
5+
import java.util.stream.Stream
6+
import java.util.stream.StreamSupport
7+
8+
class AutoPager<T> private constructor(private val firstPage: Page<T>) : Iterable<T> {
9+
10+
companion object {
11+
12+
fun <T> from(firstPage: Page<T>): AutoPager<T> = AutoPager(firstPage)
13+
}
14+
15+
override fun iterator(): Iterator<T> =
16+
generateSequence(firstPage) { if (it.hasNextPage()) it.nextPage() else null }
17+
.flatMap { it.items() }
18+
.iterator()
19+
20+
fun stream(): Stream<T> = StreamSupport.stream(spliterator(), false)
21+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// File generated from our OpenAPI spec by Stainless.
2+
3+
package com.lithic.api.core
4+
5+
import com.lithic.api.core.http.AsyncStreamResponse
6+
import java.util.Optional
7+
import java.util.concurrent.CompletableFuture
8+
import java.util.concurrent.CompletionException
9+
import java.util.concurrent.Executor
10+
import java.util.concurrent.atomic.AtomicReference
11+
12+
class AutoPagerAsync<T>
13+
private constructor(private val firstPage: PageAsync<T>, private val defaultExecutor: Executor) :
14+
AsyncStreamResponse<T> {
15+
16+
companion object {
17+
18+
fun <T> from(firstPage: PageAsync<T>, defaultExecutor: Executor): AutoPagerAsync<T> =
19+
AutoPagerAsync(firstPage, defaultExecutor)
20+
}
21+
22+
private val onCompleteFuture = CompletableFuture<Void?>()
23+
private val state = AtomicReference(State.NEW)
24+
25+
override fun subscribe(handler: AsyncStreamResponse.Handler<T>): AsyncStreamResponse<T> =
26+
subscribe(handler, defaultExecutor)
27+
28+
override fun subscribe(
29+
handler: AsyncStreamResponse.Handler<T>,
30+
executor: Executor,
31+
): AsyncStreamResponse<T> = apply {
32+
// TODO(JDK): Use `compareAndExchange` once targeting JDK 9.
33+
check(state.compareAndSet(State.NEW, State.SUBSCRIBED)) {
34+
if (state.get() == State.SUBSCRIBED) "Cannot subscribe more than once"
35+
else "Cannot subscribe after the response is closed"
36+
}
37+
38+
fun PageAsync<T>.handle(): CompletableFuture<Void?> {
39+
if (state.get() == State.CLOSED) {
40+
return CompletableFuture.completedFuture(null)
41+
}
42+
43+
items().forEach { handler.onNext(it) }
44+
return if (hasNextPage()) nextPage().thenCompose { it.handle() }
45+
else CompletableFuture.completedFuture(null)
46+
}
47+
48+
executor.execute {
49+
firstPage.handle().whenComplete { _, error ->
50+
val actualError =
51+
if (error is CompletionException && error.cause != null) error.cause else error
52+
try {
53+
handler.onComplete(Optional.ofNullable(actualError))
54+
} finally {
55+
try {
56+
if (actualError == null) {
57+
onCompleteFuture.complete(null)
58+
} else {
59+
onCompleteFuture.completeExceptionally(actualError)
60+
}
61+
} finally {
62+
close()
63+
}
64+
}
65+
}
66+
}
67+
}
68+
69+
override fun onCompleteFuture(): CompletableFuture<Void?> = onCompleteFuture
70+
71+
override fun close() {
72+
val previousState = state.getAndSet(State.CLOSED)
73+
if (previousState == State.CLOSED) {
74+
return
75+
}
76+
77+
// When the stream is closed, we should always consider it closed. If it closed due
78+
// to an error, then we will have already completed the future earlier, and this
79+
// will be a no-op.
80+
onCompleteFuture.complete(null)
81+
}
82+
}
83+
84+
private enum class State {
85+
NEW,
86+
SUBSCRIBED,
87+
CLOSED,
88+
}

lithic-java-core/src/main/kotlin/com/lithic/api/core/ClientOptions.kt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import com.lithic.api.core.http.QueryParams
1010
import com.lithic.api.core.http.RetryingHttpClient
1111
import java.time.Clock
1212
import java.util.Optional
13+
import java.util.concurrent.Executor
14+
import java.util.concurrent.Executors
15+
import java.util.concurrent.ThreadFactory
16+
import java.util.concurrent.atomic.AtomicLong
1317
import kotlin.jvm.optionals.getOrNull
1418

1519
class ClientOptions
@@ -18,6 +22,7 @@ private constructor(
1822
@get:JvmName("httpClient") val httpClient: HttpClient,
1923
@get:JvmName("checkJacksonVersionCompatibility") val checkJacksonVersionCompatibility: Boolean,
2024
@get:JvmName("jsonMapper") val jsonMapper: JsonMapper,
25+
@get:JvmName("streamHandlerExecutor") val streamHandlerExecutor: Executor,
2126
@get:JvmName("clock") val clock: Clock,
2227
@get:JvmName("baseUrl") val baseUrl: String,
2328
@get:JvmName("headers") val headers: Headers,
@@ -65,6 +70,7 @@ private constructor(
6570
private var httpClient: HttpClient? = null
6671
private var checkJacksonVersionCompatibility: Boolean = true
6772
private var jsonMapper: JsonMapper = jsonMapper()
73+
private var streamHandlerExecutor: Executor? = null
6874
private var clock: Clock = Clock.systemUTC()
6975
private var baseUrl: String = PRODUCTION_URL
7076
private var headers: Headers.Builder = Headers.builder()
@@ -80,6 +86,7 @@ private constructor(
8086
httpClient = clientOptions.originalHttpClient
8187
checkJacksonVersionCompatibility = clientOptions.checkJacksonVersionCompatibility
8288
jsonMapper = clientOptions.jsonMapper
89+
streamHandlerExecutor = clientOptions.streamHandlerExecutor
8390
clock = clientOptions.clock
8491
baseUrl = clientOptions.baseUrl
8592
headers = clientOptions.headers.toBuilder()
@@ -99,6 +106,10 @@ private constructor(
99106

100107
fun jsonMapper(jsonMapper: JsonMapper) = apply { this.jsonMapper = jsonMapper }
101108

109+
fun streamHandlerExecutor(streamHandlerExecutor: Executor) = apply {
110+
this.streamHandlerExecutor = streamHandlerExecutor
111+
}
112+
102113
fun clock(clock: Clock) = apply { this.clock = clock }
103114

104115
fun baseUrl(baseUrl: String) = apply { this.baseUrl = baseUrl }
@@ -253,6 +264,21 @@ private constructor(
253264
),
254265
checkJacksonVersionCompatibility,
255266
jsonMapper,
267+
streamHandlerExecutor
268+
?: Executors.newCachedThreadPool(
269+
object : ThreadFactory {
270+
271+
private val threadFactory: ThreadFactory =
272+
Executors.defaultThreadFactory()
273+
private val count = AtomicLong(0)
274+
275+
override fun newThread(runnable: Runnable): Thread =
276+
threadFactory.newThread(runnable).also {
277+
it.name =
278+
"lithic-stream-handler-thread-${count.getAndIncrement()}"
279+
}
280+
}
281+
),
256282
clock,
257283
baseUrl,
258284
headers.build(),
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// File generated from our OpenAPI spec by Stainless.
2+
3+
package com.lithic.api.core
4+
5+
/**
6+
* An interface representing a single page, with items of type [T], from a paginated endpoint
7+
* response.
8+
*
9+
* Implementations of this interface are expected to request additional pages synchronously. For
10+
* asynchronous pagination, see the [PageAsync] interface.
11+
*/
12+
interface Page<T> {
13+
14+
/**
15+
* Returns whether there's another page after this one.
16+
*
17+
* The method generally doesn't make requests so the result depends entirely on the data in this
18+
* page. If a significant amount of time has passed between requesting this page and calling
19+
* this method, then the result could be stale.
20+
*/
21+
fun hasNextPage(): Boolean
22+
23+
/**
24+
* Returns the page after this one by making another request.
25+
*
26+
* @throws IllegalStateException if it's impossible to get the next page. This exception is
27+
* avoidable by calling [hasNextPage] first.
28+
*/
29+
fun nextPage(): Page<T>
30+
31+
/** Returns the items in this page. */
32+
fun items(): List<T>
33+
}

0 commit comments

Comments
 (0)