Skip to content

Commit cb028d3

Browse files
committed
Adding deque utils helper
1 parent 105e35e commit cb028d3

File tree

4 files changed

+113
-54
lines changed

4 files changed

+113
-54
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.inference;
9+
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.Writeable;
12+
13+
import java.io.IOException;
14+
import java.util.ArrayDeque;
15+
import java.util.Deque;
16+
17+
public final class DequeUtils {
18+
19+
private DequeUtils() {
20+
// util functions only
21+
}
22+
23+
public static <T> Deque<T> readDeque(StreamInput in, Writeable.Reader<T> reader) throws IOException {
24+
return in.readCollection(ArrayDeque::new, ((stream, deque) -> deque.offer(reader.read(in))));
25+
}
26+
27+
public static boolean dequeEquals(Deque<?> thisDeque, Deque<?> otherDeque) {
28+
if (thisDeque.size() != otherDeque.size()) {
29+
return false;
30+
}
31+
var thisIter = thisDeque.iterator();
32+
var otherIter = otherDeque.iterator();
33+
while (thisIter.hasNext() && otherIter.hasNext()) {
34+
if (thisIter.next().equals(otherIter.next()) == false) {
35+
return false;
36+
}
37+
}
38+
return true;
39+
}
40+
41+
public static int dequeHashCode(Deque<?> deque) {
42+
if (deque == null) {
43+
return 0;
44+
}
45+
return deque.stream().reduce(1, (hashCode, chunk) -> 31 * hashCode + (chunk == null ? 0 : chunk.hashCode()), Integer::sum);
46+
}
47+
48+
public static <T> Deque<T> of(T elem) {
49+
var deque = new ArrayDeque<T>(1);
50+
deque.offer(elem);
51+
return deque;
52+
}
53+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/inference/results/StreamingChatCompletionResults.java

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
import org.elasticsearch.xcontent.ToXContent;
1818

1919
import java.io.IOException;
20-
import java.util.ArrayDeque;
2120
import java.util.Deque;
2221
import java.util.Iterator;
2322
import java.util.concurrent.Flow;
2423

24+
import static org.elasticsearch.xpack.core.inference.DequeUtils.dequeEquals;
25+
import static org.elasticsearch.xpack.core.inference.DequeUtils.dequeHashCode;
26+
import static org.elasticsearch.xpack.core.inference.DequeUtils.readDeque;
2527
import static org.elasticsearch.xpack.core.inference.results.ChatCompletionResults.COMPLETION;
2628

2729
/**
@@ -40,11 +42,7 @@ public record Results(Deque<Result> results) implements InferenceServiceResults.
4042
public static final String NAME = "streaming_chat_completion_results";
4143

4244
public Results(StreamInput in) throws IOException {
43-
this(deque(in));
44-
}
45-
46-
private static Deque<Result> deque(StreamInput in) throws IOException {
47-
return in.readCollection(ArrayDeque::new, ((stream, deque) -> deque.offer(new Result(stream))));
45+
this(readDeque(in, Result::new));
4846
}
4947

5048
@Override
@@ -75,31 +73,10 @@ public boolean equals(Object o) {
7573
return dequeEquals(this.results, other.results());
7674
}
7775

78-
private static boolean dequeEquals(Deque<?> thisDeque, Deque<?> otherDeque) {
79-
if (thisDeque.size() != otherDeque.size()) {
80-
return false;
81-
}
82-
var thisIter = thisDeque.iterator();
83-
var otherIter = otherDeque.iterator();
84-
while (thisIter.hasNext() && otherIter.hasNext()) {
85-
if (thisIter.next().equals(otherIter.next()) == false) {
86-
return false;
87-
}
88-
}
89-
return true;
90-
}
91-
9276
@Override
9377
public int hashCode() {
9478
return dequeHashCode(results);
9579
}
96-
97-
private static int dequeHashCode(Deque<?> deque) {
98-
if (deque == null) {
99-
return 0;
100-
}
101-
return deque.stream().reduce(1, (hashCode, chunk) -> 31 * hashCode + (chunk == null ? 0 : chunk.hashCode()), Integer::sum);
102-
}
10380
}
10481

10582
public record Result(String delta) implements ChunkedToXContent, Writeable {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/inference/results/StreamingUnifiedChatCompletionResults.java

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@
1818
import org.elasticsearch.xcontent.ToXContent;
1919

2020
import java.io.IOException;
21-
import java.util.ArrayDeque;
2221
import java.util.Collections;
2322
import java.util.Deque;
2423
import java.util.Iterator;
2524
import java.util.List;
2625
import java.util.concurrent.Flow;
2726

2827
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.chunk;
28+
import static org.elasticsearch.xpack.core.inference.DequeUtils.dequeEquals;
29+
import static org.elasticsearch.xpack.core.inference.DequeUtils.dequeHashCode;
30+
import static org.elasticsearch.xpack.core.inference.DequeUtils.readDeque;
2931

3032
/**
3133
* Chat Completion results that only contain a Flow.Publisher.
@@ -69,11 +71,7 @@ public record Results(Deque<ChatCompletionChunk> chunks) implements InferenceSer
6971
public static String NAME = "streaming_unified_chat_completion_results";
7072

7173
public Results(StreamInput in) throws IOException {
72-
this(deque(in));
73-
}
74-
75-
private static Deque<ChatCompletionChunk> deque(StreamInput in) throws IOException {
76-
return in.readCollection(ArrayDeque::new, ((stream, deque) -> deque.offer(new ChatCompletionChunk(stream))));
74+
this(readDeque(in, ChatCompletionChunk::new));
7775
}
7876

7977
@Override
@@ -98,31 +96,10 @@ public boolean equals(Object o) {
9896
return dequeEquals(chunks, results.chunks());
9997
}
10098

101-
private static boolean dequeEquals(Deque<?> thisDeque, Deque<?> otherDeque) {
102-
if (thisDeque.size() != otherDeque.size()) {
103-
return false;
104-
}
105-
var thisIter = thisDeque.iterator();
106-
var otherIter = otherDeque.iterator();
107-
while (thisIter.hasNext() && otherIter.hasNext()) {
108-
if (thisIter.next().equals(otherIter.next()) == false) {
109-
return false;
110-
}
111-
}
112-
return true;
113-
}
114-
11599
@Override
116100
public int hashCode() {
117101
return dequeHashCode(chunks);
118102
}
119-
120-
private static int dequeHashCode(Deque<?> deque) {
121-
if (deque == null) {
122-
return 0;
123-
}
124-
return deque.stream().reduce(1, (hashCode, chunk) -> 31 * hashCode + (chunk == null ? 0 : chunk.hashCode()), Integer::sum);
125-
}
126103
}
127104

128105
public record ChatCompletionChunk(String id, List<Choice> choices, String model, String object, ChatCompletionChunk.Usage usage)
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.inference;
9+
10+
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
11+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.test.ESTestCase;
14+
15+
import java.io.IOException;
16+
17+
import static org.mockito.Mockito.mock;
18+
19+
public class DequeUtilsTests extends ESTestCase {
20+
21+
public void testEqualsAndHashCodeWithSameObject() {
22+
var someObject = mock();
23+
var dequeOne = DequeUtils.of(someObject);
24+
var dequeTwo = DequeUtils.of(someObject);
25+
assertTrue(DequeUtils.dequeEquals(dequeOne, dequeTwo));
26+
assertEquals(DequeUtils.dequeHashCode(dequeOne), DequeUtils.dequeHashCode(dequeTwo));
27+
}
28+
29+
public void testEqualsAndHashCodeWithEqualsObject() {
30+
var dequeOne = DequeUtils.of("the same string");
31+
var dequeTwo = DequeUtils.of("the same string");
32+
assertTrue(DequeUtils.dequeEquals(dequeOne, dequeTwo));
33+
assertEquals(DequeUtils.dequeHashCode(dequeOne), DequeUtils.dequeHashCode(dequeTwo));
34+
}
35+
36+
public void testNotEqualsAndHashCode() {
37+
var dequeOne = DequeUtils.of(mock());
38+
var dequeTwo = DequeUtils.of(mock());
39+
assertFalse(DequeUtils.dequeEquals(dequeOne, dequeTwo));
40+
assertNotEquals(DequeUtils.dequeHashCode(dequeOne), DequeUtils.dequeHashCode(dequeTwo));
41+
}
42+
43+
public void testReadFromStream() throws IOException {
44+
var dequeOne = DequeUtils.of("this is a string");
45+
var out = new BytesStreamOutput();
46+
out.writeStringCollection(dequeOne);
47+
var in = new ByteArrayStreamInput(out.bytes().array());
48+
var dequeTwo = DequeUtils.readDeque(in, StreamInput::readString);
49+
assertTrue(DequeUtils.dequeEquals(dequeOne, dequeTwo));
50+
assertEquals(DequeUtils.dequeHashCode(dequeOne), DequeUtils.dequeHashCode(dequeTwo));
51+
}
52+
}

0 commit comments

Comments
 (0)