Skip to content

Commit 5ea2d96

Browse files
authored
Improve serialization of translog operations (#134243)
Currently translog operations are serialized in a way that is relatively inefficient. This commit improves performance by re-ordering the operations to allow most of the header to be serialized without bounds checks. Additionally, instead of incrementally calculating the checksum we do a single pass at the end. Finally, we no longer copy the source twice. Instead serialize it directly into the translog writer.
1 parent 4ad6b28 commit 5ea2d96

File tree

18 files changed

+813
-178
lines changed

18 files changed

+813
-178
lines changed

server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ public static ReleasableBytesReference wrap(BytesReference reference) {
5151
return reference.length() == 0 ? empty() : new ReleasableBytesReference(reference, ALWAYS_REFERENCED);
5252
}
5353

54+
public static BytesReference unwrap(BytesReference reference) {
55+
if (reference instanceof ReleasableBytesReference releasable) {
56+
return releasable.delegate;
57+
}
58+
return reference;
59+
}
60+
5461
@Override
5562
public void incRef() {
5663
refCounted.incRef();
@@ -278,6 +285,11 @@ public int arrayOffset() {
278285
return delegate.arrayOffset();
279286
}
280287

288+
public BytesReference delegate() {
289+
assert hasReferences();
290+
return delegate;
291+
}
292+
281293
private static final class RefCountedReleasable extends AbstractRefCounted {
282294

283295
private final Releasable releasable;

server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,12 @@
1515
import org.elasticsearch.common.bytes.CompositeBytesReference;
1616
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1717
import org.elasticsearch.common.recycler.Recycler;
18+
import org.elasticsearch.common.util.ByteUtils;
1819
import org.elasticsearch.core.Releasable;
1920
import org.elasticsearch.core.Releasables;
2021

2122
import java.io.ByteArrayOutputStream;
2223
import java.io.IOException;
23-
import java.lang.invoke.MethodHandles;
24-
import java.lang.invoke.VarHandle;
25-
import java.nio.ByteOrder;
2624
import java.util.ArrayList;
2725
import java.util.Objects;
2826

@@ -33,11 +31,6 @@
3331
*/
3432
public class RecyclerBytesStreamOutput extends BytesStream implements Releasable {
3533

36-
static final VarHandle VH_BE_INT = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.BIG_ENDIAN);
37-
static final VarHandle VH_LE_INT = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.LITTLE_ENDIAN);
38-
static final VarHandle VH_BE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
39-
static final VarHandle VH_LE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.LITTLE_ENDIAN);
40-
4134
private ArrayList<Recycler.V<BytesRef>> pages = new ArrayList<>(8);
4235
private final Recycler<BytesRef> recycler;
4336
private final int pageSize;
@@ -148,7 +141,7 @@ public void writeVInt(int i) throws IOException {
148141
}
149142
}
150143

151-
protected static int vIntLength(int value) {
144+
public static int vIntLength(int value) {
152145
int leadingZeros = Integer.numberOfLeadingZeros(value);
153146
if (leadingZeros >= 25) {
154147
return 1;
@@ -177,7 +170,7 @@ public void writeInt(int i) throws IOException {
177170
super.writeInt(i);
178171
} else {
179172
BytesRef currentPage = currentBytesRef;
180-
VH_BE_INT.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
173+
ByteUtils.writeIntBE(i, currentPage.bytes, currentPage.offset + currentPageOffset);
181174
this.currentPageOffset = currentPageOffset + 4;
182175
}
183176
}
@@ -189,7 +182,7 @@ public void writeIntLE(int i) throws IOException {
189182
super.writeIntLE(i);
190183
} else {
191184
BytesRef currentPage = currentBytesRef;
192-
VH_LE_INT.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
185+
ByteUtils.writeIntLE(i, currentPage.bytes, currentPage.offset + currentPageOffset);
193186
this.currentPageOffset = currentPageOffset + 4;
194187
}
195188
}
@@ -201,7 +194,7 @@ public void writeLong(long i) throws IOException {
201194
super.writeLong(i);
202195
} else {
203196
BytesRef currentPage = currentBytesRef;
204-
VH_BE_LONG.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
197+
ByteUtils.writeLongBE(i, currentPage.bytes, currentPage.offset + currentPageOffset);
205198
this.currentPageOffset = currentPageOffset + 8;
206199
}
207200
}
@@ -213,7 +206,7 @@ public void writeLongLE(long i) throws IOException {
213206
super.writeLongLE(i);
214207
} else {
215208
BytesRef currentPage = currentBytesRef;
216-
VH_LE_LONG.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
209+
ByteUtils.writeLongLE(i, currentPage.bytes, currentPage.offset + currentPageOffset);
217210
this.currentPageOffset = currentPageOffset + 8;
218211
}
219212
}
@@ -241,6 +234,25 @@ public void legacyWriteWithSizePrefix(Writeable writeable) throws IOException {
241234
}
242235
}
243236

237+
/**
238+
* Attempt to get one page to perform a write directly into the page. The page will only be returned if the requested bytes can fit.
239+
* If requested bytes cannot fit, null will be returned. This will advance the current position in the stream.
240+
*
241+
* @param bytes the number of bytes for the single write
242+
* @return a direct page if there is enough space in current page, otherwise null
243+
*/
244+
public BytesRef tryGetPageForWrite(int bytes) {
245+
final int beforePageOffset = this.currentPageOffset;
246+
if (bytes <= (pageSize - beforePageOffset)) {
247+
BytesRef currentPage = currentBytesRef;
248+
BytesRef bytesRef = new BytesRef(currentPage.bytes, currentPage.offset + beforePageOffset, bytes);
249+
this.currentPageOffset = beforePageOffset + bytes;
250+
return bytesRef;
251+
} else {
252+
return null;
253+
}
254+
}
255+
244256
// overridden with some code duplication the same way other write methods in this class are overridden to bypass StreamOutput's
245257
// intermediary buffers
246258
@Override

server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,9 @@ private static LeafReader createInMemoryReader(
182182
boolean rootDocOnly,
183183
Translog.Index operation
184184
) {
185+
final String id = Uid.decodeId(operation.uid());
185186
final ParsedDocument parsedDocs = documentParser.parseDocument(
186-
new SourceToParse(operation.id(), operation.source(), XContentHelper.xContentType(operation.source()), operation.routing()),
187+
new SourceToParse(id, operation.source(), XContentHelper.xContentType(operation.source()), operation.routing()),
187188
mappingLookup
188189
);
189190

@@ -244,7 +245,7 @@ protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReade
244245
}
245246
};
246247
} catch (IOException e) {
247-
throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e);
248+
throw new EngineException(shardId, "failed to create an in-memory segment for get [" + id + "]", e);
248249
}
249250
}
250251

@@ -339,7 +340,7 @@ private static class TranslogLeafReader extends LeafReader {
339340
this.engineConfig = engineConfig;
340341
this.onSegmentCreated = onSegmentCreated;
341342
this.directory = directory;
342-
this.uid = Uid.encodeId(operation.id());
343+
this.uid = operation.uid();
343344
}
344345

345346
private LeafReader getDelegate() {

server/src/main/java/org/elasticsearch/index/mapper/Uid.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ private static String decodeBase64Id(byte[] idBytes, int offset, int length) {
173173
return Strings.BASE_64_NO_PADDING_URL_ENCODER.encodeToString(idBytes);
174174
}
175175

176+
/** Decode an indexed id back to its original form.
177+
* @see #encodeId */
178+
public static String decodeId(BytesRef idBytes) {
179+
return decodeId(idBytes.bytes, idBytes.offset, idBytes.length);
180+
}
181+
176182
/** Decode an indexed id back to its original form.
177183
* @see #encodeId */
178184
public static String decodeId(byte[] idBytes) {

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2130,7 +2130,12 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o
21302130
index.getAutoGeneratedIdTimestamp(),
21312131
true,
21322132
origin,
2133-
new SourceToParse(index.id(), index.source(), XContentHelper.xContentType(index.source()), index.routing())
2133+
new SourceToParse(
2134+
Uid.decodeId(index.uid()),
2135+
index.source(),
2136+
XContentHelper.xContentType(index.source()),
2137+
index.routing()
2138+
)
21342139
);
21352140
}
21362141
case DELETE -> {
@@ -2140,7 +2145,7 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o
21402145
delete.seqNo(),
21412146
delete.primaryTerm(),
21422147
delete.version(),
2143-
delete.id(),
2148+
Uid.decodeId(delete.uid()),
21442149
versionType,
21452150
UNASSIGNED_SEQ_NO,
21462151
0,

server/src/main/java/org/elasticsearch/index/translog/OperationListener.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,15 @@
99

1010
package org.elasticsearch.index.translog;
1111

12-
import org.elasticsearch.common.bytes.BytesReference;
13-
1412
@FunctionalInterface
1513
public interface OperationListener {
1614

1715
/**
18-
* This method is called when a new operation is added to the translog. The BytesReference is a releasable
19-
* instance, so it should not be retained beyond the scope of this method.
16+
* This method is called when a new operation is added to the translog.
2017
*
21-
* @param data a releasable bytes reference of the data add
18+
* @param operation the serialized operation added to the translog
2219
* @param seqNo the sequence number of the operation
2320
* @param location the location written
2421
*/
25-
void operationAdded(BytesReference data, long seqNo, Translog.Location location);
22+
void operationAdded(Translog.Serialized operation, long seqNo, Translog.Location location);
2623
}

0 commit comments

Comments
 (0)