diff --git a/openai-java-client-okhttp/src/main/kotlin/com/openai/client/okhttp/OkHttpClient.kt b/openai-java-client-okhttp/src/main/kotlin/com/openai/client/okhttp/OkHttpClient.kt index d99c47cb0..f36954771 100644 --- a/openai-java-client-okhttp/src/main/kotlin/com/openai/client/okhttp/OkHttpClient.kt +++ b/openai-java-client-okhttp/src/main/kotlin/com/openai/client/okhttp/OkHttpClient.kt @@ -2,6 +2,7 @@ package com.openai.client.okhttp import com.openai.core.RequestOptions import com.openai.core.Timeout +import com.openai.core.http.CancellationToken import com.openai.core.http.Headers import com.openai.core.http.HttpClient import com.openai.core.http.HttpMethod @@ -47,23 +48,68 @@ class OkHttpClient private constructor(private val okHttpClient: okhttp3.OkHttpC override fun executeAsync( request: HttpRequest, requestOptions: RequestOptions, + ): CompletableFuture { + return executeAsync(request, requestOptions, CancellationToken.none()) + } + + override fun executeAsync( + request: HttpRequest, + requestOptions: RequestOptions, + cancellationToken: CancellationToken, ): CompletableFuture { val future = CompletableFuture() + // Check if already cancelled + if (cancellationToken.isCancellationRequested()) { + future.completeExceptionally( + java.util.concurrent.CancellationException("Request was cancelled before execution") + ) + request.body?.close() + return future + } + request.body?.run { future.whenComplete { _, _ -> close() } } - newCall(request, requestOptions) - .enqueue( - object : Callback { - override fun onResponse(call: Call, response: Response) { + val call = newCall(request, requestOptions) + + // Register cancellation callback + val registration = + cancellationToken.register( + Runnable { + // Cancel the OkHttp call + call.cancel() + // Complete the future with cancellation exception + future.completeExceptionally( + java.util.concurrent.CancellationException("Request was cancelled") + ) + } + ) + + // Enqueue the call + call.enqueue( + object : Callback { + override fun onResponse(call: Call, response: Response) { + registration.unregister() + if (!future.isDone) { future.complete(response.toResponse()) } + } - override fun onFailure(call: Call, e: IOException) { - future.completeExceptionally(OpenAIIoException("Request failed", e)) + override fun onFailure(call: Call, e: IOException) { + registration.unregister() + if (!future.isDone) { + // Check if this was a cancellation + if (cancellationToken.isCancellationRequested()) { + future.completeExceptionally( + java.util.concurrent.CancellationException("Request was cancelled") + ) + } else { + future.completeExceptionally(OpenAIIoException("Request failed", e)) + } } } - ) + } + ) return future } diff --git a/openai-java-core/src/main/kotlin/com/openai/core/CancellableCompletableFuture.kt b/openai-java-core/src/main/kotlin/com/openai/core/CancellableCompletableFuture.kt new file mode 100644 index 000000000..53fffe968 --- /dev/null +++ b/openai-java-core/src/main/kotlin/com/openai/core/CancellableCompletableFuture.kt @@ -0,0 +1,153 @@ +package com.openai.core + +import com.openai.core.http.CancellationToken +import com.openai.core.http.CancellationTokenSource +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import java.util.function.BiConsumer +import java.util.function.BiFunction +import java.util.function.Consumer +import java.util.function.Function + +/** + * A [CompletableFuture] that supports cancellation via a [CancellationToken]. + * + * When [cancel] is called, this future will: + * 1. Request cancellation via the associated [CancellationToken] + * 2. Complete exceptionally with a [java.util.concurrent.CancellationException] + * 3. Allow the underlying operation to clean up resources via the token's callbacks + * + * @param T the result type + */ +class CancellableCompletableFuture +private constructor( + private val cancellationTokenSource: CancellationTokenSource, + private val delegate: CompletableFuture +) : CompletableFuture() { + + init { + // Forward completion from delegate to this future + delegate.whenComplete { result, error -> + if (error != null) { + this.completeExceptionally(error) + } else { + this.complete(result) + } + } + } + + /** + * Returns the [CancellationToken] associated with this future. + */ + fun cancellationToken(): CancellationToken = cancellationTokenSource.token() + + override fun cancel(mayInterruptIfRunning: Boolean): Boolean { + // Request cancellation via the token (this will trigger any registered callbacks) + cancellationTokenSource.cancel() + + // Complete the delegate future exceptionally if not already done + val cancelled = + delegate.completeExceptionally( + java.util.concurrent.CancellationException("Future was cancelled") + ) + + // Also mark this future as cancelled + super.cancel(mayInterruptIfRunning) + + return cancelled + } + + override fun isCancelled(): Boolean { + return super.isCancelled() || delegate.isCancelled() + } + + // Delegated methods that return CancellableCompletableFuture to maintain type + override fun thenApply(fn: Function): CancellableCompletableFuture { + return wrap(cancellationTokenSource, delegate.thenApply(fn)) + } + + override fun thenApplyAsync(fn: Function): CancellableCompletableFuture { + return wrap(cancellationTokenSource, delegate.thenApplyAsync(fn)) + } + + override fun thenCompose( + fn: Function> + ): CancellableCompletableFuture { + return wrap(cancellationTokenSource, delegate.thenCompose(fn).toCompletableFuture()) + } + + override fun thenComposeAsync( + fn: Function> + ): CancellableCompletableFuture { + return wrap(cancellationTokenSource, delegate.thenComposeAsync(fn).toCompletableFuture()) + } + + override fun thenAccept(action: Consumer): CancellableCompletableFuture { + return wrap(cancellationTokenSource, delegate.thenAccept(action)) + } + + override fun thenAcceptAsync(action: Consumer): CancellableCompletableFuture { + return wrap(cancellationTokenSource, delegate.thenAcceptAsync(action)) + } + + override fun thenRun(action: Runnable): CancellableCompletableFuture { + return wrap(cancellationTokenSource, delegate.thenRun(action)) + } + + override fun thenRunAsync(action: Runnable): CancellableCompletableFuture { + return wrap(cancellationTokenSource, delegate.thenRunAsync(action)) + } + + override fun handle(fn: BiFunction): CancellableCompletableFuture { + return wrap(cancellationTokenSource, delegate.handle(fn)) + } + + override fun handleAsync( + fn: BiFunction + ): CancellableCompletableFuture { + return wrap(cancellationTokenSource, delegate.handleAsync(fn)) + } + + override fun whenComplete( + action: BiConsumer + ): CancellableCompletableFuture { + return wrap(cancellationTokenSource, delegate.whenComplete(action)) + } + + override fun whenCompleteAsync( + action: BiConsumer + ): CancellableCompletableFuture { + return wrap(cancellationTokenSource, delegate.whenCompleteAsync(action)) + } + + override fun exceptionally(fn: Function): CancellableCompletableFuture { + return wrap(cancellationTokenSource, delegate.exceptionally(fn)) + } + + companion object { + /** + * Creates a new [CancellableCompletableFuture] with a new cancellation token source. + */ + @JvmStatic + fun create(): CancellableCompletableFuture { + return CancellableCompletableFuture( + CancellationTokenSource(), + CompletableFuture() + ) + } + + /** + * Wraps an existing [CompletableFuture] with cancellation support. + * + * @param tokenSource the cancellation token source to use + * @param delegate the future to wrap + */ + @JvmStatic + fun wrap( + tokenSource: CancellationTokenSource, + delegate: CompletableFuture + ): CancellableCompletableFuture { + return CancellableCompletableFuture(tokenSource, delegate) + } + } +} diff --git a/openai-java-core/src/main/kotlin/com/openai/core/CancellableCompletableFutureExtensions.kt b/openai-java-core/src/main/kotlin/com/openai/core/CancellableCompletableFutureExtensions.kt new file mode 100644 index 000000000..b886c40ac --- /dev/null +++ b/openai-java-core/src/main/kotlin/com/openai/core/CancellableCompletableFutureExtensions.kt @@ -0,0 +1,18 @@ +package com.openai.core + +import com.openai.core.http.CancellationTokenSource +import java.util.concurrent.CompletableFuture + +/** + * Wraps a [CompletableFuture] with cancellation token support. + * + * This extension method provides a clean way to add cancellation support to CompletableFutures. + * + * @param tokenSource the cancellation token source to use + * @return a [CancellableCompletableFuture] that can be cancelled via the token + */ +fun CompletableFuture.withCancellation( + tokenSource: CancellationTokenSource +): CancellableCompletableFuture { + return CancellableCompletableFuture.wrap(tokenSource, this) +} diff --git a/openai-java-core/src/main/kotlin/com/openai/core/http/CancellationToken.kt b/openai-java-core/src/main/kotlin/com/openai/core/http/CancellationToken.kt new file mode 100644 index 000000000..495b300ff --- /dev/null +++ b/openai-java-core/src/main/kotlin/com/openai/core/http/CancellationToken.kt @@ -0,0 +1,73 @@ +package com.openai.core.http + +/** + * A token that can be used to signal cancellation of an asynchronous operation. + * + * Cancellation tokens are created by [CancellationTokenSource] and can be passed through + * async operation chains to enable cooperative cancellation. + */ +interface CancellationToken { + + /** + * Returns true if cancellation has been requested for this token. + */ + fun isCancellationRequested(): Boolean + + /** + * Registers a callback to be invoked when cancellation is requested. + * + * If cancellation has already been requested, the callback will be invoked immediately + * on the calling thread. + * + * @param callback the callback to invoke on cancellation + * @return a [Registration] that can be used to unregister the callback + */ + fun register(callback: Runnable): Registration + + /** + * Throws [CancellationException] if cancellation has been requested. + */ + fun throwIfCancellationRequested() { + if (isCancellationRequested()) { + throw CancellationException("Operation was cancelled") + } + } + + /** + * A registration handle returned by [register] that can be used to unregister a callback. + */ + interface Registration : AutoCloseable { + /** + * Unregisters the callback associated with this registration. + * After this method returns, the callback will not be invoked, even if + * cancellation is subsequently requested. + */ + fun unregister() + + /** Alias for [unregister] to support AutoCloseable. */ + override fun close() = unregister() + } + + companion object { + /** + * Returns a cancellation token that will never have cancellation requested. + */ + @JvmStatic + fun none(): CancellationToken = NoneCancellationToken + + private object NoneCancellationToken : CancellationToken { + override fun isCancellationRequested(): Boolean = false + + override fun register(callback: Runnable): Registration = + object : Registration { + override fun unregister() {} + } + } + } +} + +/** + * Exception thrown when an operation is cancelled via a [CancellationToken]. + */ +class CancellationException(message: String? = null, cause: Throwable? = null) : + RuntimeException(message, cause) diff --git a/openai-java-core/src/main/kotlin/com/openai/core/http/CancellationTokenSource.kt b/openai-java-core/src/main/kotlin/com/openai/core/http/CancellationTokenSource.kt new file mode 100644 index 000000000..0508515ab --- /dev/null +++ b/openai-java-core/src/main/kotlin/com/openai/core/http/CancellationTokenSource.kt @@ -0,0 +1,110 @@ +package com.openai.core.http + +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.atomic.AtomicBoolean + +/** + * A source for creating and controlling [CancellationToken]s. + * + * This class is thread-safe and can be used from multiple threads concurrently. + * + * Example usage: + * ``` + * val source = CancellationTokenSource() + * val token = source.token() + * + * // Pass token to async operations + * asyncOperation(token) + * + * // Cancel the operation + * source.cancel() + * ``` + */ +class CancellationTokenSource : AutoCloseable { + + private val cancelled = AtomicBoolean(false) + private val callbacks = CopyOnWriteArrayList() + + private val token = + object : CancellationToken { + override fun isCancellationRequested(): Boolean = cancelled.get() + + override fun register(callback: Runnable): CancellationToken.Registration { + // If already cancelled, invoke immediately + if (cancelled.get()) { + safeInvoke(callback) + return NoOpRegistration + } + + // Add callback to list + callbacks.add(callback) + + // Check again in case cancellation happened between the check and add + if (cancelled.get()) { + safeInvoke(callback) + } + + return object : CancellationToken.Registration { + private val unregistered = AtomicBoolean(false) + + override fun unregister() { + if (unregistered.compareAndSet(false, true)) { + callbacks.remove(callback) + } + } + } + } + } + + /** + * Returns the [CancellationToken] controlled by this source. + */ + fun token(): CancellationToken = token + + /** + * Requests cancellation of the operations associated with this token source. + * + * This method is idempotent - calling it multiple times has the same effect as calling + * it once. + * + * All registered callbacks will be invoked synchronously on the calling thread. + */ + fun cancel() { + if (cancelled.compareAndSet(false, true)) { + // Invoke all callbacks + val callbacksCopy = callbacks.toList() + callbacks.clear() + + callbacksCopy.forEach { callback -> safeInvoke(callback) } + } + } + + /** + * Returns true if cancellation has been requested. + */ + fun isCancellationRequested(): Boolean = cancelled.get() + + /** + * Closes this cancellation token source. This is equivalent to calling [cancel]. + */ + override fun close() { + cancel() + } + + private fun safeInvoke(callback: Runnable) { + try { + callback.run() + } catch (e: Throwable) { + // Log the error but don't let it propagate and prevent other callbacks from running + System.err.println("Error in cancellation callback: ${e.message}") + e.printStackTrace() + } + } + + private companion object { + private val NoOpRegistration = + object : CancellationToken.Registration { + override fun unregister() {} + } + } +} diff --git a/openai-java-core/src/main/kotlin/com/openai/core/http/HttpClient.kt b/openai-java-core/src/main/kotlin/com/openai/core/http/HttpClient.kt index 3db3a2669..b82520423 100644 --- a/openai-java-core/src/main/kotlin/com/openai/core/http/HttpClient.kt +++ b/openai-java-core/src/main/kotlin/com/openai/core/http/HttpClient.kt @@ -21,6 +21,24 @@ interface HttpClient : AutoCloseable { fun executeAsync(request: HttpRequest): CompletableFuture = executeAsync(request, RequestOptions.none()) + /** + * Executes an HTTP request asynchronously with cancellation support. + * + * @param request the HTTP request to execute + * @param requestOptions options for this request + * @param cancellationToken token that can be used to cancel the operation + * @return a CompletableFuture that completes with the HTTP response + */ + fun executeAsync( + request: HttpRequest, + requestOptions: RequestOptions = RequestOptions.none(), + cancellationToken: CancellationToken = CancellationToken.none(), + ): CompletableFuture { + // Default implementation delegates to non-cancellable version for backward compatibility + // Implementations should override this to provide proper cancellation support + return executeAsync(request, requestOptions) + } + /** Overridden from [AutoCloseable] to not have a checked exception in its signature. */ override fun close() } diff --git a/openai-java-core/src/main/kotlin/com/openai/core/http/PhantomReachableClosingHttpClient.kt b/openai-java-core/src/main/kotlin/com/openai/core/http/PhantomReachableClosingHttpClient.kt index 4d891cc69..6fa5b851b 100644 --- a/openai-java-core/src/main/kotlin/com/openai/core/http/PhantomReachableClosingHttpClient.kt +++ b/openai-java-core/src/main/kotlin/com/openai/core/http/PhantomReachableClosingHttpClient.kt @@ -22,5 +22,11 @@ internal class PhantomReachableClosingHttpClient(private val httpClient: HttpCli requestOptions: RequestOptions, ): CompletableFuture = httpClient.executeAsync(request, requestOptions) + override fun executeAsync( + request: HttpRequest, + requestOptions: RequestOptions, + cancellationToken: CancellationToken + ): CompletableFuture = httpClient.executeAsync(request, requestOptions, cancellationToken) + override fun close() = httpClient.close() } diff --git a/openai-java-core/src/main/kotlin/com/openai/core/http/RetryingHttpClient.kt b/openai-java-core/src/main/kotlin/com/openai/core/http/RetryingHttpClient.kt index 6eaa33a25..2a351f5db 100644 --- a/openai-java-core/src/main/kotlin/com/openai/core/http/RetryingHttpClient.kt +++ b/openai-java-core/src/main/kotlin/com/openai/core/http/RetryingHttpClient.kt @@ -74,9 +74,17 @@ private constructor( override fun executeAsync( request: HttpRequest, requestOptions: RequestOptions, + ): CompletableFuture { + return executeAsync(request, requestOptions, CancellationToken.none()) + } + + override fun executeAsync( + request: HttpRequest, + requestOptions: RequestOptions, + cancellationToken: CancellationToken, ): CompletableFuture { if (!isRetryable(request) || maxRetries <= 0) { - return httpClient.executeAsync(request, requestOptions) + return httpClient.executeAsync(request, requestOptions, cancellationToken) } val modifiedRequest = maybeAddIdempotencyHeader(request) @@ -91,16 +99,35 @@ private constructor( request: HttpRequest, requestOptions: RequestOptions, ): CompletableFuture { + // Check if cancelled before attempting retry + if (cancellationToken.isCancellationRequested()) { + val cancelledFuture = CompletableFuture() + cancelledFuture.completeExceptionally( + java.util.concurrent.CancellationException("Request was cancelled") + ) + return cancelledFuture + } + val requestWithRetryCount = if (shouldSendRetryCount) setRetryCountHeader(request, retries) else request return httpClient - .executeAsync(requestWithRetryCount, requestOptions) + .executeAsync(requestWithRetryCount, requestOptions, cancellationToken) .handleAsync( fun( response: HttpResponse?, throwable: Throwable?, ): CompletableFuture { + // Check if this was a cancellation + if (cancellationToken.isCancellationRequested()) { + response?.close() + val cancelledFuture = CompletableFuture() + cancelledFuture.completeExceptionally( + java.util.concurrent.CancellationException("Request was cancelled") + ) + return cancelledFuture + } + if (response != null) { if (++retries > maxRetries || !shouldRetry(response)) { return CompletableFuture.completedFuture(response) @@ -116,6 +143,16 @@ private constructor( val backoffDuration = getRetryBackoffDuration(retries, response) // All responses must be closed, so close the failed one before retrying. response?.close() + + // Check cancellation before sleeping + if (cancellationToken.isCancellationRequested()) { + val cancelledFuture = CompletableFuture() + cancelledFuture.completeExceptionally( + java.util.concurrent.CancellationException("Request was cancelled") + ) + return cancelledFuture + } + return sleeper.sleepAsync(backoffDuration).thenCompose { executeWithRetries(requestWithRetryCount, requestOptions) } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/EmbeddingServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/EmbeddingServiceAsyncImpl.kt index 095861f0e..1ce4eaaea 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/EmbeddingServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/EmbeddingServiceAsyncImpl.kt @@ -7,6 +7,7 @@ import com.openai.core.RequestOptions import com.openai.core.handlers.errorBodyHandler import com.openai.core.handlers.errorHandler import com.openai.core.handlers.jsonHandler +import com.openai.core.http.CancellationTokenSource import com.openai.core.http.HttpMethod import com.openai.core.http.HttpRequest import com.openai.core.http.HttpResponse @@ -15,6 +16,7 @@ import com.openai.core.http.HttpResponseFor import com.openai.core.http.json import com.openai.core.http.parseable import com.openai.core.prepareAsync +import com.openai.core.withCancellation import com.openai.models.embeddings.CreateEmbeddingResponse import com.openai.models.embeddings.EmbeddingCreateParams import java.util.concurrent.CompletableFuture @@ -59,6 +61,7 @@ class EmbeddingServiceAsyncImpl internal constructor(private val clientOptions: params: EmbeddingCreateParams, requestOptions: RequestOptions, ): CompletableFuture> { + val cancellationTokenSource = CancellationTokenSource() val request = HttpRequest.builder() .method(HttpMethod.POST) @@ -69,7 +72,13 @@ class EmbeddingServiceAsyncImpl internal constructor(private val clientOptions: .prepareAsync(clientOptions, params) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } + .thenComposeAsync { + clientOptions.httpClient.executeAsync( + it, + requestOptions, + cancellationTokenSource.token() + ) + } .thenApply { response -> errorHandler.handle(response).parseable { response @@ -81,6 +90,7 @@ class EmbeddingServiceAsyncImpl internal constructor(private val clientOptions: } } } + .withCancellation(cancellationTokenSource) } } } diff --git a/openai-java-core/src/main/kotlin/com/openai/services/async/chat/ChatCompletionServiceAsyncImpl.kt b/openai-java-core/src/main/kotlin/com/openai/services/async/chat/ChatCompletionServiceAsyncImpl.kt index 391be3b16..4ad74f473 100644 --- a/openai-java-core/src/main/kotlin/com/openai/services/async/chat/ChatCompletionServiceAsyncImpl.kt +++ b/openai-java-core/src/main/kotlin/com/openai/services/async/chat/ChatCompletionServiceAsyncImpl.kt @@ -35,6 +35,8 @@ import com.openai.models.chat.completions.ChatCompletionRetrieveParams import com.openai.models.chat.completions.ChatCompletionUpdateParams import com.openai.services.async.chat.completions.MessageServiceAsync import com.openai.services.async.chat.completions.MessageServiceAsyncImpl +import com.openai.core.http.CancellationTokenSource +import com.openai.core.withCancellation import java.util.concurrent.CompletableFuture import java.util.function.Consumer import kotlin.jvm.optionals.getOrNull @@ -128,6 +130,7 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS params: ChatCompletionCreateParams, requestOptions: RequestOptions, ): CompletableFuture> { + val cancellationTokenSource = CancellationTokenSource() val request = HttpRequest.builder() .method(HttpMethod.POST) @@ -138,7 +141,11 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS .prepareAsync(clientOptions, params) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } + .thenComposeAsync { clientOptions.httpClient.executeAsync( + it, + requestOptions, + cancellationTokenSource.token() + ) } .thenApply { response -> errorHandler.handle(response).parseable { response @@ -150,6 +157,7 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS } } } + .withCancellation(cancellationTokenSource) } private val createStreamingHandler: Handler> = @@ -159,6 +167,7 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS params: ChatCompletionCreateParams, requestOptions: RequestOptions, ): CompletableFuture>> { + val cancellationTokenSource = CancellationTokenSource() val request = HttpRequest.builder() .method(HttpMethod.POST) @@ -178,7 +187,11 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS .prepareAsync(clientOptions, params) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } + .thenComposeAsync { clientOptions.httpClient.executeAsync( + it, + requestOptions, + cancellationTokenSource.token() + ) } .thenApply { response -> errorHandler.handle(response).parseable { response @@ -192,6 +205,7 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS } } } + .withCancellation(cancellationTokenSource) } private val retrieveHandler: Handler = @@ -201,6 +215,7 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS params: ChatCompletionRetrieveParams, requestOptions: RequestOptions, ): CompletableFuture> { + val cancellationTokenSource = CancellationTokenSource() // We check here instead of in the params builder because this can be specified // positionally or in the params class. checkRequired("completionId", params.completionId().getOrNull()) @@ -213,7 +228,11 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS .prepareAsync(clientOptions, params) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } + .thenComposeAsync { clientOptions.httpClient.executeAsync( + it, + requestOptions, + cancellationTokenSource.token() + ) } .thenApply { response -> errorHandler.handle(response).parseable { response @@ -225,6 +244,7 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS } } } + .withCancellation(cancellationTokenSource) } private val updateHandler: Handler = @@ -234,6 +254,7 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS params: ChatCompletionUpdateParams, requestOptions: RequestOptions, ): CompletableFuture> { + val cancellationTokenSource = CancellationTokenSource() // We check here instead of in the params builder because this can be specified // positionally or in the params class. checkRequired("completionId", params.completionId().getOrNull()) @@ -247,7 +268,11 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS .prepareAsync(clientOptions, params) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } + .thenComposeAsync { clientOptions.httpClient.executeAsync( + it, + requestOptions, + cancellationTokenSource.token() + ) } .thenApply { response -> errorHandler.handle(response).parseable { response @@ -259,6 +284,7 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS } } } + .withCancellation(cancellationTokenSource) } private val listHandler: Handler = @@ -268,6 +294,7 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS params: ChatCompletionListParams, requestOptions: RequestOptions, ): CompletableFuture> { + val cancellationTokenSource = CancellationTokenSource() val request = HttpRequest.builder() .method(HttpMethod.GET) @@ -277,7 +304,11 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS .prepareAsync(clientOptions, params) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } + .thenComposeAsync { clientOptions.httpClient.executeAsync( + it, + requestOptions, + cancellationTokenSource.token() + ) } .thenApply { response -> errorHandler.handle(response).parseable { response @@ -297,6 +328,7 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS } } } + .withCancellation(cancellationTokenSource) } private val deleteHandler: Handler = @@ -306,6 +338,7 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS params: ChatCompletionDeleteParams, requestOptions: RequestOptions, ): CompletableFuture> { + val cancellationTokenSource = CancellationTokenSource() // We check here instead of in the params builder because this can be specified // positionally or in the params class. checkRequired("completionId", params.completionId().getOrNull()) @@ -319,7 +352,11 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS .prepareAsync(clientOptions, params) val requestOptions = requestOptions.applyDefaults(RequestOptions.from(clientOptions)) return request - .thenComposeAsync { clientOptions.httpClient.executeAsync(it, requestOptions) } + .thenComposeAsync { clientOptions.httpClient.executeAsync( + it, + requestOptions, + cancellationTokenSource.token() + ) } .thenApply { response -> errorHandler.handle(response).parseable { response @@ -331,6 +368,7 @@ internal constructor(private val clientOptions: ClientOptions) : ChatCompletionS } } } + .withCancellation(cancellationTokenSource) } } }