Skip to content

Commit 50fe2b7

Browse files
Release transport messages incrementally while reading them
Still a few small steps left to clean this up but even in this form this solution essentially up to halves the heap used for handling large bulk shard requests on non-coordinating data nodes (this is just one example, there's a couple spots where this saves a lot of memory). Also, this could be extended to be a little smarter around compression easily, allowing for potential order of magnitude savings around indexing if we lazy deserialize individual docs or play similar tricks.
1 parent b25518a commit 50fe2b7

File tree

4 files changed

+29
-47
lines changed

4 files changed

+29
-47
lines changed

server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -259,30 +259,14 @@ public void close() {
259259
}
260260
}
261261

262-
@Override
263-
public int read(byte[] b, int bOffset, int len) throws IOException {
264-
int res = super.read(b, bOffset, len);
262+
public void tryDiscard() {
265263
if (markEnd == 0) {
266-
tryDiscard();
267-
}
268-
return res;
269-
}
270-
271-
private void tryDiscard() {
272-
if (bytesReference instanceof CompositeBytesReference c) {
273-
maybeDiscardReadBytes(c.components());
274-
} else if (available() == 0) {
275-
close();
276-
}
277-
}
278-
279-
@Override
280-
public int read() throws IOException {
281-
int res = super.read();
282-
if (res == -1 && markEnd == 0) {
283-
close();
264+
if (bytesReference instanceof CompositeBytesReference c) {
265+
maybeDiscardReadBytes(c.components());
266+
} else if (available() == 0) {
267+
close();
268+
}
284269
}
285-
return res;
286270
}
287271

288272
private int markEnd = 0;

server/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ public int read(byte[] b, int off, int len) throws IOException {
108108
return delegate.read(b, off, len);
109109
}
110110

111+
@Override
112+
public void tryDiscard() {
113+
delegate.tryDiscard();
114+
}
115+
111116
@Override
112117
public void close() throws IOException {
113118
delegate.close();

server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,13 +109,17 @@ public void setTransportVersion(TransportVersion version) {
109109
@Override
110110
public abstract int read(byte[] b, int off, int len) throws IOException;
111111

112+
public void tryDiscard() {}
113+
112114
/**
113115
* Reads a bytes reference from this stream, copying any bytes read to a new {@code byte[]}. Use {@link #readReleasableBytesReference()}
114116
* when reading large bytes references where possible top avoid needless allocations and copying.
115117
*/
116118
public BytesReference readBytesReference() throws IOException {
117119
int length = readArraySize();
118-
return readBytesReference(length);
120+
var res = readBytesReference(length);
121+
tryDiscard();
122+
return res;
119123
}
120124

121125
/**

server/src/main/java/org/elasticsearch/transport/TransportLogger.java

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -118,35 +118,24 @@ private static String format(TcpChannel channel, InboundMessage message, String
118118
if (message.isPing()) {
119119
sb.append(" [ping]").append(' ').append(event).append(": ").append(6).append('B');
120120
} else {
121-
boolean success = false;
122121
Header header = message.getHeader();
123122
int networkMessageSize = header.getNetworkMessageSize();
124123
int messageLengthWithHeader = HEADER_SIZE + networkMessageSize;
125-
StreamInput streamInput = message.openOrGetStreamInput();
126-
try {
127-
final long requestId = header.getRequestId();
128-
final boolean isRequest = header.isRequest();
129-
final String type = isRequest ? "request" : "response";
130-
final String version = header.getVersion().toString();
131-
sb.append(" [length: ").append(messageLengthWithHeader);
132-
sb.append(", request id: ").append(requestId);
133-
sb.append(", type: ").append(type);
134-
sb.append(", version: ").append(version);
124+
final long requestId = header.getRequestId();
125+
final boolean isRequest = header.isRequest();
126+
final String type = isRequest ? "request" : "response";
127+
final String version = header.getVersion().toString();
128+
sb.append(" [length: ").append(messageLengthWithHeader);
129+
sb.append(", request id: ").append(requestId);
130+
sb.append(", type: ").append(type);
131+
sb.append(", version: ").append(version);
135132

136-
// TODO: Maybe Fix for BWC
137-
if (header.needsToReadVariableHeader() == false && isRequest) {
138-
sb.append(", action: ").append(header.getActionName());
139-
}
140-
sb.append(']');
141-
sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
142-
success = true;
143-
} finally {
144-
if (success) {
145-
IOUtils.close(streamInput);
146-
} else {
147-
IOUtils.closeWhileHandlingException(streamInput);
148-
}
133+
// TODO: Maybe Fix for BWC
134+
if (header.needsToReadVariableHeader() == false && isRequest) {
135+
sb.append(", action: ").append(header.getActionName());
149136
}
137+
sb.append(']');
138+
sb.append(' ').append(event).append(": ").append(messageLengthWithHeader).append('B');
150139
}
151140
return sb.toString();
152141
}

0 commit comments

Comments
 (0)