Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.openai.client.okhttp

import com.openai.core.RequestOptions
import com.openai.core.Timeout
import com.openai.core.http.CancellationToken
import com.openai.core.http.Headers
import com.openai.core.http.HttpClient
import com.openai.core.http.HttpMethod
Expand Down Expand Up @@ -47,23 +48,68 @@ class OkHttpClient private constructor(private val okHttpClient: okhttp3.OkHttpC
override fun executeAsync(
request: HttpRequest,
requestOptions: RequestOptions,
): CompletableFuture<HttpResponse> {
return executeAsync(request, requestOptions, CancellationToken.none())
}

override fun executeAsync(
request: HttpRequest,
requestOptions: RequestOptions,
cancellationToken: CancellationToken,
): CompletableFuture<HttpResponse> {
val future = CompletableFuture<HttpResponse>()

// Check if already cancelled
if (cancellationToken.isCancellationRequested()) {
future.completeExceptionally(
java.util.concurrent.CancellationException("Request was cancelled before execution")
)
request.body?.close()
return future
}

request.body?.run { future.whenComplete { _, _ -> close() } }

newCall(request, requestOptions)
.enqueue(
object : Callback {
override fun onResponse(call: Call, response: Response) {
val call = newCall(request, requestOptions)

// Register cancellation callback
val registration =
cancellationToken.register(
Runnable {
// Cancel the OkHttp call
call.cancel()
// Complete the future with cancellation exception
future.completeExceptionally(
java.util.concurrent.CancellationException("Request was cancelled")
)
}
)

// Enqueue the call
call.enqueue(
object : Callback {
override fun onResponse(call: Call, response: Response) {
registration.unregister()
if (!future.isDone) {
future.complete(response.toResponse())
}
}

override fun onFailure(call: Call, e: IOException) {
future.completeExceptionally(OpenAIIoException("Request failed", e))
override fun onFailure(call: Call, e: IOException) {
registration.unregister()
if (!future.isDone) {
// Check if this was a cancellation
if (cancellationToken.isCancellationRequested()) {
future.completeExceptionally(
java.util.concurrent.CancellationException("Request was cancelled")
)
} else {
future.completeExceptionally(OpenAIIoException("Request failed", e))
}
}
}
)
}
)

return future
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package com.openai.core

import com.openai.core.http.CancellationToken
import com.openai.core.http.CancellationTokenSource
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.BiConsumer
import java.util.function.BiFunction
import java.util.function.Consumer
import java.util.function.Function

/**
* A [CompletableFuture] that supports cancellation via a [CancellationToken].
*
* When [cancel] is called, this future will:
* 1. Request cancellation via the associated [CancellationToken]
* 2. Complete exceptionally with a [java.util.concurrent.CancellationException]
* 3. Allow the underlying operation to clean up resources via the token's callbacks
*
* @param T the result type
*/
class CancellableCompletableFuture<T>
private constructor(
private val cancellationTokenSource: CancellationTokenSource,
private val delegate: CompletableFuture<T>
) : CompletableFuture<T>() {

init {
// Forward completion from delegate to this future
delegate.whenComplete { result, error ->
if (error != null) {
this.completeExceptionally(error)
} else {
this.complete(result)
}
}
}

/**
* Returns the [CancellationToken] associated with this future.
*/
fun cancellationToken(): CancellationToken = cancellationTokenSource.token()

override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
// Request cancellation via the token (this will trigger any registered callbacks)
cancellationTokenSource.cancel()

// Complete the delegate future exceptionally if not already done
val cancelled =
delegate.completeExceptionally(
java.util.concurrent.CancellationException("Future was cancelled")
)

// Also mark this future as cancelled
super.cancel(mayInterruptIfRunning)

return cancelled
}

override fun isCancelled(): Boolean {
return super.isCancelled() || delegate.isCancelled()
}

// Delegated methods that return CancellableCompletableFuture to maintain type
override fun <U> thenApply(fn: Function<in T, out U>): CancellableCompletableFuture<U> {
return wrap(cancellationTokenSource, delegate.thenApply(fn))
}

override fun <U> thenApplyAsync(fn: Function<in T, out U>): CancellableCompletableFuture<U> {
return wrap(cancellationTokenSource, delegate.thenApplyAsync(fn))
}

override fun <U> thenCompose(
fn: Function<in T, out CompletionStage<U>>
): CancellableCompletableFuture<U> {
return wrap(cancellationTokenSource, delegate.thenCompose(fn).toCompletableFuture())
}

override fun <U> thenComposeAsync(
fn: Function<in T, out CompletionStage<U>>
): CancellableCompletableFuture<U> {
return wrap(cancellationTokenSource, delegate.thenComposeAsync(fn).toCompletableFuture())
}

override fun thenAccept(action: Consumer<in T>): CancellableCompletableFuture<Void> {
return wrap(cancellationTokenSource, delegate.thenAccept(action))
}

override fun thenAcceptAsync(action: Consumer<in T>): CancellableCompletableFuture<Void> {
return wrap(cancellationTokenSource, delegate.thenAcceptAsync(action))
}

override fun thenRun(action: Runnable): CancellableCompletableFuture<Void> {
return wrap(cancellationTokenSource, delegate.thenRun(action))
}

override fun thenRunAsync(action: Runnable): CancellableCompletableFuture<Void> {
return wrap(cancellationTokenSource, delegate.thenRunAsync(action))
}

override fun <U> handle(fn: BiFunction<in T, Throwable, out U>): CancellableCompletableFuture<U> {
return wrap(cancellationTokenSource, delegate.handle(fn))
}

override fun <U> handleAsync(
fn: BiFunction<in T, Throwable, out U>
): CancellableCompletableFuture<U> {
return wrap(cancellationTokenSource, delegate.handleAsync(fn))
}

override fun whenComplete(
action: BiConsumer<in T, in Throwable>
): CancellableCompletableFuture<T> {
return wrap(cancellationTokenSource, delegate.whenComplete(action))
}

override fun whenCompleteAsync(
action: BiConsumer<in T, in Throwable>
): CancellableCompletableFuture<T> {
return wrap(cancellationTokenSource, delegate.whenCompleteAsync(action))
}

override fun exceptionally(fn: Function<Throwable, out T>): CancellableCompletableFuture<T> {
return wrap(cancellationTokenSource, delegate.exceptionally(fn))
}

companion object {
/**
* Creates a new [CancellableCompletableFuture] with a new cancellation token source.
*/
@JvmStatic
fun <T> create(): CancellableCompletableFuture<T> {
return CancellableCompletableFuture(
CancellationTokenSource(),
CompletableFuture<T>()
)
}

/**
* Wraps an existing [CompletableFuture] with cancellation support.
*
* @param tokenSource the cancellation token source to use
* @param delegate the future to wrap
*/
@JvmStatic
fun <T> wrap(
tokenSource: CancellationTokenSource,
delegate: CompletableFuture<T>
): CancellableCompletableFuture<T> {
return CancellableCompletableFuture(tokenSource, delegate)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.openai.core

import com.openai.core.http.CancellationTokenSource
import java.util.concurrent.CompletableFuture

/**
* Wraps a [CompletableFuture] with cancellation token support.
*
* This extension method provides a clean way to add cancellation support to CompletableFutures.
*
* @param tokenSource the cancellation token source to use
* @return a [CancellableCompletableFuture] that can be cancelled via the token
*/
fun <T> CompletableFuture<T>.withCancellation(
tokenSource: CancellationTokenSource
): CancellableCompletableFuture<T> {
return CancellableCompletableFuture.wrap(tokenSource, this)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.openai.core.http

/**
* A token that can be used to signal cancellation of an asynchronous operation.
*
* Cancellation tokens are created by [CancellationTokenSource] and can be passed through
* async operation chains to enable cooperative cancellation.
*/
interface CancellationToken {

/**
* Returns true if cancellation has been requested for this token.
*/
fun isCancellationRequested(): Boolean

/**
* Registers a callback to be invoked when cancellation is requested.
*
* If cancellation has already been requested, the callback will be invoked immediately
* on the calling thread.
*
* @param callback the callback to invoke on cancellation
* @return a [Registration] that can be used to unregister the callback
*/
fun register(callback: Runnable): Registration

/**
* Throws [CancellationException] if cancellation has been requested.
*/
fun throwIfCancellationRequested() {
if (isCancellationRequested()) {
throw CancellationException("Operation was cancelled")
}
}

/**
* A registration handle returned by [register] that can be used to unregister a callback.
*/
interface Registration : AutoCloseable {
/**
* Unregisters the callback associated with this registration.
* After this method returns, the callback will not be invoked, even if
* cancellation is subsequently requested.
*/
fun unregister()

/** Alias for [unregister] to support AutoCloseable. */
override fun close() = unregister()
}

companion object {
/**
* Returns a cancellation token that will never have cancellation requested.
*/
@JvmStatic
fun none(): CancellationToken = NoneCancellationToken

private object NoneCancellationToken : CancellationToken {
override fun isCancellationRequested(): Boolean = false

override fun register(callback: Runnable): Registration =
object : Registration {
override fun unregister() {}
}
}
}
}

/**
* Exception thrown when an operation is cancelled via a [CancellationToken].
*/
class CancellationException(message: String? = null, cause: Throwable? = null) :
RuntimeException(message, cause)
Loading