Skip to content

Commit 5043373

Browse files
committed
Change
1 parent adfd67e commit 5043373

File tree

6 files changed

+106
-15
lines changed

6 files changed

+106
-15
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ static TransportVersion def(int id) {
359359
public static final TransportVersion TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION = def(9_135_0_00);
360360
public static final TransportVersion INDEX_TEMPLATE_TRACKING_INFO = def(9_136_0_00);
361361
public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00);
362+
public static final TransportVersion STRUCTURED_SOURCE = def(9_138_0_00);
362363

363364
/*
364365
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,10 @@ BulkRequest internalAdd(UpdateRequest request) {
190190

191191
requests.add(request);
192192
if (request.doc() != null) {
193-
sizeInBytes += request.doc().source().length();
193+
sizeInBytes += request.doc().sourceSize();
194194
}
195195
if (request.upsertRequest() != null) {
196-
sizeInBytes += request.upsertRequest().source().length();
196+
sizeInBytes += request.upsertRequest().sourceSize();
197197
}
198198
if (request.script() != null) {
199199
sizeInBytes += request.script().getIdOrCode().length() * 2;

server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,12 @@ public long totalSizeInBytes() {
9191
DocWriteRequest<?> request = items[i].request();
9292
if (request instanceof IndexRequest) {
9393
if (((IndexRequest) request).source() != null) {
94-
totalSizeInBytes += ((IndexRequest) request).source().length();
94+
totalSizeInBytes += ((IndexRequest) request).sourceSize();
9595
}
9696
} else if (request instanceof UpdateRequest) {
9797
IndexRequest doc = ((UpdateRequest) request).doc();
9898
if (doc != null && doc.source() != null) {
99-
totalSizeInBytes += ((UpdateRequest) request).doc().source().length();
99+
totalSizeInBytes += ((UpdateRequest) request).doc().sourceSize();
100100
}
101101
}
102102
}
@@ -109,12 +109,12 @@ public long maxOperationSizeInBytes() {
109109
DocWriteRequest<?> request = items[i].request();
110110
if (request instanceof IndexRequest) {
111111
if (((IndexRequest) request).source() != null) {
112-
maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, ((IndexRequest) request).source().length());
112+
maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, ((IndexRequest) request).sourceSize());
113113
}
114114
} else if (request instanceof UpdateRequest) {
115115
IndexRequest doc = ((UpdateRequest) request).doc();
116116
if (doc != null && doc.source() != null) {
117-
maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, ((UpdateRequest) request).doc().source().length());
117+
maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, ((UpdateRequest) request).doc().sourceSize());
118118
}
119119
}
120120
}

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,15 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
174174
}
175175
id = in.readOptionalString();
176176
routing = in.readOptionalString();
177+
if (in.getTransportVersion().onOrAfter(TransportVersions.STRUCTURED_SOURCE)) {
178+
if (in.readBoolean()) {
179+
structuredSource = new ESONFlat(in);
180+
} else {
181+
source = in.readBytesReference();
182+
}
183+
} else {
184+
source = in.readBytesReference();
185+
}
177186
source = in.readBytesReference();
178187
opType = OpType.fromId(in.readByte());
179188
version = in.readLong();
@@ -424,6 +433,10 @@ public BytesReference source() {
424433
return source;
425434
}
426435

436+
public int sourceSize() {
437+
return useStructuredSource ? structuredSource.values().data().length() : source.length();
438+
}
439+
427440
public void setStructuredSource(ESONIndexed.ESONObject esonSource) {
428441
this.useStructuredSource = true;
429442
this.structuredSource = esonSource.esonFlat();
@@ -817,7 +830,17 @@ private void writeBody(StreamOutput out) throws IOException {
817830
}
818831
out.writeOptionalString(id);
819832
out.writeOptionalString(routing);
820-
out.writeBytesReference(source());
833+
if (out.getTransportVersion().onOrAfter(TransportVersions.STRUCTURED_SOURCE)) {
834+
out.writeBoolean(useStructuredSource);
835+
if (useStructuredSource) {
836+
out.writeBytesReference(structuredSource.getSerializedKeyBytes());
837+
out.writeBytesReference(structuredSource.values().data());
838+
} else {
839+
out.writeBytesReference(source());
840+
}
841+
} else {
842+
out.writeBytesReference(source());
843+
}
821844
out.writeByte(opType.getId());
822845
out.writeLong(version);
823846
out.writeByte(versionType.getValue());

server/src/main/java/org/elasticsearch/ingest/ESONEntry.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,21 @@ public FieldEntry(String key, ESONSource.Value value) {
101101
this.value = value;
102102
}
103103

104+
public FieldEntry(String key, byte type, int offset) {
105+
this(key, parseValue(type, offset));
106+
}
107+
108+
private static ESONSource.Value parseValue(byte type, int offset) {
109+
return switch (type) {
110+
case TYPE_NULL -> ESONSource.ConstantValue.NULL;
111+
case TYPE_FALSE -> ESONSource.ConstantValue.FALSE;
112+
case TYPE_TRUE -> ESONSource.ConstantValue.TRUE;
113+
case TYPE_INT, TYPE_DOUBLE, TYPE_FLOAT, TYPE_LONG -> new ESONSource.FixedValue(offset, type);
114+
case STRING, BINARY, BIG_INTEGER, BIG_DECIMAL -> new ESONSource.VariableValue(offset, type);
115+
default -> throw new IllegalArgumentException("Unknown type: " + type);
116+
};
117+
}
118+
104119
@Override
105120
public String toString() {
106121
return "FieldEntry{" + "value=" + value + '}';

server/src/main/java/org/elasticsearch/ingest/ESONFlat.java

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,74 @@
99

1010
package org.elasticsearch.ingest;
1111

12-
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.common.bytes.BytesReference;
13+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
14+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
15+
import org.elasticsearch.common.io.stream.StreamInput;
1316

1417
import java.io.IOException;
18+
import java.io.UncheckedIOException;
19+
import java.util.ArrayList;
1520
import java.util.List;
21+
import java.util.concurrent.atomic.AtomicReference;
1622

17-
public record ESONFlat(List<ESONEntry> keys, ESONSource.Values values) {
23+
public record ESONFlat(List<ESONEntry> keys, ESONSource.Values values, AtomicReference<BytesReference> serializedKeyBytes) {
1824

19-
public void toBytes(StreamOutput output) throws IOException {
20-
for (ESONEntry entry : keys) {
21-
22-
}
23-
output.writeBytesReference(values.data());
25+
public ESONFlat(List<ESONEntry> keys, ESONSource.Values values) {
26+
this(keys, values, new AtomicReference<>());
27+
}
2428

29+
public ESONFlat(StreamInput in) throws IOException {
30+
this(readKeys(in), new ESONSource.Values(in.readBytesReference()), new AtomicReference<>());
2531
}
2632

27-
public void fromBytes() {
33+
private static List<ESONEntry> readKeys(StreamInput in) throws IOException {
34+
try (
35+
ReleasableBytesReference bytesReference = in.readReleasableBytesReference();
36+
StreamInput streamInput = bytesReference.streamInput()
37+
) {
38+
int expected = streamInput.readVInt();
39+
ArrayList<ESONEntry> keys = new ArrayList<>(expected);
40+
for (int i = 0; i < expected; ++i) {
41+
// TODO: Use UTF-8 byte length eventually
42+
String key = streamInput.readString();
43+
byte type = streamInput.readByte();
44+
int offsetOrCount = streamInput.readInt();
45+
ESONEntry entry = switch (type) {
46+
case ESONEntry.TYPE_OBJECT -> new ESONEntry.ObjectEntry(key);
47+
case ESONEntry.TYPE_ARRAY -> new ESONEntry.ArrayEntry(key);
48+
default -> new ESONEntry.FieldEntry(key, type, offsetOrCount);
49+
};
50+
entry.offsetOrCount(offsetOrCount);
51+
keys.add(entry);
52+
}
53+
return keys;
54+
}
55+
}
2856

57+
public BytesReference getSerializedKeyBytes() {
58+
if (serializedKeyBytes.get() == null) {
59+
int estimate = 0;
60+
for (ESONEntry entry : keys) {
61+
estimate += entry.key().length() + 5;
62+
}
63+
try (BytesStreamOutput streamOutput = new BytesStreamOutput((int) (estimate * 1.1))) {
64+
streamOutput.writeVInt(keys.size());
65+
for (ESONEntry entry : keys) {
66+
String key = entry.key();
67+
// byte[] bytes = key == null ? EMPTY_KET : key.getBytes(StandardCharsets.UTF_8);
68+
// streamOutput.writeVInt(bytes.length);
69+
// streamOutput.writeBytes(bytes);
70+
// TODO: Use UTF-8 byte length eventually
71+
streamOutput.writeString(key);
72+
streamOutput.writeByte(entry.type());
73+
streamOutput.writeInt(entry.offsetOrCount());
74+
}
75+
serializedKeyBytes.set(streamOutput.bytes());
76+
} catch (IOException e) {
77+
throw new UncheckedIOException(e);
78+
}
79+
}
80+
return serializedKeyBytes.get();
2981
}
3082
}

0 commit comments

Comments
 (0)