Skip to content

Commit 8feecd2

Browse files
authored
EFF-717 OpenAiSocket should cancel the request on interrupt (#1759)
1 parent f2f75ee commit 8feecd2

File tree

3 files changed

+68
-3
lines changed

3 files changed

+68
-3
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/ai-openai": patch
3+
---
4+
5+
Ensure OpenAiSocket sends a `{"type":"response.cancel"}` websocket event when a response stream is interrupted.

packages/ai/openai/src/OpenAiClient.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import * as Array from "effect/Array"
1010
import * as Cause from "effect/Cause"
1111
import type * as Config from "effect/Config"
1212
import * as Effect from "effect/Effect"
13+
import * as Exit from "effect/Exit"
1314
import { identity } from "effect/Function"
1415
import * as Function from "effect/Function"
1516
import * as Layer from "effect/Layer"
@@ -420,6 +421,10 @@ const makeSocket = Effect.gen(function*() {
420421
)
421422
)
422423

424+
const cancel = Effect.suspend(() => write(JSON.stringify({ type: "response.cancel" }))).pipe(
425+
Effect.ignore
426+
)
427+
423428
const decoder = new TextDecoder()
424429
const decode = Schema.decodeUnknownSync(Schema.fromJsonString(Generated.ResponseStreamEvent))
425430
yield* socket.runRaw((msg) => {
@@ -462,7 +467,7 @@ const makeSocket = Effect.gen(function*() {
462467
Effect.forkScoped
463468
)
464469

465-
return { send } as const
470+
return { send, cancel } as const
466471
})
467472
})
468473

@@ -476,11 +481,12 @@ const makeSocket = Effect.gen(function*() {
476481
semaphore.take(1),
477482
() => semaphore.release(1)
478483
)
479-
const { send } = yield* RcRef.get(queueRef)
484+
const { send, cancel } = yield* RcRef.get(queueRef)
480485
const incoming = yield* Queue.unbounded<ResponseStreamEvent, AiError.AiError | Cause.Done>()
481486
yield* Effect.forkScoped(send(incoming, options), { startImmediately: true })
482487
return Stream.fromQueue(incoming).pipe(
483-
Stream.takeUntil((e) => e.type === "response.completed" || e.type === "response.incomplete")
488+
Stream.takeUntil((e) => e.type === "response.completed" || e.type === "response.incomplete"),
489+
Stream.onExit((exit) => Exit.hasInterrupts(exit) ? cancel : Effect.void)
484490
)
485491
}).pipe(Stream.unwrap)
486492

packages/ai/openai/test/OpenAiClient.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ import * as Errors from "@effect/ai-openai/internal/errors"
33
import * as OpenAiClient from "@effect/ai-openai/OpenAiClient"
44
import { assert, describe, it } from "@effect/vitest"
55
import { Config, ConfigProvider, Effect, Layer, Redacted, Schema, Stream } from "effect"
6+
import * as Fiber from "effect/Fiber"
67
import type * as AiError from "effect/unstable/ai/AiError"
78
import * as HttpClient from "effect/unstable/http/HttpClient"
89
import * as HttpClientError from "effect/unstable/http/HttpClientError"
910
import * as HttpClientRequest from "effect/unstable/http/HttpClientRequest"
1011
import * as HttpClientResponse from "effect/unstable/http/HttpClientResponse"
12+
import * as Socket from "effect/unstable/socket/Socket"
13+
import { WS } from "vitest-websocket-mock"
1114

1215
// =============================================================================
1316
// Mock Helpers
@@ -631,5 +634,56 @@ describe("OpenAiClient", () => {
631634
assert.strictEqual(result.reason._tag, "InternalProviderError")
632635
}).pipe(Effect.provide(MainLayer))
633636
})
637+
638+
it.effect("sends response.cancel on interrupt in websocket mode", () => {
639+
const port = 42345
640+
const apiUrl = `http://localhost:${port}/v1`
641+
const serverUrl = `ws://localhost:${port}/v1/responses`
642+
643+
const HttpClientLayer = Layer.succeed(
644+
HttpClient.HttpClient,
645+
makeMockHttpClient((request) => Effect.succeed(makeMockResponse({ status: 200, body: {}, request })))
646+
)
647+
648+
const MainLayer = OpenAiClient.layer({
649+
apiKey: Redacted.make("test-key"),
650+
apiUrl
651+
}).pipe(Layer.provide(HttpClientLayer))
652+
653+
return Effect.gen(function*() {
654+
const server = yield* Effect.acquireRelease(
655+
Effect.sync(() => new WS(serverUrl, { jsonProtocol: true })),
656+
(server) =>
657+
Effect.sync(() => {
658+
server.close()
659+
WS.clean()
660+
})
661+
)
662+
663+
const client = yield* OpenAiClient.OpenAiClient
664+
const fiber = yield* Effect.forkScoped(
665+
OpenAiClient.withWebSocketMode(
666+
client.createResponseStream({
667+
model: "gpt-4o",
668+
input: "test"
669+
}).pipe(
670+
Effect.andThen(([_, stream]) => Stream.runDrain(stream))
671+
)
672+
),
673+
{ startImmediately: true }
674+
)
675+
676+
const createEvent = yield* Effect.promise(() => server.nextMessage as Promise<any>)
677+
assert.strictEqual(createEvent.type, "response.create")
678+
679+
yield* Fiber.interrupt(fiber)
680+
681+
const cancelEvent = yield* Effect.promise(() => server.nextMessage as Promise<any>)
682+
assert.deepStrictEqual(cancelEvent, { type: "response.cancel" })
683+
}).pipe(
684+
Effect.provide(MainLayer),
685+
Effect.provideService(Socket.WebSocketConstructor, (url) => new globalThis.WebSocket(url))
686+
)
687+
})
634688
})
635689
})

0 commit comments

Comments
 (0)