Skip to content

Commit 4fc1df0

Browse files
committed
Fix
1 parent 5c2cd99 commit 4fc1df0

File tree

4 files changed

+46
-33
lines changed

4 files changed

+46
-33
lines changed

server/src/main/java/org/elasticsearch/ingest/ESONEntry.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,14 @@ public abstract class ESONEntry {
3434
private final byte type;
3535
private final String key;
3636
private final ESONSource.Value value;
37-
private int offsetOrCount = -1;
37+
private int offsetOrCount;
3838

39-
ESONEntry(byte type, String key) {
40-
this(type, key, -1, null);
39+
ESONEntry(byte type, String key, int offsetOrCount) {
40+
this(type, key, offsetOrCount, null);
4141
}
4242

4343
ESONEntry(byte type, String key, int offsetOrCount, ESONSource.Value value) {
44+
assert value == null || value.offset() == offsetOrCount;
4445
this.type = type;
4546
this.key = key;
4647
this.offsetOrCount = offsetOrCount;
@@ -72,7 +73,11 @@ public static class ObjectEntry extends ESONEntry {
7273
public Map<String, ESONSource.Value> mutationMap = null;
7374

7475
public ObjectEntry(String key) {
75-
super(TYPE_OBJECT, key);
76+
this(key, -1);
77+
}
78+
79+
public ObjectEntry(String key, int fieldCount) {
80+
super(TYPE_OBJECT, key, fieldCount);
7681
}
7782

7883
public boolean hasMutations() {
@@ -90,7 +95,11 @@ public static class ArrayEntry extends ESONEntry {
9095
public List<ESONSource.Value> mutationArray = null;
9196

9297
public ArrayEntry(String key) {
93-
super(TYPE_ARRAY, key);
98+
this(key, -1);
99+
}
100+
101+
public ArrayEntry(String key, int elementCount) {
102+
super(TYPE_ARRAY, key, elementCount);
94103
}
95104

96105
public boolean hasMutations() {
@@ -106,7 +115,7 @@ public String toString() {
106115
public static class FieldEntry extends ESONEntry {
107116

108117
public FieldEntry(String key, ESONSource.Value value) {
109-
super(value.type(), key, -1, value);
118+
super(value.type(), key, value.offset(), value);
110119
}
111120

112121
public FieldEntry(String key, byte type, int offset) {

server/src/main/java/org/elasticsearch/ingest/ESONFlat.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010
package org.elasticsearch.ingest;
1111

1212
import org.apache.lucene.util.BytesRef;
13+
import org.elasticsearch.common.bytes.BytesArray;
1314
import org.elasticsearch.common.bytes.BytesReference;
1415
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
1516
import org.elasticsearch.common.io.stream.StreamInput;
1617
import org.elasticsearch.common.io.stream.StreamOutput;
1718
import org.elasticsearch.common.recycler.Recycler;
1819

19-
import java.io.ByteArrayOutputStream;
2020
import java.io.IOException;
2121
import java.io.UncheckedIOException;
2222
import java.nio.charset.StandardCharsets;
@@ -54,8 +54,8 @@ private static List<ESONEntry> readKeys(StreamInput in) throws IOException {
5454
byte type = in.readByte();
5555
int offsetOrCount = in.readInt();
5656
ESONEntry entry = switch (type) {
57-
case ESONEntry.TYPE_OBJECT -> new ESONEntry.ObjectEntry(key);
58-
case ESONEntry.TYPE_ARRAY -> new ESONEntry.ArrayEntry(key);
57+
case ESONEntry.TYPE_OBJECT -> new ESONEntry.ObjectEntry(key, offsetOrCount);
58+
case ESONEntry.TYPE_ARRAY -> new ESONEntry.ArrayEntry(key, offsetOrCount);
5959
default -> new ESONEntry.FieldEntry(key, type, offsetOrCount);
6060
};
6161
entry.offsetOrCount(offsetOrCount);
@@ -82,29 +82,33 @@ public BytesReference getSerializedKeyBytes() {
8282
streamOutput.writeInt(entry.offsetOrCount());
8383
}
8484
BytesReference bytes = streamOutput.bytes();
85-
ByteArrayOutputStream os = new ByteArrayOutputStream(bytes.length());
86-
bytes.writeTo(os);
87-
serializedKeyBytes.set(bytes);
85+
final BytesRef bytesRef;
86+
if (bytes.hasArray()) {
87+
bytesRef = BytesRef.deepCopyOf(bytes.toBytesRef());
88+
} else {
89+
bytesRef = bytes.toBytesRef();
90+
}
91+
serializedKeyBytes.set(new BytesArray(bytesRef));
8892
} catch (IOException e) {
8993
throw new UncheckedIOException(e);
9094
}
9195
}
9296
return serializedKeyBytes.get();
9397
}
9498

95-
private static final ThreadLocal<BytesRef> BYTES_REF = ThreadLocal.withInitial(() -> new BytesRef(new byte[16384]));
99+
private static final ThreadLocal<byte[]> BYTES_REF = ThreadLocal.withInitial(() -> new byte[16384]);
96100

97101
public static Recycler<BytesRef> getBytesRefRecycler() {
98102
return new Recycler<>() {
99103

100-
private boolean first = true;
104+
private boolean first = false;
101105

102106
@Override
103107
public V<BytesRef> obtain() {
104108
final BytesRef bytesRef;
105109
if (first) {
106110
first = false;
107-
bytesRef = BYTES_REF.get();
111+
bytesRef = new BytesRef(BYTES_REF.get());
108112
} else {
109113
bytesRef = new BytesRef(new byte[16384]);
110114
}

server/src/main/java/org/elasticsearch/ingest/ESONIndexed.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -646,17 +646,13 @@ private static void handleObject(List<ESONEntry> flatKeyArray, Object object, St
646646
throws IOException {
647647
Object obj = unwrapObject(object);
648648
if (obj instanceof Map<?, ?> map) {
649-
ESONEntry.ObjectEntry objectEntry = new ESONEntry.ObjectEntry(key);
650-
flatKeyArray.add(objectEntry);
651-
objectEntry.offsetOrCount(map.size());
649+
flatKeyArray.add(new ESONEntry.ObjectEntry(key, map.size()));
652650
for (Map.Entry<?, ?> entry1 : map.entrySet()) {
653651
Object value = entry1.getValue();
654652
handleObject(flatKeyArray, value, entry1.getKey().toString(), newOffset, newValuesOut);
655653
}
656654
} else if (obj instanceof List<?> list) {
657-
ESONEntry.ArrayEntry arrayEntry = new ESONEntry.ArrayEntry(key);
658-
flatKeyArray.add(arrayEntry);
659-
arrayEntry.offsetOrCount(list.size());
655+
flatKeyArray.add(new ESONEntry.ArrayEntry(key, list.size()));
660656
for (Object value : list) {
661657
handleObject(flatKeyArray, value, null, newOffset, newValuesOut);
662658
}

server/src/main/java/org/elasticsearch/ingest/ESONSource.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.xcontent.XContentParser;
1919
import org.elasticsearch.xcontent.XContentString;
2020

21-
import java.io.ByteArrayOutputStream;
2221
import java.io.IOException;
2322
import java.math.BigDecimal;
2423
import java.math.BigInteger;
@@ -53,9 +52,14 @@ public ESONIndexed.ESONObject parse(XContentParser parser) throws IOException {
5352

5453
try (RecyclerBytesStreamOutput bytes = new RecyclerBytesStreamOutput(refRecycler)) {
5554
parseObject(parser, bytes, keyArray, null);
56-
ByteArrayOutputStream bao = new ByteArrayOutputStream(bytes.size());
57-
bytes.bytes().writeTo(bao);
58-
return new ESONIndexed.ESONObject(0, new ESONFlat(keyArray, new Values(new BytesArray(bao.toByteArray()))));
55+
BytesReference bytesReference = bytes.bytes();
56+
final BytesRef bytesRef;
57+
if (bytesReference.hasArray()) {
58+
bytesRef = BytesRef.deepCopyOf(bytesReference.toBytesRef());
59+
} else {
60+
bytesRef = bytesReference.toBytesRef();
61+
}
62+
return new ESONIndexed.ESONObject(0, new ESONFlat(keyArray, new Values(new BytesArray(bytesRef))));
5963
}
6064

6165
}
@@ -119,40 +123,40 @@ private static ESONEntry.FieldEntry parseSimpleValue(
119123
RecyclerBytesStreamOutput bytes,
120124
XContentParser.Token token
121125
) throws IOException {
122-
long position = bytes.position();
126+
int position = Math.toIntExact(bytes.position());
123127

124128
return switch (token) {
125129
case VALUE_STRING -> {
126130
XContentString.UTF8Bytes stringBytes = parser.optimizedText().bytes();
127131
bytes.writeVInt(stringBytes.length());
128132
bytes.writeBytes(stringBytes.bytes(), stringBytes.offset(), stringBytes.length());
129-
yield new ESONEntry.FieldEntry(fieldName, ESONEntry.STRING, (int) position);
133+
yield new ESONEntry.FieldEntry(fieldName, ESONEntry.STRING, position);
130134
}
131135
case VALUE_NUMBER -> {
132136
XContentParser.NumberType numberType = parser.numberType();
133137
yield switch (numberType) {
134138
case INT -> {
135139
bytes.writeInt(parser.intValue());
136-
yield new ESONEntry.FieldEntry(fieldName, ESONEntry.TYPE_INT, (int) position);
140+
yield new ESONEntry.FieldEntry(fieldName, ESONEntry.TYPE_INT, position);
137141
}
138142
case LONG -> {
139143
bytes.writeLong(parser.longValue());
140-
yield new ESONEntry.FieldEntry(fieldName, ESONEntry.TYPE_LONG, (int) position);
144+
yield new ESONEntry.FieldEntry(fieldName, ESONEntry.TYPE_LONG, position);
141145
}
142146
case FLOAT -> {
143147
bytes.writeFloat(parser.floatValue());
144-
yield new ESONEntry.FieldEntry(fieldName, ESONEntry.TYPE_FLOAT, (int) position);
148+
yield new ESONEntry.FieldEntry(fieldName, ESONEntry.TYPE_FLOAT, position);
145149
}
146150
case DOUBLE -> {
147151
bytes.writeDouble(parser.doubleValue());
148-
yield new ESONEntry.FieldEntry(fieldName, ESONEntry.TYPE_DOUBLE, (int) position);
152+
yield new ESONEntry.FieldEntry(fieldName, ESONEntry.TYPE_DOUBLE, position);
149153
}
150154
case BIG_INTEGER, BIG_DECIMAL -> {
151155
byte type = numberType == XContentParser.NumberType.BIG_INTEGER ? ESONEntry.BIG_INTEGER : ESONEntry.BIG_DECIMAL;
152156
byte[] numberBytes = parser.text().getBytes(StandardCharsets.UTF_8);
153157
bytes.writeVInt(numberBytes.length);
154158
bytes.writeBytes(numberBytes, 0, numberBytes.length);
155-
yield new ESONEntry.FieldEntry(fieldName, type, (int) position);
159+
yield new ESONEntry.FieldEntry(fieldName, type, position);
156160
}
157161
};
158162
}
@@ -164,7 +168,7 @@ yield switch (numberType) {
164168
byte[] binaryValue = parser.binaryValue();
165169
bytes.writeVInt(binaryValue.length);
166170
bytes.write(binaryValue);
167-
yield new ESONEntry.FieldEntry(fieldName, ESONEntry.BINARY, (int) position);
171+
yield new ESONEntry.FieldEntry(fieldName, ESONEntry.BINARY, position);
168172
}
169173
default -> throw new IllegalArgumentException("Unexpected token: " + token);
170174
};

0 commit comments

Comments
 (0)