Skip to content

Commit aef3473

Browse files
stainless-botStainless Bot
authored andcommitted
feat(client): support error property
chore: unknown commit message
1 parent 734939c commit aef3473

File tree

5 files changed

+180
-107
lines changed

5 files changed

+180
-107
lines changed

openai-java-core/src/main/kotlin/com/openai/core/handlers/SseHandler.kt

Lines changed: 107 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -3,133 +3,149 @@
33
package com.openai.core.handlers
44

55
import com.fasterxml.jackson.databind.json.JsonMapper
6-
import com.fasterxml.jackson.module.kotlin.jacksonTypeRef
6+
import com.openai.core.JsonValue
77
import com.openai.core.http.HttpResponse
88
import com.openai.core.http.HttpResponse.Handler
99
import com.openai.core.http.SseMessage
1010
import com.openai.core.http.StreamResponse
1111
import com.openai.errors.OpenAIException
1212
import java.util.stream.Stream
13+
import kotlin.jvm.optionals.getOrNull
1314
import kotlin.streams.asStream
1415

15-
@JvmSynthetic internal fun sseHandler(): Handler<StreamResponse<SseMessage>> = SseHandlerInternal
16-
17-
private object SseHandlerInternal : Handler<StreamResponse<SseMessage>> {
18-
19-
override fun handle(response: HttpResponse): StreamResponse<SseMessage> {
20-
val sequence = sequence {
21-
response.body().bufferedReader().buffered().useLines { lines ->
22-
val state = SseState()
23-
var done = false
24-
for (line in lines) {
25-
// Stop emitting messages, but iterate through the full stream.
26-
if (done) {
27-
continue
28-
}
29-
val message = state.decode(line) ?: continue
30-
31-
if (message.data.startsWith("[DONE]")) {
32-
// In this case we don't break because we still want to iterate through the
33-
// full stream.
34-
done = true
35-
continue
36-
}
37-
38-
if (message.event == null) {
39-
yield(message)
16+
@JvmSynthetic
17+
internal fun sseHandler(jsonMapper: JsonMapper): Handler<StreamResponse<SseMessage>> =
18+
object : Handler<StreamResponse<SseMessage>> {
19+
20+
override fun handle(response: HttpResponse): StreamResponse<SseMessage> {
21+
val sequence = sequence {
22+
response.body().bufferedReader().buffered().useLines { lines ->
23+
val state = SseState(jsonMapper)
24+
var done = false
25+
for (line in lines) {
26+
// Stop emitting messages, but iterate through the full stream.
27+
if (done) {
28+
continue
29+
}
30+
val message = state.decode(line) ?: continue
31+
32+
if (message.data.startsWith("[DONE]")) {
33+
// In this case we don't break because we still want to iterate through
34+
// the full stream.
35+
done = true
36+
continue
37+
}
38+
39+
if (message.event == null) {
40+
val error =
41+
message.json<JsonValue>().asObject().getOrNull()?.get("error")
42+
if (error != null) {
43+
val errorMessage =
44+
error.asString().getOrNull()
45+
?: error
46+
.asObject()
47+
.getOrNull()
48+
?.get("message")
49+
?.asString()
50+
?.getOrNull()
51+
?: "An error occurred during streaming"
52+
throw OpenAIException(errorMessage)
53+
}
54+
yield(message)
55+
}
4056
}
4157
}
4258
}
43-
}
4459

45-
return object : StreamResponse<SseMessage> {
46-
override fun stream(): Stream<SseMessage> = sequence.asStream()
60+
return object : StreamResponse<SseMessage> {
61+
override fun stream(): Stream<SseMessage> = sequence.asStream()
4762

48-
override fun close() = response.close()
63+
override fun close() = response.close()
64+
}
4965
}
5066
}
5167

52-
private class SseState(
53-
var event: String? = null,
54-
val data: MutableList<String> = mutableListOf(),
55-
var lastId: String? = null,
56-
var retry: Int? = null
57-
) {
58-
// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
59-
fun decode(line: String): SseMessage? {
60-
if (line.isEmpty()) {
61-
return flush()
62-
}
68+
private class SseState(
69+
val jsonMapper: JsonMapper,
70+
var event: String? = null,
71+
val data: MutableList<String> = mutableListOf(),
72+
var lastId: String? = null,
73+
var retry: Int? = null
74+
) {
75+
// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
76+
fun decode(line: String): SseMessage? {
77+
if (line.isEmpty()) {
78+
return flush()
79+
}
6380

64-
if (line.startsWith(':')) {
65-
return null
66-
}
81+
if (line.startsWith(':')) {
82+
return null
83+
}
6784

68-
val fieldName: String
69-
var value: String
85+
val fieldName: String
86+
var value: String
7087

71-
val colonIndex = line.indexOf(':')
72-
if (colonIndex == -1) {
73-
fieldName = line
74-
value = ""
75-
} else {
76-
fieldName = line.substring(0, colonIndex)
77-
value = line.substring(colonIndex + 1)
78-
}
88+
val colonIndex = line.indexOf(':')
89+
if (colonIndex == -1) {
90+
fieldName = line
91+
value = ""
92+
} else {
93+
fieldName = line.substring(0, colonIndex)
94+
value = line.substring(colonIndex + 1)
95+
}
7996

80-
if (value.startsWith(' ')) {
81-
value = value.substring(1)
82-
}
97+
if (value.startsWith(' ')) {
98+
value = value.substring(1)
99+
}
83100

84-
when (fieldName) {
85-
"event" -> event = value
86-
"data" -> data.add(value)
87-
"id" -> {
88-
if (!value.contains('\u0000')) {
89-
lastId = value
90-
}
101+
when (fieldName) {
102+
"event" -> event = value
103+
"data" -> data.add(value)
104+
"id" -> {
105+
if (!value.contains('\u0000')) {
106+
lastId = value
91107
}
92-
"retry" -> value.toIntOrNull()?.let { retry = it }
93108
}
94-
95-
return null
109+
"retry" -> value.toIntOrNull()?.let { retry = it }
96110
}
97111

98-
private fun flush(): SseMessage? {
99-
if (isEmpty()) {
100-
return null
101-
}
102-
103-
val message =
104-
SseMessage.builder()
105-
.event(event)
106-
.data(data.joinToString("\n"))
107-
.id(lastId)
108-
.retry(retry)
109-
.build()
110-
111-
// NOTE: Per the SSE spec, do not reset lastId.
112-
event = null
113-
data.clear()
114-
retry = null
112+
return null
113+
}
115114

116-
return message
115+
private fun flush(): SseMessage? {
116+
if (isEmpty()) {
117+
return null
117118
}
118119

119-
private fun isEmpty(): Boolean =
120-
event.isNullOrEmpty() && data.isEmpty() && lastId.isNullOrEmpty() && retry == null
120+
val message =
121+
SseMessage.builder()
122+
.jsonMapper(jsonMapper)
123+
.event(event)
124+
.data(data.joinToString("\n"))
125+
.id(lastId)
126+
.retry(retry)
127+
.build()
128+
129+
// NOTE: Per the SSE spec, do not reset lastId.
130+
event = null
131+
data.clear()
132+
retry = null
133+
134+
return message
121135
}
136+
137+
private fun isEmpty(): Boolean =
138+
event.isNullOrEmpty() && data.isEmpty() && lastId.isNullOrEmpty() && retry == null
122139
}
123140

124141
@JvmSynthetic
125-
internal inline fun <reified T> Handler<StreamResponse<SseMessage>>.mapJson(
126-
jsonMapper: JsonMapper
127-
): Handler<StreamResponse<T>> =
142+
internal inline fun <reified T> Handler<StreamResponse<SseMessage>>.mapJson():
143+
Handler<StreamResponse<T>> =
128144
object : Handler<StreamResponse<T>> {
129145
override fun handle(response: HttpResponse): StreamResponse<T> =
130146
this@mapJson.handle(response).map {
131147
try {
132-
jsonMapper.readValue(it.data, jacksonTypeRef())
148+
it.json<T>()
133149
} catch (e: Exception) {
134150
throw OpenAIException("Error reading response", e)
135151
}

openai-java-core/src/main/kotlin/com/openai/core/http/SseMessage.kt

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package com.openai.core.http
22

3+
import com.fasterxml.jackson.databind.json.JsonMapper
4+
import com.fasterxml.jackson.module.kotlin.jacksonTypeRef
5+
import com.openai.errors.OpenAIException
36
import java.util.Objects
47

58
internal class SseMessage
69
private constructor(
10+
val jsonMapper: JsonMapper,
711
val event: String?,
812
val data: String,
913
val id: String?,
@@ -16,11 +20,14 @@ private constructor(
1620

1721
class Builder {
1822

23+
private var jsonMapper: JsonMapper? = null
1924
private var event: String? = null
2025
private var data: String = ""
2126
private var id: String? = null
2227
private var retry: Int? = null
2328

29+
fun jsonMapper(jsonMapper: JsonMapper) = apply { this.jsonMapper = jsonMapper }
30+
2431
fun event(event: String?) = apply { this.event = event }
2532

2633
fun data(data: String) = apply { this.data = data }
@@ -29,7 +36,17 @@ private constructor(
2936

3037
fun retry(retry: Int?) = apply { this.retry = retry }
3138

32-
fun build(): SseMessage = SseMessage(event, data, id, retry)
39+
fun build(): SseMessage = SseMessage(jsonMapper!!, event, data, id, retry)
40+
}
41+
42+
inline fun <reified T> json(): T = jsonMapper.readerFor(jacksonTypeRef<T>()).readValue(jsonNode)
43+
44+
private val jsonNode by lazy {
45+
try {
46+
jsonMapper.readTree(data)
47+
} catch (e: Exception) {
48+
throw OpenAIException("Error deserializing json", e)
49+
}
3350
}
3451

3552
override fun equals(other: Any?): Boolean {

openai-java-core/src/main/kotlin/com/openai/services/blocking/CompletionServiceImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ constructor(
5757
}
5858

5959
private val createStreamingHandler: Handler<StreamResponse<Completion>> =
60-
sseHandler().mapJson<Completion>(clientOptions.jsonMapper).withErrorHandler(errorHandler)
60+
sseHandler(clientOptions.jsonMapper).mapJson<Completion>().withErrorHandler(errorHandler)
6161

6262
/** Creates a completion for the provided prompt and parameters. */
6363
override fun createStreaming(

openai-java-core/src/main/kotlin/com/openai/services/blocking/chat/CompletionServiceImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ constructor(
6363
}
6464

6565
private val createStreamingHandler: Handler<StreamResponse<ChatCompletionChunk>> =
66-
sseHandler()
67-
.mapJson<ChatCompletionChunk>(clientOptions.jsonMapper)
66+
sseHandler(clientOptions.jsonMapper)
67+
.mapJson<ChatCompletionChunk>()
6868
.withErrorHandler(errorHandler)
6969

7070
/**

0 commit comments

Comments
 (0)