Skip to content

Commit 8f22f56

Browse files
Refactoring some duplication
1 parent 10a5b12 commit 8f22f56

File tree

3 files changed

+39
-32
lines changed

3 files changed

+39
-32
lines changed

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/common/DelegatingProcessor.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,14 @@
99

1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
12-
12+
import org.elasticsearch.xcontent.XContentParserConfiguration;
13+
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEvent;
14+
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventField;
15+
16+
import java.io.IOException;
17+
import java.util.ArrayDeque;
18+
import java.util.Deque;
19+
import java.util.Iterator;
1320
import java.util.concurrent.Flow;
1421
import java.util.concurrent.atomic.AtomicBoolean;
1522
import java.util.concurrent.atomic.AtomicLong;
@@ -25,6 +32,33 @@ public abstract class DelegatingProcessor<T, R> implements Flow.Processor<T, R>
2532
private Flow.Subscriber<? super R> downstream;
2633
private Flow.Subscription upstream;
2734

35+
public static <ParsedChunk> Deque<ParsedChunk> parseEvent(
36+
Deque<ServerSentEvent> item,
37+
ParseChunkFunction<ParsedChunk> parseFunction,
38+
XContentParserConfiguration parserConfig,
39+
Logger logger
40+
) throws Exception {
41+
var results = new ArrayDeque<ParsedChunk>(item.size());
42+
for (ServerSentEvent event : item) {
43+
if (ServerSentEventField.DATA == event.name() && event.hasValue()) {
44+
try {
45+
var delta = parseFunction.apply(parserConfig, event);
46+
delta.forEachRemaining(results::offer);
47+
} catch (Exception e) {
48+
logger.warn("Failed to parse event from inference provider: {}", event);
49+
throw e;
50+
}
51+
}
52+
}
53+
54+
return results;
55+
}
56+
57+
@FunctionalInterface
58+
public interface ParseChunkFunction<ParsedChunk> {
59+
Iterator<ParsedChunk> apply(XContentParserConfiguration parserConfig, ServerSentEvent event) throws IOException;
60+
}
61+
2862
@Override
2963
public void subscribe(Flow.Subscriber<? super R> subscriber) {
3064
if (downstream != null) {

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/openai/OpenAiStreamingProcessor.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@
1818
import org.elasticsearch.xpack.core.inference.results.StreamingChatCompletionResults;
1919
import org.elasticsearch.xpack.inference.common.DelegatingProcessor;
2020
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEvent;
21-
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventField;
2221

2322
import java.io.IOException;
24-
import java.util.ArrayDeque;
2523
import java.util.Collections;
2624
import java.util.Deque;
2725
import java.util.Iterator;
@@ -115,19 +113,7 @@ public class OpenAiStreamingProcessor extends DelegatingProcessor<Deque<ServerSe
115113
@Override
116114
protected void next(Deque<ServerSentEvent> item) throws Exception {
117115
var parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE);
118-
119-
var results = new ArrayDeque<StreamingChatCompletionResults.Result>(item.size());
120-
for (ServerSentEvent event : item) {
121-
if (ServerSentEventField.DATA == event.name() && event.hasValue()) {
122-
try {
123-
var delta = parse(parserConfig, event);
124-
delta.forEachRemaining(results::offer);
125-
} catch (Exception e) {
126-
log.warn("Failed to parse event from inference provider: {}", event);
127-
throw e;
128-
}
129-
}
130-
}
116+
var results = parseEvent(item, OpenAiStreamingProcessor::parse, parserConfig, log);
131117

132118
if (results.isEmpty()) {
133119
upstream().request(1);
@@ -136,7 +122,7 @@ protected void next(Deque<ServerSentEvent> item) throws Exception {
136122
}
137123
}
138124

139-
private Iterator<StreamingChatCompletionResults.Result> parse(XContentParserConfiguration parserConfig, ServerSentEvent event)
125+
private static Iterator<StreamingChatCompletionResults.Result> parse(XContentParserConfiguration parserConfig, ServerSentEvent event)
140126
throws IOException {
141127
if (DONE_MESSAGE.equalsIgnoreCase(event.value())) {
142128
return Collections.emptyIterator();

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/openai/OpenAiUnifiedStreamingProcessor.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.elasticsearch.xpack.core.inference.results.StreamingUnifiedChatCompletionResults;
2121
import org.elasticsearch.xpack.inference.common.DelegatingProcessor;
2222
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEvent;
23-
import org.elasticsearch.xpack.inference.external.response.streaming.ServerSentEventField;
2423

2524
import java.io.IOException;
2625
import java.util.ArrayDeque;
@@ -72,19 +71,7 @@ protected void onRequest(long n) {
7271
@Override
7372
protected void next(Deque<ServerSentEvent> item) throws Exception {
7473
var parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE);
75-
76-
var results = new ArrayDeque<StreamingUnifiedChatCompletionResults.ChatCompletionChunk>(item.size());
77-
for (ServerSentEvent event : item) {
78-
if (ServerSentEventField.DATA == event.name() && event.hasValue()) {
79-
try {
80-
var delta = parse(parserConfig, event);
81-
delta.forEachRemaining(results::offer);
82-
} catch (Exception e) {
83-
logger.warn("Failed to parse event from inference provider: {}", event);
84-
throw e;
85-
}
86-
}
87-
}
74+
var results = parseEvent(item, OpenAiUnifiedStreamingProcessor::parse, parserConfig, logger);
8875

8976
if (results.isEmpty()) {
9077
upstream().request(1);
@@ -101,7 +88,7 @@ protected void next(Deque<ServerSentEvent> item) throws Exception {
10188
}
10289
}
10390

104-
private Iterator<StreamingUnifiedChatCompletionResults.ChatCompletionChunk> parse(
91+
private static Iterator<StreamingUnifiedChatCompletionResults.ChatCompletionChunk> parse(
10592
XContentParserConfiguration parserConfig,
10693
ServerSentEvent event
10794
) throws IOException {

0 commit comments

Comments
 (0)