Skip to content

Commit fc1444d

Browse files
authored
openai ws tweaks (#1905)
1 parent cf50eb4 commit fc1444d

File tree

2 files changed

+53
-45
lines changed

2 files changed

+53
-45
lines changed

.changeset/yellow-adults-study.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/ai-openai": patch
3+
---
4+
5+
openai ws tweaks

packages/ai/openai/src/OpenAiClient.ts

Lines changed: 48 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
* @since 1.0.0
88
*/
99
import * as Array from "effect/Array"
10-
import * as Cause from "effect/Cause"
1110
import type * as Config from "effect/Config"
1211
import * as Effect from "effect/Effect"
1312
import { identity } from "effect/Function"
@@ -374,8 +373,12 @@ export class OpenAiSocket extends ServiceMap.Service<OpenAiSocket, {
374373
const makeSocket = Effect.gen(function*() {
375374
const client = yield* OpenAiClient
376375
const tracker = yield* ResponseIdTracker.make
377-
const request = yield* Effect.orDie(client.client.httpClient.preprocess(HttpClientRequest.post("/responses")))
376+
const socketScope = yield* Effect.scope
377+
const makeRequest = Effect.orDie(client.client.httpClient.preprocess(HttpClientRequest.post("/responses")))
378+
const makeWebSocket = yield* Socket.WebSocketConstructor
379+
378380
const decoder = new TextDecoder()
381+
379382
const queueRef: RcRef.RcRef<
380383
{
381384
readonly send: (message: typeof Generated.CreateResponse.Encoded) => Effect.Effect<void, AiError.AiError>
@@ -384,18 +387,22 @@ const makeSocket = Effect.gen(function*() {
384387
> = yield* RcRef.make({
385388
idleTimeToLive: 60_000,
386389
acquire: Effect.gen(function*() {
387-
const request = yield* Effect.orDie(client.client.httpClient.preprocess(HttpClientRequest.post("/responses")))
388-
const socket = yield* Socket.makeWebSocket(request.url.replace(/^http/, "ws"), {
389-
closeCodeIsError: Function.constTrue
390-
}).pipe(
391-
Effect.updateService(Socket.WebSocketConstructor, (f) => (url) =>
392-
f(url, {
390+
const scope = yield* Effect.scope
391+
const request = yield* makeRequest
392+
const socket = yield* Socket.makeWebSocket(request.url.replace(/^http/, "ws")).pipe(
393+
Effect.provideService(Socket.WebSocketConstructor, (url) =>
394+
makeWebSocket(url, {
393395
headers: request.headers
394396
} as any))
395397
)
396398
const write = yield* socket.writer
397399

398-
let incoming = yield* Queue.unbounded<ResponseStreamEvent, AiError.AiError>()
400+
yield* Scope.addFinalizerExit(scope, () => {
401+
tracker.clearUnsafe()
402+
return Effect.void
403+
})
404+
405+
const incoming = yield* Queue.unbounded<ResponseStreamEvent, AiError.AiError>()
399406
const send = (message: typeof Generated.CreateResponse.Encoded) =>
400407
write(JSON.stringify({
401408
type: "response.create",
@@ -421,13 +428,9 @@ const makeSocket = Effect.gen(function*() {
421428
)
422429

423430
yield* socket.runRaw((msg) => {
424-
if (!incoming) return
425431
const text = typeof msg === "string" ? msg : decoder.decode(msg)
426432
try {
427433
const event = decodeEvent(text)
428-
if (event.type === "error") {
429-
tracker.clearUnsafe()
430-
}
431434
if (event.type === "error" && "status" in event) {
432435
const json = JSON.stringify(event.error)
433436
return Effect.fail(
@@ -455,42 +458,44 @@ const makeSocket = Effect.gen(function*() {
455458
Queue.offerUnsafe(incoming, event)
456459
} catch {}
457460
}).pipe(
458-
Effect.catchCause((cause) =>
459-
Queue.fail(
460-
incoming,
461-
AiError.make({
462-
module: "OpenAiClient",
463-
method: "createResponseStream",
464-
reason: new AiError.NetworkError({
465-
reason: "TransportError",
466-
request: {
467-
method: "POST",
468-
url: request.url,
469-
urlParams: [],
470-
hash: undefined,
471-
headers: request.headers
472-
},
473-
description: Cause.pretty(cause)
474-
})
461+
Effect.catchTag("SocketError", (error) =>
462+
AiError.make({
463+
module: "OpenAiClient",
464+
method: "createResponseStream",
465+
reason: new AiError.NetworkError({
466+
reason: "TransportError",
467+
request: {
468+
method: "POST",
469+
url: request.url,
470+
urlParams: [],
471+
hash: undefined,
472+
headers: request.headers
473+
},
474+
description: error.message
475475
})
476-
)
477-
),
478-
Effect.ensuring(RcRef.invalidate(queueRef)),
476+
}).asEffect()),
477+
Effect.catchCause((cause) => Queue.failCause(incoming, cause)),
478+
Effect.ensuring(Effect.forkIn(RcRef.invalidate(queueRef), socketScope, {
479+
startImmediately: true
480+
})),
479481
Effect.forkScoped({ startImmediately: true })
480482
)
481483

482484
return { send, incoming } as const
483485
})
484486
})
485487

486-
yield* Effect.forkScoped(RcRef.get(queueRef))
488+
// Prime the websocket
489+
yield* Effect.scoped(RcRef.get(queueRef))
487490

488491
// Websocket mode only allows one request at a time
489492
const semaphore = Semaphore.makeUnsafe(1)
493+
const request = yield* makeRequest
490494

491495
return OpenAiSocket.serviceMap({
492496
createResponseStream(options) {
493-
const stream = Effect.gen(function*() {
497+
const stream = Stream.unwrap(Effect.gen(function*() {
498+
const scope = yield* Effect.scope
494499
yield* Effect.acquireRelease(
495500
semaphore.take(1),
496501
() => semaphore.release(1),
@@ -499,24 +504,22 @@ const makeSocket = Effect.gen(function*() {
499504
const { send, incoming } = yield* RcRef.get(queueRef)
500505
let done = false
501506

502-
yield* Effect.acquireRelease(
503-
send(options),
504-
() => {
505-
if (done) return Effect.void
506-
tracker.clearUnsafe()
507-
return RcRef.invalidate(queueRef)
508-
},
509-
{ interruptible: true }
510-
).pipe(
507+
yield* Scope.addFinalizerExit(
508+
scope,
509+
() => done ? Effect.void : RcRef.invalidate(queueRef)
510+
)
511+
512+
yield* send(options).pipe(
511513
Effect.forkScoped({ startImmediately: true })
512514
)
515+
513516
return Stream.fromQueue(incoming).pipe(
514517
Stream.takeUntil((e) => {
515518
done = e.type === "response.completed" || e.type === "response.incomplete"
516519
return done
517520
})
518521
)
519-
}).pipe(Stream.unwrap)
522+
}))
520523

521524
return Effect.succeed([
522525
HttpClientResponse.fromWeb(request, new Response()),

0 commit comments

Comments
 (0)