Skip to content

Commit 273f4c6

Browse files
authored
handle openai ws error events (#1760)
1 parent e7efc87 commit 273f4c6

File tree

4 files changed

+62
-14
lines changed

4 files changed

+62
-14
lines changed

.changeset/ready-olives-divide.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/ai-openai": patch
3+
---
4+
5+
handle openai ws error events

packages/ai/openai/src/OpenAiClient.ts

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import * as Array from "effect/Array"
1010
import * as Cause from "effect/Cause"
1111
import type * as Config from "effect/Config"
1212
import * as Effect from "effect/Effect"
13+
import * as Exit from "effect/Exit"
1314
import { identity } from "effect/Function"
1415
import * as Function from "effect/Function"
1516
import * as Layer from "effect/Layer"
@@ -423,14 +424,30 @@ const makeSocket = Effect.gen(function*() {
423424
const cancel = Effect.suspend(() => write(JSON.stringify({ type: "response.cancel" }))).pipe(
424425
Effect.ignore
425426
)
427+
const reset = () => {
428+
currentQueue = null
429+
}
426430

427431
const decoder = new TextDecoder()
428-
const decode = Schema.decodeUnknownSync(Schema.fromJsonString(Generated.ResponseStreamEvent))
429432
yield* socket.runRaw((msg) => {
430433
if (!currentQueue) return
431434
const text = typeof msg === "string" ? msg : decoder.decode(msg)
432435
try {
433-
Queue.offerUnsafe(currentQueue, decode(text))
436+
const event = decodeEvent(text)
437+
if (event.type === "error" && "status" in event) {
438+
return Queue.fail(
439+
currentQueue,
440+
AiError.make({
441+
module: "OpenAiClient",
442+
method: "createResponseStream",
443+
reason: AiError.reasonFromHttpStatus({
444+
status: event.status,
445+
metadata: event.error
446+
})
447+
})
448+
)
449+
}
450+
Queue.offerUnsafe(currentQueue, event)
434451
} catch {}
435452
}).pipe(
436453
Effect.catchCause((cause) => {
@@ -456,17 +473,16 @@ const makeSocket = Effect.gen(function*() {
456473
) :
457474
Effect.void
458475
}),
459-
Effect.retry(
476+
Effect.repeat(
460477
Schedule.exponential(100, 1.5).pipe(
461478
Schedule.either(Schedule.spaced({ seconds: 5 })),
462479
Schedule.jittered
463480
)
464481
),
465-
Effect.orDie,
466482
Effect.forkScoped
467483
)
468484

469-
return { send, cancel } as const
485+
return { send, cancel, reset } as const
470486
})
471487
})
472488

@@ -478,15 +494,22 @@ const makeSocket = Effect.gen(function*() {
478494
const stream = Effect.gen(function*() {
479495
yield* Effect.acquireRelease(
480496
semaphore.take(1),
481-
() => semaphore.release(1)
497+
() => semaphore.release(1),
498+
{ interruptible: true }
482499
)
483-
const { send, cancel } = yield* RcRef.get(queueRef)
500+
const { send, cancel, reset } = yield* RcRef.get(queueRef)
484501
const incoming = yield* Queue.unbounded<ResponseStreamEvent, AiError.AiError | Cause.Done>()
485502
let done = false
486503

487504
yield* Effect.acquireRelease(
488505
send(incoming, options),
489-
() => done ? Effect.void : cancel
506+
(_, exit) => {
507+
reset()
508+
if (Exit.isFailure(exit) && !Exit.hasInterrupts(exit)) return Effect.void
509+
else if (done) return Effect.void
510+
return cancel
511+
},
512+
{ interruptible: true }
490513
).pipe(
491514
Effect.forkScoped({ startImmediately: true })
492515
)
@@ -508,6 +531,18 @@ const makeSocket = Effect.gen(function*() {
508531
)
509532
})
510533

534+
const ErrorEvent = Schema.Struct({
535+
type: Schema.Literal("error"),
536+
status: Schema.Number,
537+
error: Schema.Struct({
538+
type: Schema.String,
539+
message: Schema.String
540+
})
541+
})
542+
543+
const AllEvents = Schema.Union([Generated.ResponseStreamEvent, ErrorEvent])
544+
const decodeEvent = Schema.decodeUnknownSync(Schema.fromJsonString(AllEvents))
545+
511546
/**
512547
* Uses OpenAI's websocket mode for all responses within the provided effect.
513548
*

packages/effect/src/Effect.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6236,8 +6236,9 @@ export const scopedWith: <A, E, R>(
62366236
*/
62376237
export const acquireRelease: <A, E, R>(
62386238
acquire: Effect<A, E, R>,
6239-
release: (a: A, exit: Exit.Exit<unknown, unknown>) => Effect<unknown>
6240-
) => Effect<A, E, R | Scope> = internal.acquireRelease
6239+
release: (a: A, exit: Exit.Exit<unknown, unknown>) => Effect<unknown>,
6240+
options?: { readonly interruptible?: boolean }
6241+
) => Effect<A, E, Scope | R> = internal.acquireRelease
62416242

62426243
/**
62436244
* This function is used to ensure that an `Effect` value that represents the

packages/effect/src/internal/effect.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3794,11 +3794,18 @@ export const scopedWith = <A, E, R>(
37943794
/** @internal */
37953795
export const acquireRelease = <A, E, R>(
37963796
acquire: Effect.Effect<A, E, R>,
3797-
release: (a: A, exit: Exit.Exit<unknown, unknown>) => Effect.Effect<unknown>
3797+
release: (a: A, exit: Exit.Exit<unknown, unknown>) => Effect.Effect<unknown>,
3798+
options?: { readonly interruptible?: boolean }
37983799
): Effect.Effect<A, E, R | Scope.Scope> =>
3799-
uninterruptible(
3800-
flatMap(scope, (scope) =>
3801-
tap(acquire, (a) => scopeAddFinalizerExit(scope, (exit) => internalCall(() => release(a, exit)))))
3800+
uninterruptibleMask((restore) =>
3801+
flatMap(
3802+
scope,
3803+
(scope) =>
3804+
tap(
3805+
options?.interruptible ? restore(acquire) : acquire,
3806+
(a) => scopeAddFinalizerExit(scope, (exit) => release(a, exit))
3807+
)
3808+
)
38023809
)
38033810

38043811
/** @internal */

0 commit comments

Comments
 (0)