Skip to content

Commit 619bf9b

Browse files
DaveCTurnercbuescher
authored andcommitted
Add general read/write optional support (elastic#112276)
Today `StreamOutput#writeOptionalWriteable` allows to write a possibly-null value that implements `Writeable` and therefore carries its own serialization, but sometimes we want to write an optional value and provide a custom `Writer` too. This commit adds `StreamOutput#writeOptional` and a corresponding `StreamInput#readOptional` to support this.
1 parent 9be77ca commit 619bf9b

File tree

10 files changed

+77
-64
lines changed

10 files changed

+77
-64
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,11 @@ public void writeTo(StreamOutput out) throws IOException {
101101
out.writeOptionalWriteable(primaryResponse);
102102
}
103103

104-
public void writeThin(StreamOutput out) throws IOException {
105-
out.writeVInt(id);
106-
DocWriteRequest.writeDocumentRequestThin(out, request);
107-
out.writeOptionalWriteable(primaryResponse == null ? null : primaryResponse::writeThin);
108-
}
104+
public static final Writer<BulkItemRequest> THIN_WRITER = (out, item) -> {
105+
out.writeVInt(item.id);
106+
DocWriteRequest.writeDocumentRequestThin(out, item.request);
107+
out.writeOptional(BulkItemResponse.THIN_WRITER, item.primaryResponse);
108+
};
109109

110110
@Override
111111
public long ramBytesUsed() {

server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -264,15 +264,15 @@ public String toString() {
264264
id = in.readVInt();
265265
opType = OpType.fromId(in.readByte());
266266
response = readResponse(shardId, in);
267-
failure = in.readBoolean() ? new Failure(in) : null;
267+
failure = in.readOptionalWriteable(Failure::new);
268268
assertConsistent();
269269
}
270270

271271
BulkItemResponse(StreamInput in) throws IOException {
272272
id = in.readVInt();
273273
opType = OpType.fromId(in.readByte());
274274
response = readResponse(in);
275-
failure = in.readBoolean() ? new Failure(in) : null;
275+
failure = in.readOptionalWriteable(Failure::new);
276276
assertConsistent();
277277
}
278278

@@ -384,31 +384,21 @@ public void writeTo(StreamOutput out) throws IOException {
384384
writeResponseType(out);
385385
response.writeTo(out);
386386
}
387-
if (failure == null) {
388-
out.writeBoolean(false);
389-
} else {
390-
out.writeBoolean(true);
391-
failure.writeTo(out);
392-
}
387+
out.writeOptionalWriteable(failure);
393388
}
394389

395-
public void writeThin(StreamOutput out) throws IOException {
396-
out.writeVInt(id);
397-
out.writeByte(opType.getId());
390+
public static final Writer<BulkItemResponse> THIN_WRITER = (out, item) -> {
391+
out.writeVInt(item.id);
392+
out.writeByte(item.opType.getId());
398393

399-
if (response == null) {
394+
if (item.response == null) {
400395
out.writeByte((byte) 2);
401396
} else {
402-
writeResponseType(out);
403-
response.writeThin(out);
397+
item.writeResponseType(out);
398+
item.response.writeThin(out);
404399
}
405-
if (failure == null) {
406-
out.writeBoolean(false);
407-
} else {
408-
out.writeBoolean(true);
409-
failure.writeTo(out);
410-
}
411-
}
400+
out.writeOptionalWriteable(item.failure);
401+
};
412402

413403
private void writeResponseType(StreamOutput out) throws IOException {
414404
if (response instanceof SimulateIndexResponse) {

server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,7 @@ public void writeTo(StreamOutput out) throws IOException {
130130
throw new IllegalStateException("Inference metadata should have been consumed before writing to the stream");
131131
}
132132
super.writeTo(out);
133-
out.writeArray((o, item) -> {
134-
if (item != null) {
135-
o.writeBoolean(true);
136-
item.writeThin(o);
137-
} else {
138-
o.writeBoolean(false);
139-
}
140-
}, items);
133+
out.writeArray((o, item) -> o.writeOptional(BulkItemRequest.THIN_WRITER, item), items);
141134
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_VALIDATES_MAPPINGS)) {
142135
out.writeBoolean(isSimulated);
143136
}

server/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,6 @@ public void setForcedRefresh(boolean forcedRefresh) {
5656
public void writeTo(StreamOutput out) throws IOException {
5757
super.writeTo(out);
5858
shardId.writeTo(out);
59-
out.writeArray((o, item) -> item.writeThin(o), responses);
59+
out.writeArray(BulkItemResponse.THIN_WRITER, responses);
6060
}
6161
}

server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,8 +1095,23 @@ public <T> T[] readOptionalArray(Writeable.Reader<T> reader, IntFunction<T[]> ar
10951095
return readBoolean() ? readArray(reader, arraySupplier) : null;
10961096
}
10971097

1098+
/**
1099+
* Reads a possibly-null value using the given {@link org.elasticsearch.common.io.stream.Writeable.Reader}.
1100+
*
1101+
* @see StreamOutput#writeOptionalWriteable
1102+
*/
1103+
// just an alias for readOptional() since we don't actually care whether T extends Writeable
10981104
@Nullable
10991105
public <T extends Writeable> T readOptionalWriteable(Writeable.Reader<T> reader) throws IOException {
1106+
return readOptional(reader);
1107+
}
1108+
1109+
/**
1110+
* Reads a possibly-null value using the given {@link org.elasticsearch.common.io.stream.Writeable.Reader}.
1111+
*
1112+
* @see StreamOutput#writeOptional
1113+
*/
1114+
public <T> T readOptional(Writeable.Reader<T> reader) throws IOException {
11001115
if (readBoolean()) {
11011116
T t = reader.read(this);
11021117
if (t == null) {

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,6 +1015,12 @@ public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws
10151015
writeOptionalArray(StreamOutput::writeWriteable, array);
10161016
}
10171017

1018+
/**
1019+
* Writes a boolean value indicating whether the given object is {@code null}, followed by the object's serialization if it is not
1020+
* {@code null}.
1021+
*
1022+
* @see StreamInput#readOptionalWriteable
1023+
*/
10181024
public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException {
10191025
if (writeable != null) {
10201026
writeBoolean(true);
@@ -1024,6 +1030,21 @@ public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOExcep
10241030
}
10251031
}
10261032

1033+
/**
1034+
* Writes a boolean value indicating whether the given object is {@code null}, followed by the object's serialization if it is not
1035+
* {@code null}.
1036+
*
1037+
* @see StreamInput#readOptional
1038+
*/
1039+
public <T> void writeOptional(Writer<T> writer, @Nullable T maybeItem) throws IOException {
1040+
if (maybeItem != null) {
1041+
writeBoolean(true);
1042+
writer.write(this, maybeItem);
1043+
} else {
1044+
writeBoolean(false);
1045+
}
1046+
}
1047+
10271048
/**
10281049
* This method allow to use a method reference when writing collection elements such as
10291050
* {@code out.writeMap(map, StreamOutput::writeString, StreamOutput::writeWriteable)}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ private static Bucket createFromStream(StreamInput in, DocValueFormat format, bo
7272
String key = in.getTransportVersion().equals(TransportVersions.V_8_0_0) ? in.readString()
7373
: in.getTransportVersion().onOrAfter(TransportVersions.V_7_17_1) ? in.readOptionalString()
7474
: in.readString();
75-
BytesRef from = in.readBoolean() ? in.readBytesRef() : null;
76-
BytesRef to = in.readBoolean() ? in.readBytesRef() : null;
75+
BytesRef from = in.readOptional(StreamInput::readBytesRef);
76+
BytesRef to = in.readOptional(StreamInput::readBytesRef);
7777
long docCount = in.readLong();
7878
InternalAggregations aggregations = InternalAggregations.readFrom(in);
7979

@@ -89,14 +89,8 @@ public void writeTo(StreamOutput out) throws IOException {
8989
} else {
9090
out.writeString(key == null ? generateKey(from, to, format) : key);
9191
}
92-
out.writeBoolean(from != null);
93-
if (from != null) {
94-
out.writeBytesRef(from);
95-
}
96-
out.writeBoolean(to != null);
97-
if (to != null) {
98-
out.writeBytesRef(to);
99-
}
92+
out.writeOptional(StreamOutput::writeBytesRef, from);
93+
out.writeOptional(StreamOutput::writeBytesRef, to);
10094
out.writeLong(docCount);
10195
aggregations.writeTo(out);
10296
}

server/src/test/java/org/elasticsearch/common/io/stream/AbstractStreamTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,17 @@ public void checkZonedDateTimeSerialization(TransportVersion tv) throws IOExcept
761761
}
762762
}
763763

764+
public void testOptional() throws IOException {
765+
try (var output = new BytesStreamOutput()) {
766+
output.writeOptional(StreamOutput::writeString, "not-null");
767+
output.writeOptional(StreamOutput::writeString, null);
768+
769+
final var input = getStreamInput(output.bytes());
770+
assertEquals("not-null", input.readOptional(StreamInput::readString));
771+
assertNull(input.readOptional(StreamInput::readString));
772+
}
773+
}
774+
764775
private void assertSerialization(
765776
CheckedConsumer<StreamOutput, IOException> outputAssertions,
766777
CheckedConsumer<StreamInput, IOException> inputAssertions,

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public RollupJobStatus(IndexerState state, @Nullable Map<String, Object> positio
7474

7575
public RollupJobStatus(StreamInput in) throws IOException {
7676
state = IndexerState.fromStream(in);
77-
currentPosition = in.readBoolean() ? new TreeMap<>(in.readGenericMap()) : null;
77+
currentPosition = in.readOptional(CURRENT_POSITION_READER);
7878
if (in.getTransportVersion().before(TransportVersions.V_8_0_0)) {
7979
// 7.x nodes serialize `upgradedDocumentID` flag. We don't need it anymore, but
8080
// we need to pull it off the stream
@@ -83,6 +83,8 @@ public RollupJobStatus(StreamInput in) throws IOException {
8383
}
8484
}
8585

86+
private static final Reader<TreeMap<String, Object>> CURRENT_POSITION_READER = in -> new TreeMap<>(in.readGenericMap());
87+
8688
public IndexerState getIndexerState() {
8789
return state;
8890
}
@@ -118,10 +120,7 @@ public String getWriteableName() {
118120
@Override
119121
public void writeTo(StreamOutput out) throws IOException {
120122
state.writeTo(out);
121-
out.writeBoolean(currentPosition != null);
122-
if (currentPosition != null) {
123-
out.writeGenericMap(currentPosition);
124-
}
123+
out.writeOptional(StreamOutput::writeGenericMap, currentPosition);
125124
if (out.getTransportVersion().before(TransportVersions.V_8_0_0)) {
126125
// 7.x nodes expect a boolean `upgradedDocumentID` flag. We don't have it anymore,
127126
// but we need to tell them we are upgraded in case there is a mixed cluster

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/execute/ExecuteWatchRequest.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,8 @@ public ExecuteWatchRequest(StreamInput in) throws IOException {
5959
id = in.readOptionalString();
6060
ignoreCondition = in.readBoolean();
6161
recordExecution = in.readBoolean();
62-
if (in.readBoolean()) {
63-
alternativeInput = in.readGenericMap();
64-
}
65-
if (in.readBoolean()) {
66-
triggerData = in.readGenericMap();
67-
}
62+
alternativeInput = in.readOptional(StreamInput::readGenericMap);
63+
triggerData = in.readOptional(StreamInput::readGenericMap);
6864
long actionModesCount = in.readLong();
6965
actionModes = new HashMap<>();
7066
for (int i = 0; i < actionModesCount; i++) {
@@ -83,14 +79,8 @@ public void writeTo(StreamOutput out) throws IOException {
8379
out.writeOptionalString(id);
8480
out.writeBoolean(ignoreCondition);
8581
out.writeBoolean(recordExecution);
86-
out.writeBoolean(alternativeInput != null);
87-
if (alternativeInput != null) {
88-
out.writeGenericMap(alternativeInput);
89-
}
90-
out.writeBoolean(triggerData != null);
91-
if (triggerData != null) {
92-
out.writeGenericMap(triggerData);
93-
}
82+
out.writeOptional(StreamOutput::writeGenericMap, alternativeInput);
83+
out.writeOptional(StreamOutput::writeGenericMap, triggerData);
9484
out.writeLong(actionModes.size());
9585
for (Map.Entry<String, ActionExecutionMode> entry : actionModes.entrySet()) {
9686
out.writeString(entry.getKey());

0 commit comments

Comments
 (0)