Skip to content

Commit b25518a

Browse files
wee
1 parent 8e3810c commit b25518a

File tree

5 files changed

+180
-33
lines changed

5 files changed

+180
-33
lines changed

server/src/main/java/org/elasticsearch/common/bytes/CompositeBytesReference.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ public BytesReference slice(int from, int length) {
168168
return CompositeBytesReference.ofMultiple(inSlice);
169169
}
170170

171+
public BytesReference[] components() {
172+
return references;
173+
}
174+
171175
private int getOffsetIndex(int offset) {
172176
final int i = Arrays.binarySearch(offsets, offset);
173177
return i < 0 ? (-(i + 1)) - 1 : i;

server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,144 @@ public long ramBytesUsed() {
157157
return delegate.ramBytesUsed();
158158
}
159159

160+
public static StreamInput consumingStreamInput(ReleasableBytesReference... references) throws IOException {
161+
final BytesReference bytesReference;
162+
final RefCounted[] refs = new RefCounted[references.length];
163+
if (references.length == 1) {
164+
final var ref = references[0];
165+
bytesReference = ref;
166+
refs[0] = ref.refCounted;
167+
} else {
168+
bytesReference = CompositeBytesReference.of(references);
169+
for (int i = 0; i < references.length; i++) {
170+
refs[i] = references[i].refCounted;
171+
}
172+
}
173+
return new BytesReferenceStreamInput(bytesReference) {
174+
private ReleasableBytesReference retainAndSkip(int len) throws IOException {
175+
if (len == 0) {
176+
return ReleasableBytesReference.empty();
177+
}
178+
179+
int offset = offset();
180+
skip(len);
181+
// move the stream manually since creating the slice didn't move it
182+
if (bytesReference instanceof ReleasableBytesReference releasable) {
183+
ReleasableBytesReference res = releasable.retainedSlice(offset, len);
184+
if (markEnd == 0 && available() == 0) {
185+
close();
186+
}
187+
return res;
188+
}
189+
assert bytesReference instanceof CompositeBytesReference;
190+
final CompositeBytesReference composite = (CompositeBytesReference) bytesReference;
191+
// instead of reading the bytes from a stream we just create a slice of the underlying bytes
192+
final BytesReference result = composite.slice(offset, len);
193+
if (result instanceof ReleasableBytesReference releasable) {
194+
return releasable.retain();
195+
}
196+
assert result instanceof CompositeBytesReference;
197+
var compositeSlice = (CompositeBytesReference) result;
198+
var components = compositeSlice.components();
199+
final RefCounted[] refCounteds = new RefCounted[components.length];
200+
for (int i = 0; i < components.length; i++) {
201+
refCounteds[i] = ((ReleasableBytesReference) components[i]).retain();
202+
}
203+
if (markEnd == 0) {
204+
maybeDiscardReadBytes(composite.components());
205+
}
206+
return new ReleasableBytesReference(compositeSlice, () -> {
207+
for (int i = 0; i < refCounteds.length; i++) {
208+
refCounteds[i].decRef();
209+
refCounteds[i] = null;
210+
}
211+
});
212+
}
213+
214+
private void maybeDiscardReadBytes(BytesReference[] components) {
215+
int offset = offset();
216+
int p = 0;
217+
for (int i = 0; i < components.length; i++) {
218+
p += components[i].length();
219+
if (p >= offset) {
220+
return;
221+
}
222+
var r = refs[i];
223+
if (r != null) {
224+
r.decRef();
225+
refs[i] = null;
226+
}
227+
}
228+
}
229+
230+
@Override
231+
public ReleasableBytesReference readReleasableBytesReference() throws IOException {
232+
final int len = readVInt();
233+
return retainAndSkip(len);
234+
}
235+
236+
@Override
237+
public ReleasableBytesReference readReleasableBytesReference(int len) throws IOException {
238+
return retainAndSkip(len);
239+
}
240+
241+
@Override
242+
public ReleasableBytesReference readAllToReleasableBytesReference() throws IOException {
243+
return retainAndSkip(bytesReference.length() - offset());
244+
}
245+
246+
@Override
247+
public boolean supportReadAllToReleasableBytesReference() {
248+
return true;
249+
}
250+
251+
@Override
252+
public void close() {
253+
for (int i = 0; i < refs.length; i++) {
254+
RefCounted ref = refs[i];
255+
if (ref != null) {
256+
refs[i] = null;
257+
ref.decRef();
258+
}
259+
}
260+
}
261+
262+
@Override
263+
public int read(byte[] b, int bOffset, int len) throws IOException {
264+
int res = super.read(b, bOffset, len);
265+
if (markEnd == 0) {
266+
tryDiscard();
267+
}
268+
return res;
269+
}
270+
271+
private void tryDiscard() {
272+
if (bytesReference instanceof CompositeBytesReference c) {
273+
maybeDiscardReadBytes(c.components());
274+
} else if (available() == 0) {
275+
close();
276+
}
277+
}
278+
279+
@Override
280+
public int read() throws IOException {
281+
int res = super.read();
282+
if (res == -1 && markEnd == 0) {
283+
close();
284+
}
285+
return res;
286+
}
287+
288+
private int markEnd = 0;
289+
290+
@Override
291+
public void mark(int readLimit) {
292+
super.mark(readLimit);
293+
markEnd = offset() + readLimit;
294+
}
295+
};
296+
}
297+
160298
@Override
161299
public StreamInput streamInput() throws IOException {
162300
assert hasReferences();

server/src/main/java/org/elasticsearch/transport/InboundAggregator.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111

1212
import org.elasticsearch.common.breaker.CircuitBreaker;
1313
import org.elasticsearch.common.breaker.CircuitBreakingException;
14-
import org.elasticsearch.common.bytes.BytesReference;
15-
import org.elasticsearch.common.bytes.CompositeBytesReference;
1614
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1715
import org.elasticsearch.core.Releasable;
1816
import org.elasticsearch.core.Releasables;
@@ -95,19 +93,30 @@ public void aggregate(ReleasableBytesReference content) {
9593

9694
public InboundMessage finishAggregation() throws IOException {
9795
ensureOpen();
98-
final ReleasableBytesReference releasableContent;
96+
final ReleasableBytesReference[] releasableContent;
97+
final int len;
9998
if (isFirstContent()) {
100-
releasableContent = ReleasableBytesReference.empty();
99+
releasableContent = new ReleasableBytesReference[] { ReleasableBytesReference.empty() };
100+
len = 0;
101101
} else if (contentAggregation == null) {
102-
releasableContent = firstContent;
102+
releasableContent = new ReleasableBytesReference[] { firstContent };
103+
len = firstContent.length();
103104
} else {
104-
final ReleasableBytesReference[] references = contentAggregation.toArray(new ReleasableBytesReference[0]);
105-
final BytesReference content = CompositeBytesReference.of(references);
106-
releasableContent = new ReleasableBytesReference(content, () -> Releasables.close(references));
105+
releasableContent = contentAggregation.toArray(new ReleasableBytesReference[0]);
106+
int l = 0;
107+
for (ReleasableBytesReference releasableBytesReference : releasableContent) {
108+
l += releasableBytesReference.length();
109+
}
110+
len = l;
107111
}
108112

109113
final BreakerControl breakerControl = new BreakerControl(circuitBreaker);
110-
final InboundMessage aggregated = new InboundMessage(currentHeader, releasableContent, breakerControl);
114+
final InboundMessage aggregated = new InboundMessage(
115+
currentHeader,
116+
ReleasableBytesReference.consumingStreamInput(releasableContent),
117+
len,
118+
breakerControl
119+
);
111120
boolean success = false;
112121
try {
113122
if (aggregated.getHeader().needsToReadVariableHeader()) {

server/src/main/java/org/elasticsearch/transport/InboundMessage.java

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
package org.elasticsearch.transport;
1111

1212
import org.elasticsearch.ElasticsearchException;
13-
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1413
import org.elasticsearch.common.io.stream.StreamInput;
1514
import org.elasticsearch.core.IOUtils;
1615
import org.elasticsearch.core.Releasable;
@@ -23,7 +22,7 @@
2322
public class InboundMessage implements Releasable {
2423

2524
private final Header header;
26-
private final ReleasableBytesReference content;
25+
private final int contentLength;
2726
private final Exception exception;
2827
private final boolean isPing;
2928
private Releasable breakerRelease;
@@ -42,25 +41,27 @@ public class InboundMessage implements Releasable {
4241
}
4342
}
4443

45-
public InboundMessage(Header header, ReleasableBytesReference content, Releasable breakerRelease) {
44+
public InboundMessage(Header header, StreamInput streamInput, int contentLength, Releasable breakerRelease) {
4645
this.header = header;
47-
this.content = content;
46+
this.streamInput = streamInput;
47+
streamInput.setTransportVersion(header.getVersion());
4848
this.breakerRelease = breakerRelease;
4949
this.exception = null;
5050
this.isPing = false;
51+
this.contentLength = contentLength;
5152
}
5253

5354
public InboundMessage(Header header, Exception exception) {
5455
this.header = header;
55-
this.content = null;
56+
this.contentLength = 0;
5657
this.breakerRelease = null;
5758
this.exception = exception;
5859
this.isPing = false;
5960
}
6061

6162
public InboundMessage(Header header, boolean isPing) {
6263
this.header = header;
63-
this.content = null;
64+
this.contentLength = 0;
6465
this.breakerRelease = null;
6566
this.exception = null;
6667
this.isPing = isPing;
@@ -71,11 +72,7 @@ public Header getHeader() {
7172
}
7273

7374
public int getContentLength() {
74-
if (content == null) {
75-
return 0;
76-
} else {
77-
return content.length();
78-
}
75+
return contentLength;
7976
}
8077

8178
public Exception getException() {
@@ -97,12 +94,6 @@ public Releasable takeBreakerReleaseControl() {
9794
}
9895

9996
public StreamInput openOrGetStreamInput() throws IOException {
100-
assert isPing == false && content != null;
101-
assert (boolean) CLOSED.getAcquire(this) == false;
102-
if (streamInput == null) {
103-
streamInput = content.streamInput();
104-
streamInput.setTransportVersion(header.getVersion());
105-
}
10697
return streamInput;
10798
}
10899

@@ -117,7 +108,7 @@ public void close() {
117108
return;
118109
}
119110
try {
120-
IOUtils.close(streamInput, content, breakerRelease);
111+
IOUtils.close(streamInput, breakerRelease);
121112
} catch (Exception e) {
122113
assert false : e;
123114
throw new ElasticsearchException(e);

server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.action.ActionListener;
1717
import org.elasticsearch.common.bytes.BytesArray;
1818
import org.elasticsearch.common.bytes.BytesReference;
19-
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2019
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2120
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
2221
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -190,7 +189,7 @@ public TestResponse read(StreamInput in) throws IOException {
190189
TransportStatus.setRequest((byte) 0),
191190
TransportVersion.current()
192191
);
193-
InboundMessage requestMessage = new InboundMessage(requestHeader, ReleasableBytesReference.wrap(requestContent), () -> {});
192+
InboundMessage requestMessage = new InboundMessage(requestHeader, requestContent.streamInput(), requestContent.length(), () -> {});
194193
requestHeader.finishParsingHeader(requestMessage.openOrGetStreamInput());
195194
handler.inboundMessage(channel, requestMessage);
196195

@@ -210,7 +209,12 @@ public TestResponse read(StreamInput in) throws IOException {
210209
BytesReference fullResponseBytes = channel.getMessageCaptor().get();
211210
BytesReference responseContent = fullResponseBytes.slice(TcpHeader.HEADER_SIZE, fullResponseBytes.length() - TcpHeader.HEADER_SIZE);
212211
Header responseHeader = new Header(fullRequestBytes.length() - 6, requestId, responseStatus, TransportVersion.current());
213-
InboundMessage responseMessage = new InboundMessage(responseHeader, ReleasableBytesReference.wrap(responseContent), () -> {});
212+
InboundMessage responseMessage = new InboundMessage(
213+
responseHeader,
214+
responseContent.streamInput(),
215+
responseContent.length(),
216+
() -> {}
217+
);
214218
responseHeader.finishParsingHeader(responseMessage.openOrGetStreamInput());
215219
handler.inboundMessage(channel, responseMessage);
216220

@@ -298,7 +302,8 @@ public void testLogsSlowInboundProcessing() throws Exception {
298302
}
299303
final InboundMessage requestMessage = new InboundMessage(
300304
requestHeader,
301-
ReleasableBytesReference.wrap(byteData.bytes()),
305+
byteData.bytes().streamInput(),
306+
byteData.size(),
302307
() -> safeSleep(TimeValue.timeValueSeconds(1))
303308
);
304309
requestHeader.actionName = TransportHandshaker.HANDSHAKE_ACTION_NAME;
@@ -327,14 +332,14 @@ public void onResponseReceived(long requestId, Transport.ResponseContext context
327332
safeSleep(TimeValue.timeValueSeconds(1));
328333
}
329334
});
330-
handler.inboundMessage(channel, new InboundMessage(responseHeader, ReleasableBytesReference.empty(), () -> {}));
335+
handler.inboundMessage(channel, new InboundMessage(responseHeader, BytesArray.EMPTY.streamInput(), 0, () -> {}));
331336

332337
mockLog.assertAllExpectationsMatched();
333338
}
334339
}
335340

336341
private static InboundMessage unreadableInboundHandshake(TransportVersion remoteVersion, Header requestHeader) {
337-
return new InboundMessage(requestHeader, ReleasableBytesReference.wrap(BytesArray.EMPTY), () -> {}) {
342+
return new InboundMessage(requestHeader, BytesArray.EMPTY.streamInput(), 0, () -> {}) {
338343
@Override
339344
public StreamInput openOrGetStreamInput() {
340345
final StreamInput streamInput = new InputStreamStreamInput(new InputStream() {

0 commit comments

Comments
 (0)