Skip to content

Commit 661537e

Browse files
committed
change
1 parent 632831b commit 661537e

File tree

3 files changed

+41
-47
lines changed

3 files changed

+41
-47
lines changed

server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java

Lines changed: 38 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
4444
private int pageIndex = -1;
4545
private int currentCapacity = 0;
4646
private int currentPageOffset;
47+
48+
private byte[] bytesRefBytes;
49+
private int bytesRefOffset;
4750

4851
public RecyclerBytesStreamOutput(Recycler<BytesRef> recycler) {
4952
this.recycler = recycler;
@@ -58,14 +61,11 @@ public long position() {
5861

5962
@Override
6063
public void writeByte(byte b) {
61-
int currentPageOffset = this.currentPageOffset;
62-
if (1 > (pageSize - currentPageOffset)) {
64+
if (currentPageOffset == pageSize) {
6365
ensureCapacity(1);
64-
currentPageOffset = 0;
6566
}
66-
BytesRef currentPage = pages.get(pageIndex).v();
67-
currentPage.bytes[currentPage.offset + currentPageOffset] = b;
68-
this.currentPageOffset = currentPageOffset + 1;
67+
bytesRefBytes[bytesRefOffset + currentPageOffset] = b;
68+
++currentPageOffset;
6969
}
7070

7171
@Override
@@ -77,36 +77,21 @@ public void writeBytes(byte[] b, int offset, int length) {
7777

7878
Objects.checkFromIndexSize(offset, length, b.length);
7979

80-
// get enough pages for new size
81-
final int pageSize = this.pageSize;
82-
int currentPageOffset = this.currentPageOffset;
83-
if (length > pageSize - currentPageOffset) {
84-
ensureCapacity(length);
85-
currentPageOffset = this.currentPageOffset;
86-
}
87-
88-
// bulk copy
8980
int bytesToCopy = length;
90-
int srcOff = offset;
91-
int j = 0;
92-
while (true) {
93-
BytesRef currentPage = pages.get(pageIndex + j).v();
81+
int srcOffset = offset;
82+
83+
while (bytesToCopy > 0) {
84+
if (currentPageOffset == pageSize) {
85+
ensureCapacity(1);
86+
}
87+
9488
int toCopyThisLoop = Math.min(pageSize - currentPageOffset, bytesToCopy);
95-
System.arraycopy(b, srcOff, currentPage.bytes, currentPage.offset + currentPageOffset, toCopyThisLoop);
96-
srcOff += toCopyThisLoop;
89+
System.arraycopy(b, srcOffset, bytesRefBytes, bytesRefOffset + currentPageOffset, toCopyThisLoop);
90+
91+
srcOffset += toCopyThisLoop;
9792
bytesToCopy -= toCopyThisLoop;
98-
if (bytesToCopy > 0) {
99-
currentPageOffset = 0;
100-
} else {
101-
currentPageOffset += toCopyThisLoop;
102-
break;
103-
}
104-
j++;
93+
currentPageOffset += toCopyThisLoop;
10594
}
106-
this.currentPageOffset = currentPageOffset;
107-
108-
// advance
109-
pageIndex += j;
11095
}
11196

11297
@Override
@@ -115,8 +100,7 @@ public void writeInt(int i) throws IOException {
115100
if (4 > (pageSize - currentPageOffset)) {
116101
super.writeInt(i);
117102
} else {
118-
BytesRef currentPage = pages.get(pageIndex).v();
119-
VH_BE_INT.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
103+
VH_BE_INT.set(bytesRefBytes, bytesRefOffset + currentPageOffset, i);
120104
this.currentPageOffset = currentPageOffset + 4;
121105
}
122106
}
@@ -126,8 +110,7 @@ public void writeIntLE(int i) throws IOException {
126110
if (4 > (pageSize - currentPageOffset)) {
127111
super.writeIntLE(i);
128112
} else {
129-
BytesRef currentPage = pages.get(pageIndex).v();
130-
VH_LE_INT.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
113+
VH_LE_INT.set(bytesRefBytes, bytesRefOffset + currentPageOffset, i);
131114
currentPageOffset += 4;
132115
}
133116
}
@@ -138,8 +121,7 @@ public void writeLong(long i) throws IOException {
138121
if (8 > (pageSize - currentPageOffset)) {
139122
super.writeLong(i);
140123
} else {
141-
BytesRef currentPage = pages.get(pageIndex).v();
142-
VH_BE_LONG.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
124+
VH_BE_LONG.set(bytesRefBytes, bytesRefOffset + currentPageOffset, i);
143125
this.currentPageOffset = currentPageOffset + 8;
144126
}
145127
}
@@ -149,8 +131,7 @@ public void writeLongLE(long i) throws IOException {
149131
if (8 > (pageSize - currentPageOffset)) {
150132
super.writeLongLE(i);
151133
} else {
152-
BytesRef currentPage = pages.get(pageIndex).v();
153-
VH_LE_LONG.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
134+
VH_LE_LONG.set(bytesRefBytes, bytesRefOffset + currentPageOffset, i);
154135
currentPageOffset += 8;
155136
}
156137
}
@@ -189,9 +170,10 @@ public void writeString(String str) throws IOException {
189170
super.writeString(str);
190171
return;
191172
}
192-
BytesRef currentPage = pages.get(pageIndex).v();
193-
int off = currentPage.offset + currentPageOffset;
194-
byte[] buffer = currentPage.bytes;
173+
174+
int off = bytesRefOffset + currentPageOffset;
175+
byte[] buffer = bytesRefBytes;
176+
195177
// mostly duplicated from StreamOutput.writeString to to get more reliable compilation of this very hot loop
196178
int offset = off + putVInt(buffer, charCount, off);
197179
for (int i = 0; i < charCount; i++) {
@@ -207,7 +189,7 @@ public void writeString(String str) throws IOException {
207189
buffer[offset++] = ((byte) (0x80 | c >> 0 & 0x3F));
208190
}
209191
}
210-
this.currentPageOffset = offset - currentPage.offset;
192+
this.currentPageOffset = offset - bytesRefOffset;
211193
}
212194

213195
@Override
@@ -220,6 +202,7 @@ public void seek(long position) {
220202
ensureCapacityFromPosition(position);
221203
int offsetInPage = (int) (position % pageSize);
222204
int pageIndex = (int) position / pageSize;
205+
223206
// Special handling for seeking to the first index in a new page, which is handled as a seeking to one-after the last index
224207
// in the previous case. This is done so that seeking to the first index of a new page does not cause a page allocation while
225208
// still allowing a fast check via (pageSize - currentPageOffset) on the remaining size in the current page in all other methods.
@@ -230,6 +213,9 @@ public void seek(long position) {
230213
this.pageIndex = pageIndex;
231214
this.currentPageOffset = offsetInPage;
232215
}
216+
BytesRef page = pages.get(this.pageIndex).v();
217+
this.bytesRefBytes = page.bytes;
218+
this.bytesRefOffset = page.offset;
233219
}
234220

235221
public void skip(int length) {
@@ -241,6 +227,8 @@ public void close() {
241227
var pages = this.pages;
242228
if (pages != null) {
243229
this.pages = null;
230+
231+
this.bytesRefBytes = null;
244232
Releasables.close(pages);
245233
}
246234
}
@@ -255,6 +243,8 @@ public ReleasableBytesReference moveToBytesReference() {
255243
var bytes = bytes();
256244
var pages = this.pages;
257245
this.pages = null;
246+
247+
this.bytesRefBytes = null;
258248
return new ReleasableBytesReference(bytes, () -> Releasables.close(pages));
259249
}
260250

@@ -322,6 +312,10 @@ private void ensureCapacityFromPosition(long newPosition) {
322312
if (currentPageOffset == pageSize) {
323313
pageIndex++;
324314
currentPageOffset = 0;
315+
316+
BytesRef page = pages.get(pageIndex).v();
317+
bytesRefBytes = page.bytes;
318+
bytesRefOffset = page.offset;
325319
}
326320
}
327321
}

server/src/main/java/org/elasticsearch/ingest/ESONFlat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public BytesReference getSerializedKeyBytes() {
7777
String key = entry.key();
7878
byte[] bytes = key == null ? EMPTY_KEY : key.getBytes(StandardCharsets.UTF_8);
7979
streamOutput.writeVInt(bytes.length);
80-
streamOutput.writeBytes(bytes);
80+
streamOutput.writeBytes(bytes, 0, bytes.length);
8181
streamOutput.writeByte(entry.type());
8282
// TODO: Combine
8383
if (entry instanceof ESONEntry.FieldEntry fieldEntry) {

server/src/main/java/org/elasticsearch/ingest/ESONSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ private static Value parseSimpleValue(XContentParser parser, RecyclerBytesStream
127127
case VALUE_STRING -> {
128128
XContentString.UTF8Bytes stringBytes = parser.optimizedText().bytes();
129129
bytes.writeVInt(stringBytes.length());
130-
bytes.write(stringBytes.bytes(), stringBytes.offset(), stringBytes.length());
130+
bytes.writeBytes(stringBytes.bytes(), stringBytes.offset(), stringBytes.length());
131131
yield new VariableValue((int) position, ESONEntry.STRING);
132132
}
133133
case VALUE_NUMBER -> {
@@ -153,7 +153,7 @@ yield switch (numberType) {
153153
byte type = numberType == XContentParser.NumberType.BIG_INTEGER ? ESONEntry.BIG_INTEGER : ESONEntry.BIG_DECIMAL;
154154
byte[] numberBytes = parser.text().getBytes(StandardCharsets.UTF_8);
155155
bytes.writeVInt(numberBytes.length);
156-
bytes.write(numberBytes);
156+
bytes.writeBytes(numberBytes, 0, numberBytes.length);
157157
yield new VariableValue((int) position, type);
158158
}
159159
};

0 commit comments

Comments
 (0)