diff --git a/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java index ff8e68d462829..fbc1958f70c5d 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java @@ -51,6 +51,13 @@ public static ReleasableBytesReference wrap(BytesReference reference) { return reference.length() == 0 ? empty() : new ReleasableBytesReference(reference, ALWAYS_REFERENCED); } + public static BytesReference unwrap(BytesReference reference) { + if (reference instanceof ReleasableBytesReference releasable) { + return releasable.delegate; + } + return reference; + } + @Override public void incRef() { refCounted.incRef(); @@ -278,6 +285,11 @@ public int arrayOffset() { return delegate.arrayOffset(); } + public BytesReference delegate() { + assert hasReferences(); + return delegate; + } + private static final class RefCountedReleasable extends AbstractRefCounted { private final Releasable releasable; diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java index 0209ffd4bdb81..fb90c219fb3b6 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java @@ -15,14 +15,12 @@ import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.util.ByteUtils; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.lang.invoke.MethodHandles; -import 
java.lang.invoke.VarHandle; -import java.nio.ByteOrder; import java.util.ArrayList; import java.util.Objects; @@ -33,11 +31,6 @@ */ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable { - static final VarHandle VH_BE_INT = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.BIG_ENDIAN); - static final VarHandle VH_LE_INT = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.LITTLE_ENDIAN); - static final VarHandle VH_BE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN); - static final VarHandle VH_LE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.LITTLE_ENDIAN); - private ArrayList> pages = new ArrayList<>(8); private final Recycler recycler; private final int pageSize; @@ -148,7 +141,7 @@ public void writeVInt(int i) throws IOException { } } - protected static int vIntLength(int value) { + public static int vIntLength(int value) { int leadingZeros = Integer.numberOfLeadingZeros(value); if (leadingZeros >= 25) { return 1; @@ -177,7 +170,7 @@ public void writeInt(int i) throws IOException { super.writeInt(i); } else { BytesRef currentPage = currentBytesRef; - VH_BE_INT.set(currentPage.bytes, currentPage.offset + currentPageOffset, i); + ByteUtils.writeIntBE(i, currentPage.bytes, currentPage.offset + currentPageOffset); this.currentPageOffset = currentPageOffset + 4; } } @@ -189,7 +182,7 @@ public void writeIntLE(int i) throws IOException { super.writeIntLE(i); } else { BytesRef currentPage = currentBytesRef; - VH_LE_INT.set(currentPage.bytes, currentPage.offset + currentPageOffset, i); + ByteUtils.writeIntLE(i, currentPage.bytes, currentPage.offset + currentPageOffset); this.currentPageOffset = currentPageOffset + 4; } } @@ -201,7 +194,7 @@ public void writeLong(long i) throws IOException { super.writeLong(i); } else { BytesRef currentPage = currentBytesRef; - VH_BE_LONG.set(currentPage.bytes, currentPage.offset + currentPageOffset, i); + ByteUtils.writeLongBE(i, 
currentPage.bytes, currentPage.offset + currentPageOffset); this.currentPageOffset = currentPageOffset + 8; } } @@ -213,7 +206,7 @@ public void writeLongLE(long i) throws IOException { super.writeLongLE(i); } else { BytesRef currentPage = currentBytesRef; - VH_LE_LONG.set(currentPage.bytes, currentPage.offset + currentPageOffset, i); + ByteUtils.writeLongLE(i, currentPage.bytes, currentPage.offset + currentPageOffset); this.currentPageOffset = currentPageOffset + 8; } } @@ -241,6 +234,25 @@ public void legacyWriteWithSizePrefix(Writeable writeable) throws IOException { } } + /** + * Attempt to get one page to perform a write directly into the page. The page will only be returned if the requested bytes can fit. + * If requested bytes cannot fit, null will be returned. This will advance the current position in the stream. + * + * @param bytes the number of bytes for the single write + * @return a direct page if there is enough space in current page, otherwise null + */ + public BytesRef tryGetPageForWrite(int bytes) { + final int beforePageOffset = this.currentPageOffset; + if (bytes <= (pageSize - beforePageOffset)) { + BytesRef currentPage = currentBytesRef; + BytesRef bytesRef = new BytesRef(currentPage.bytes, currentPage.offset + beforePageOffset, bytes); + this.currentPageOffset = beforePageOffset + bytes; + return bytesRef; + } else { + return null; + } + } + // overridden with some code duplication the same way other write methods in this class are overridden to bypass StreamOutput's // intermediary buffers @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java b/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java index 24b0512d27598..5704db7663f24 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java +++ b/server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java @@ -182,8 +182,9 @@ private static LeafReader 
createInMemoryReader( boolean rootDocOnly, Translog.Index operation ) { + final String id = Uid.decodeId(operation.uid()); final ParsedDocument parsedDocs = documentParser.parseDocument( - new SourceToParse(operation.id(), operation.source(), XContentHelper.xContentType(operation.source()), operation.routing()), + new SourceToParse(id, operation.source(), XContentHelper.xContentType(operation.source()), operation.routing()), mappingLookup ); @@ -244,7 +245,7 @@ protected StoredFieldsReader doGetSequentialStoredFieldsReader(StoredFieldsReade } }; } catch (IOException e) { - throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e); + throw new EngineException(shardId, "failed to create an in-memory segment for get [" + id + "]", e); } } @@ -339,7 +340,7 @@ private static class TranslogLeafReader extends LeafReader { this.engineConfig = engineConfig; this.onSegmentCreated = onSegmentCreated; this.directory = directory; - this.uid = Uid.encodeId(operation.id()); + this.uid = operation.uid(); } private LeafReader getDelegate() { diff --git a/server/src/main/java/org/elasticsearch/index/mapper/Uid.java b/server/src/main/java/org/elasticsearch/index/mapper/Uid.java index 70db8eca209eb..78ee8ddce8d28 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/Uid.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/Uid.java @@ -173,6 +173,12 @@ private static String decodeBase64Id(byte[] idBytes, int offset, int length) { return Strings.BASE_64_NO_PADDING_URL_ENCODER.encodeToString(idBytes); } + /** Decode an indexed id back to its original form. + * @see #encodeId */ + public static String decodeId(BytesRef idBytes) { + return decodeId(idBytes.bytes, idBytes.offset, idBytes.length); + } + /** Decode an indexed id back to its original form. 
* @see #encodeId */ public static String decodeId(byte[] idBytes) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e94ef25aa7822..0b82ef49f10f9 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2130,7 +2130,12 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o index.getAutoGeneratedIdTimestamp(), true, origin, - new SourceToParse(index.id(), index.source(), XContentHelper.xContentType(index.source()), index.routing()) + new SourceToParse( + Uid.decodeId(index.uid()), + index.source(), + XContentHelper.xContentType(index.source()), + index.routing() + ) ); } case DELETE -> { @@ -2140,7 +2145,7 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o delete.seqNo(), delete.primaryTerm(), delete.version(), - delete.id(), + Uid.decodeId(delete.uid()), versionType, UNASSIGNED_SEQ_NO, 0, diff --git a/server/src/main/java/org/elasticsearch/index/translog/OperationListener.java b/server/src/main/java/org/elasticsearch/index/translog/OperationListener.java index f63447dd6814c..02480c4e9bed3 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/OperationListener.java +++ b/server/src/main/java/org/elasticsearch/index/translog/OperationListener.java @@ -9,18 +9,15 @@ package org.elasticsearch.index.translog; -import org.elasticsearch.common.bytes.BytesReference; - @FunctionalInterface public interface OperationListener { /** - * This method is called when a new operation is added to the translog. The BytesReference is a releasable - * instance, so it should not be retained beyond the scope of this method. + * This method is called when a new operation is added to the translog. 
* - * @param data a releasable bytes reference of the data add + * @param operation the serialized operation added to the translog * @param seqNo the sequence number of the operation * @param location the location written */ - void operationAdded(BytesReference data, long seqNo, Translog.Location location); + void operationAdded(Translog.Serialized operation, long seqNo, Translog.Location location); } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 6e83a684cfa82..a5088b5896c80 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -9,14 +9,23 @@ package org.elasticsearch.index.translog; +import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil; import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.ByteArrayDataOutput; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefIterator; +import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.DiskIoBufferPool; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -63,6 +72,8 @@ import java.util.regex.Matcher; import 
java.util.regex.Pattern; import java.util.stream.Stream; +import java.util.zip.CRC32; +import java.util.zip.Checksum; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.index.translog.TranslogConfig.EMPTY_TRANSLOG_BUFFER_SIZE; @@ -90,6 +101,8 @@ */ public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable { + public static final TransportVersion REORDERED_TRANSLOG_OPERATIONS = TransportVersion.fromName("reordered_translog_operations"); + /* * TODO * - we might need something like a deletion policy to hold on to more than one translog eventually (I think sequence IDs needs this) @@ -610,9 +623,15 @@ TranslogWriter createWriter( * @throws IOException if adding the operation to the translog resulted in an I/O exception */ public Location add(final Operation operation) throws IOException { - try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays)) { - writeOperationWithSize(out, operation); - final BytesReference bytes = out.bytes(); + try (RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(bigArrays.bytesRefRecycler())) { + writeHeaderWithSize(out, operation); + final BytesReference header = out.bytes(); + Serialized serialized = Serialized.create( + header, + operation instanceof Index index ? 
ReleasableBytesReference.unwrap(index.source()) : null, + new CRC32() + ); + readLock.lock(); try { ensureOpen(); @@ -633,7 +652,7 @@ public Location add(final Operation operation) throws IOException { + "]" ); } - return current.add(bytes, operation.seqNo()); + return current.add(serialized, operation.seqNo()); } finally { readLock.unlock(); } @@ -646,6 +665,54 @@ public Location add(final Operation operation) throws IOException { } } + public record Serialized(BytesReference header, @Nullable BytesReference source, int length, int checksum) { + + public Serialized(BytesReference header, @Nullable BytesReference source, int checksum) { + this(header, source, header.length() + (source == null ? 0 : source.length()) + 4, checksum); + } + + public BytesReference toBytesReference() throws IOException { + byte[] checksumBytes = new byte[4]; + DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(checksumBytes)); + out.writeInt(checksum); + BytesArray checksum = new BytesArray(checksumBytes); + return source == null ? 
CompositeBytesReference.of(header, checksum) : CompositeBytesReference.of(header, source, checksum); + } + + public static Serialized create(BytesReference header, @Nullable BytesReference source, Checksum checksum) throws IOException { + int length = header.length() + 4; + updateChecksum(header, checksum, 4); + if (source != null) { + updateChecksum(source, checksum, 0); + length += source.length(); + } + return new Serialized(header, source, length, (int) checksum.getValue()); + } + + private static void updateChecksum(BytesReference bytes, Checksum checksum, final int bytesToSkip) throws IOException { + if (bytes.hasArray()) { + checksum.update(bytes.array(), bytes.arrayOffset() + bytesToSkip, bytes.length() - bytesToSkip); + } else { + int offset = bytesToSkip; + BytesRefIterator iterator = bytes.iterator(); + BytesRef slice; + while ((slice = iterator.next()) != null) { + int toSkip = Math.min(offset, slice.length); + checksum.update(slice.bytes, slice.offset + toSkip, slice.length - toSkip); + offset -= toSkip; + } + } + } + + public void writeToTranslogBuffer(RecyclerBytesStreamOutput buffer) throws IOException { + header.writeTo(buffer); + if (source != null) { + source.writeTo(buffer); + } + buffer.writeInt(checksum); + } + } + /** * Tests whether or not the translog generation should be rolled to a new generation. This test * is based on the size of the current generation compared to the configured generation @@ -1156,6 +1223,15 @@ public final void writeTo(StreamOutput out) throws IOException { writeBody(out); } + /** + * Writes the operation header. This is the body excluding the index source. This method only differs from writeBody + * for the Index operation. This is because deletes and no-ops do not have a source. + */ + protected abstract void writeHeader(int format, StreamOutput out) throws IOException; + + /** + * Writes the entire operation body which comes after the byte indicating the operation type. 
+ */ protected abstract void writeBody(StreamOutput out) throws IOException; } @@ -1164,9 +1240,10 @@ public static final class Index extends Operation { public static final int FORMAT_NO_PARENT = 9; // since 7.0 public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1; public static final int FORMAT_NO_DOC_TYPE = FORMAT_NO_VERSION_TYPE + 1; - public static final int SERIALIZATION_FORMAT = FORMAT_NO_DOC_TYPE; + public static final int FORMAT_REORDERED = FORMAT_NO_DOC_TYPE + 1; + public static final int SERIALIZATION_FORMAT = FORMAT_REORDERED; - private final String id; + private final BytesRef uid; private final long autoGeneratedIdTimestamp; private final long version; private final BytesReference source; @@ -1175,26 +1252,43 @@ public static final class Index extends Operation { private static Index readFrom(StreamInput in) throws IOException { final int format = in.readVInt(); // SERIALIZATION_FORMAT assert format >= FORMAT_NO_PARENT : "format was: " + format; - String id = in.readString(); - if (format < FORMAT_NO_DOC_TYPE) { - in.readString(); - // can't assert that this is _doc because pre-8.0 indexes can have any name for a type - } - BytesReference source = in.readBytesReference(); - String routing = in.readOptionalString(); - long version = in.readLong(); - if (format < FORMAT_NO_VERSION_TYPE) { - in.readByte(); // _version_type + BytesRef uid; + BytesReference source; + String routing; + long version; + long autoGeneratedIdTimestamp; + long seqNo; + long primaryTerm; + if (format < FORMAT_REORDERED) { + uid = Uid.encodeId(in.readString()); + if (format < FORMAT_NO_DOC_TYPE) { + in.readString(); + // can't assert that this is _doc because pre-8.0 indexes can have any name for a type + } + source = in.readBytesReference(); + routing = in.readOptionalString(); + version = in.readLong(); + if (format < FORMAT_NO_VERSION_TYPE) { + in.readByte(); // _version_type + } + autoGeneratedIdTimestamp = in.readLong(); + seqNo = in.readLong(); + primaryTerm 
= in.readLong(); + } else { + version = in.readLong(); + seqNo = in.readLong(); + primaryTerm = in.readLong(); + autoGeneratedIdTimestamp = in.readLong(); + uid = in.readBytesRef(); + routing = in.readOptionalString(); + source = in.readBytesReference(); } - long autoGeneratedIdTimestamp = in.readLong(); - long seqNo = in.readLong(); - long primaryTerm = in.readLong(); - return new Index(id, seqNo, primaryTerm, version, source, routing, autoGeneratedIdTimestamp); + return new Index(uid, seqNo, primaryTerm, version, source, routing, autoGeneratedIdTimestamp); } public Index(Engine.Index index, Engine.IndexResult indexResult) { this( - index.id(), + index.uid(), indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), @@ -1212,9 +1306,21 @@ public Index( BytesReference source, String routing, long autoGeneratedIdTimestamp + ) { + this(Uid.encodeId(id), seqNo, primaryTerm, version, source, routing, autoGeneratedIdTimestamp); + } + + public Index( + BytesRef uid, + long seqNo, + long primaryTerm, + long version, + BytesReference source, + String routing, + long autoGeneratedIdTimestamp ) { super(seqNo, primaryTerm); - this.id = id; + this.uid = uid; this.source = source; this.version = version; this.routing = routing; @@ -1228,14 +1334,14 @@ public Type opType() { @Override public long estimateSize() { - return (2 * id.length()) + source.length() + (routing != null ? 2 * routing.length() : 0) + (4 * Long.BYTES); // timestamp, + return uid.length + source.length() + (routing != null ? 2 * routing.length() : 0) + (4 * Long.BYTES); // timestamp, // seq_no, // primary_term, // and version } - public String id() { - return this.id; + public BytesRef uid() { + return uid; } public String routing() { @@ -1251,21 +1357,40 @@ public long version() { } @Override - public void writeBody(final StreamOutput out) throws IOException { - final int format = out.getTransportVersion().onOrAfter(TransportVersions.V_8_0_0) - ? 
SERIALIZATION_FORMAT - : FORMAT_NO_VERSION_TYPE; + protected void writeHeader(int format, StreamOutput out) throws IOException { out.writeVInt(format); - out.writeString(id); - if (format < FORMAT_NO_DOC_TYPE) { - out.writeString(MapperService.SINGLE_MAPPING_NAME); - } - out.writeBytesReference(source); - out.writeOptionalString(routing); out.writeLong(version); - out.writeLong(autoGeneratedIdTimestamp); out.writeLong(seqNo); out.writeLong(primaryTerm); + out.writeLong(autoGeneratedIdTimestamp); + out.writeBytesRef(uid); + out.writeOptionalString(routing); + out.writeVInt(source == null ? 0 : source.length()); + } + + @Override + public void writeBody(final StreamOutput out) throws IOException { + final int format = out.getTransportVersion().onOrAfter(TransportVersions.V_8_0_0) + ? out.getTransportVersion().supports(REORDERED_TRANSLOG_OPERATIONS) ? SERIALIZATION_FORMAT : FORMAT_NO_DOC_TYPE + : FORMAT_NO_VERSION_TYPE; + if (format < FORMAT_REORDERED) { + out.writeVInt(format); + out.writeString(Uid.decodeId(uid.bytes, uid.offset, uid.length)); + if (format < FORMAT_NO_DOC_TYPE) { + out.writeString(MapperService.SINGLE_MAPPING_NAME); + } + out.writeBytesReference(source); + out.writeOptionalString(routing); + out.writeLong(version); + out.writeLong(autoGeneratedIdTimestamp); + out.writeLong(seqNo); + out.writeLong(primaryTerm); + } else { + writeHeader(format, out); + if (source != null) { + source.writeTo(out); + } + } } @Override @@ -1283,7 +1408,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - int result = id.hashCode(); + int result = uid.hashCode(); result = 31 * result + Long.hashCode(seqNo); result = 31 * result + Long.hashCode(primaryTerm); result = 31 * result + Long.hashCode(version); @@ -1297,7 +1422,7 @@ public int hashCode() { public String toString() { return "Index{" + "id='" - + id + + Uid.decodeId(uid.bytes, uid.offset, uid.length) + '\'' + ", seqNo=" + seqNo @@ -1318,7 +1443,7 @@ public static boolean 
equalsWithoutAutoGeneratedTimestamp(Translog.Index o1, Tra if (o1.version != o2.version || o1.seqNo != o2.seqNo || o1.primaryTerm != o2.primaryTerm - || o1.id.equals(o2.id) == false + || o1.uid.equals(o2.uid) == false || Objects.equals(o1.routing, o2.routing) == false) { return false; } @@ -1359,35 +1484,47 @@ public static final class Delete extends Operation { public static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1; // since 7.0 public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1; public static final int FORMAT_NO_DOC_TYPE = FORMAT_NO_VERSION_TYPE + 1; // since 8.0 - public static final int SERIALIZATION_FORMAT = FORMAT_NO_DOC_TYPE; + public static final int FORMAT_REORDERED = FORMAT_NO_DOC_TYPE + 1; + public static final int SERIALIZATION_FORMAT = FORMAT_REORDERED; - private final String id; + private final BytesRef uid; private final long version; private static Delete readFrom(StreamInput in) throws IOException { final int format = in.readVInt();// SERIALIZATION_FORMAT assert format >= FORMAT_6_0 : "format was: " + format; - if (format < FORMAT_NO_DOC_TYPE) { - in.readString(); - // Can't assert that this is _doc because pre-8.0 indexes can have any name for a type - } - String id = in.readString(); - if (format < FORMAT_NO_DOC_TYPE) { - final String docType = in.readString(); - assert docType.equals(IdFieldMapper.NAME) : docType + " != " + IdFieldMapper.NAME; - in.readSlicedBytesReference(); // uid - } - long version = in.readLong(); - if (format < FORMAT_NO_VERSION_TYPE) { - in.readByte(); // versionType + final BytesRef uid; + final long version; + final long seqNo; + final long primaryTerm; + if (format < FORMAT_REORDERED) { + if (format < FORMAT_NO_DOC_TYPE) { + in.readString(); + // Can't assert that this is _doc because pre-8.0 indexes can have any name for a type + } + uid = Uid.encodeId(in.readString()); + if (format < FORMAT_NO_DOC_TYPE) { + final String docType = in.readString(); + assert docType.equals(IdFieldMapper.NAME) : 
docType + " != " + IdFieldMapper.NAME; + in.readSlicedBytesReference(); // uid + } + version = in.readLong(); + if (format < FORMAT_NO_VERSION_TYPE) { + in.readByte(); // versionType + } + seqNo = in.readLong(); + primaryTerm = in.readLong(); + } else { + version = in.readLong(); + seqNo = in.readLong(); + primaryTerm = in.readLong(); + uid = in.readBytesRef(); } - long seqNo = in.readLong(); - long primaryTerm = in.readLong(); - return new Delete(id, seqNo, primaryTerm, version); + return new Delete(uid, seqNo, primaryTerm, version); } public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) { - this(delete.id(), deleteResult.getSeqNo(), delete.primaryTerm(), deleteResult.getVersion()); + this(delete.uid(), deleteResult.getSeqNo(), delete.primaryTerm(), deleteResult.getVersion()); } /** utility for testing */ @@ -1395,9 +1532,14 @@ public Delete(String id, long seqNo, long primaryTerm) { this(id, seqNo, primaryTerm, Versions.MATCH_ANY); } + /** utility for testing */ public Delete(String id, long seqNo, long primaryTerm, long version) { + this(Uid.encodeId(id), seqNo, primaryTerm, version); + } + + public Delete(BytesRef uid, long seqNo, long primaryTerm, long version) { super(seqNo, primaryTerm); - this.id = Objects.requireNonNull(id); + this.uid = Objects.requireNonNull(uid); this.version = version; } @@ -1408,11 +1550,20 @@ public Type opType() { @Override public long estimateSize() { - return (2 * id.length()) + (3 * Long.BYTES); // seq_no, primary_term, and version; + return uid.length + (3 * Long.BYTES); // seq_no, primary_term, and version; } - public String id() { - return id; + @Override + protected void writeHeader(int format, StreamOutput out) throws IOException { + out.writeVInt(format); + out.writeLong(version); + out.writeLong(seqNo); + out.writeLong(primaryTerm); + out.writeBytesRef(uid); + } + + public BytesRef uid() { + return uid; } public long version() { @@ -1422,20 +1573,24 @@ public long version() { @Override public void 
writeBody(final StreamOutput out) throws IOException { final int format = out.getTransportVersion().onOrAfter(TransportVersions.V_8_0_0) - ? SERIALIZATION_FORMAT + ? out.getTransportVersion().supports(REORDERED_TRANSLOG_OPERATIONS) ? SERIALIZATION_FORMAT : FORMAT_NO_DOC_TYPE : FORMAT_NO_VERSION_TYPE; - out.writeVInt(format); - if (format < FORMAT_NO_DOC_TYPE) { - out.writeString(MapperService.SINGLE_MAPPING_NAME); - } - out.writeString(id); - if (format < FORMAT_NO_DOC_TYPE) { - out.writeString(IdFieldMapper.NAME); - out.writeBytesRef(Uid.encodeId(id)); + if (format < FORMAT_REORDERED) { + out.writeVInt(format); + if (format < FORMAT_NO_DOC_TYPE) { + out.writeString(MapperService.SINGLE_MAPPING_NAME); + } + out.writeString(Uid.decodeId(uid)); + if (format < FORMAT_NO_DOC_TYPE) { + out.writeString(IdFieldMapper.NAME); + out.writeBytesRef(uid); + } + out.writeLong(version); + out.writeLong(seqNo); + out.writeLong(primaryTerm); + } else { + writeHeader(format, out); } - out.writeLong(version); - out.writeLong(seqNo); - out.writeLong(primaryTerm); } @Override @@ -1449,12 +1604,12 @@ public boolean equals(Object o) { Delete delete = (Delete) o; - return id.equals(delete.id) && seqNo == delete.seqNo && primaryTerm == delete.primaryTerm && version == delete.version; + return uid.equals(delete.uid) && seqNo == delete.seqNo && primaryTerm == delete.primaryTerm && version == delete.version; } @Override public int hashCode() { - int result = id.hashCode(); + int result = uid.hashCode(); result += 31 * Long.hashCode(seqNo); result = 31 * result + Long.hashCode(primaryTerm); result = 31 * result + Long.hashCode(version); @@ -1463,7 +1618,16 @@ public int hashCode() { @Override public String toString() { - return "Delete{" + "id='" + id + "', seqNo=" + seqNo + ", primaryTerm=" + primaryTerm + ", version=" + version + '}'; + return "Delete{" + + "id='" + + Uid.decodeId(uid) + + "', seqNo=" + + seqNo + + ", primaryTerm=" + + primaryTerm + + ", version=" + + version + + '}'; } } @@ 
-1487,12 +1651,18 @@ public NoOp(final long seqNo, final long primaryTerm, final String reason) { } @Override - public void writeBody(final StreamOutput out) throws IOException { + protected void writeHeader(int format, StreamOutput out) throws IOException { out.writeLong(seqNo); out.writeLong(primaryTerm); out.writeString(reason); } + @Override + public void writeBody(final StreamOutput out) throws IOException { + // No versioning for No-op + writeHeader(-1, out); + } + @Override public Type opType() { return Type.NO_OP; @@ -1642,15 +1812,12 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl out.writeInt((int) checksum); } - public static void writeOperationWithSize(BytesStreamOutput out, Translog.Operation op) throws IOException { - final long start = out.position(); - out.skip(Integer.BYTES); - writeOperationNoSize(new BufferedChecksumStreamOutput(out), op); - final long end = out.position(); - final int operationSize = (int) (end - Integer.BYTES - start); - out.seek(start); - out.writeInt(operationSize); - out.seek(end); + public static void writeHeaderWithSize(RecyclerBytesStreamOutput out, Translog.Operation op) throws IOException { + switch (op) { + case Index index -> TranslogHeaderWriter.writeIndexHeader(out, index); + case Delete delete -> TranslogHeaderWriter.writeDeleteHeader(out, delete); + case NoOp noOp -> TranslogHeaderWriter.writeNoOpHeader(out, noOp); + } } /** diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogHeaderWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogHeaderWriter.java new file mode 100644 index 0000000000000..e5f1027a93b1a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogHeaderWriter.java @@ -0,0 +1,183 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.translog; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.ByteUtils; + +import java.io.IOException; + +/** + * This class provides specialized serialization methods for translog operations. The goal is to allow direct + * access to the current recycler bytes stream output page and write the entire operation with a single bounds + * check. + */ +public final class TranslogHeaderWriter { + + public static final int FIXED_INDEX_HEADER_SIZE = 39; + public static final int FIXED_DELETE_HEADER_SIZE = 30; + public static final int FIXED_NO_OP_HEADER_SIZE = 21; + + public static final int OPERATION_TYPE_OFFSET = 4; + public static final int SERIALIZATION_FORMAT_OFFSET = 5; + public static final int VERSION_OFFSET = 6; + public static final int SEQ_NO_OFFSET = 14; + public static final int PRIMARY_TERM_OFFSET = 22; + + public static final int INDEX_AUTO_GENERATED_ID_TIMESTAMP_OFFSET = 30; + public static final int INDEX_UID_LENGTH_OFFSET = 38; + + private TranslogHeaderWriter() { + // This class depends on the serialization format fitting in a single vInt byte. If we advance past 127 we need to tweak the logic + // to write 2 bytes. 
+ assert Translog.Index.SERIALIZATION_FORMAT <= 127; + assert Translog.Delete.SERIALIZATION_FORMAT <= 127; + } + + public static void writeIndexHeader(RecyclerBytesStreamOutput buffer, Translog.Index index) throws IOException { + int uidLen = index.uid().length; + int uidVIntLen = RecyclerBytesStreamOutput.vIntLength(uidLen); + BytesRef page = buffer.tryGetPageForWrite(FIXED_INDEX_HEADER_SIZE + uidLen + uidVIntLen); + if (page != null) { + writeFastIndexHeader(buffer, index, page, uidVIntLen); + } else { + writeSlowIndexHeader(buffer, index); + } + } + + private static void writeFastIndexHeader(RecyclerBytesStreamOutput buffer, Translog.Index index, BytesRef page, int uidVIntLen) + throws IOException { + BytesRef uid = index.uid(); + String routing = index.routing(); + + int off = page.offset; + byte[] bytes = page.bytes; + bytes[off + OPERATION_TYPE_OFFSET] = Translog.Operation.Type.INDEX.id(); + // This is technically a vInt in the serialization, but until we advance past 127 we can just directly serialize as a byte + bytes[off + SERIALIZATION_FORMAT_OFFSET] = (byte) Translog.Index.SERIALIZATION_FORMAT; + ByteUtils.writeLongBE(index.version(), bytes, off + VERSION_OFFSET); + ByteUtils.writeLongBE(index.seqNo(), bytes, off + SEQ_NO_OFFSET); + ByteUtils.writeLongBE(index.primaryTerm(), bytes, off + PRIMARY_TERM_OFFSET); + ByteUtils.writeLongBE(index.getAutoGeneratedIdTimestamp(), bytes, off + INDEX_AUTO_GENERATED_ID_TIMESTAMP_OFFSET); + StreamOutput.putVInt(bytes, uid.length, off + INDEX_UID_LENGTH_OFFSET); + System.arraycopy(uid.bytes, uid.offset, bytes, off + INDEX_UID_LENGTH_OFFSET + uidVIntLen, uid.length); + bytes[off + INDEX_UID_LENGTH_OFFSET + uidVIntLen + uid.length] = index.routing() == null ? (byte) 0 : (byte) 1; + + long variableLengthStart = buffer.position(); + // Write variable length items in header + if (routing != null) { + buffer.writeString(routing); + } + + BytesReference source = index.source(); + int sourceLength = source == null ? 
0 : source.length(); + // We write this so that we have a fully serialized header ready to append the source to. This allows us to fully calculate the + // checksum + buffer.writeVInt(sourceLength); + + int variableLengthSize = (int) (buffer.position() - variableLengthStart); + int sizeOfOperation = FIXED_INDEX_HEADER_SIZE - Integer.BYTES + uidVIntLen + uid.length + variableLengthSize + sourceLength + + Integer.BYTES; + ByteUtils.writeIntBE(sizeOfOperation, bytes, off); + } + + private static void writeSlowIndexHeader(RecyclerBytesStreamOutput buffer, Translog.Index index) throws IOException { + final long start = buffer.position(); + buffer.skip(Integer.BYTES); + buffer.writeByte(Translog.Operation.Type.INDEX.id()); + index.writeHeader(Translog.Index.SERIALIZATION_FORMAT, buffer); + final long end = buffer.position(); + // The total operation size is the header size + source size + 4 bytes for checksum + final int operationSize = (int) (end - Integer.BYTES - start) + index.source().length() + Integer.BYTES; + buffer.seek(start); + buffer.writeInt(operationSize); + buffer.seek(end); + } + + public static void writeDeleteHeader(RecyclerBytesStreamOutput buffer, Translog.Delete delete) throws IOException { + int uidLen = delete.uid().length; + int uidVIntLen = RecyclerBytesStreamOutput.vIntLength(uidLen); + BytesRef page = buffer.tryGetPageForWrite(FIXED_DELETE_HEADER_SIZE + uidLen + uidVIntLen); + if (page != null) { + writeFastDeleteHeader(delete, page, uidVIntLen); + } else { + writeSlowDeleteHeader(buffer, delete); + } + } + + private static void writeFastDeleteHeader(Translog.Delete delete, BytesRef page, int uidVIntLen) throws IOException { + BytesRef uid = delete.uid(); + + int off = page.offset; + byte[] bytes = page.bytes; + bytes[off + OPERATION_TYPE_OFFSET] = Translog.Operation.Type.DELETE.id(); + // This is technically a vInt in the serialization, but until we advance past 127 we can just directly serialize as a byte + bytes[off + 
SERIALIZATION_FORMAT_OFFSET] = (byte) Translog.Delete.SERIALIZATION_FORMAT; + ByteUtils.writeLongBE(delete.version(), bytes, off + VERSION_OFFSET); + ByteUtils.writeLongBE(delete.seqNo(), bytes, off + SEQ_NO_OFFSET); + ByteUtils.writeLongBE(delete.primaryTerm(), bytes, off + PRIMARY_TERM_OFFSET); + StreamOutput.putVInt(bytes, uid.length, off + FIXED_DELETE_HEADER_SIZE); + System.arraycopy(uid.bytes, uid.offset, bytes, off + FIXED_DELETE_HEADER_SIZE + uidVIntLen, uid.length); + + int sizeOfOperation = FIXED_DELETE_HEADER_SIZE - Integer.BYTES + uidVIntLen + uid.length + Integer.BYTES; + ByteUtils.writeIntBE(sizeOfOperation, bytes, off); + } + + private static void writeSlowDeleteHeader(RecyclerBytesStreamOutput buffer, Translog.Delete delete) throws IOException { + final long start = buffer.position(); + buffer.skip(Integer.BYTES); + buffer.writeByte(Translog.Operation.Type.DELETE.id()); + delete.writeHeader(Translog.Delete.SERIALIZATION_FORMAT, buffer); + final long end = buffer.position(); + // The total operation size is the header size + 4 bytes for checksum + final int operationSize = (int) (end - Integer.BYTES - start) + Integer.BYTES; + buffer.seek(start); + buffer.writeInt(operationSize); + buffer.seek(end); + } + + public static void writeNoOpHeader(RecyclerBytesStreamOutput buffer, Translog.NoOp noop) throws IOException { + BytesRef bytesRef = buffer.tryGetPageForWrite(FIXED_NO_OP_HEADER_SIZE); + if (bytesRef != null) { + int off = bytesRef.offset; + byte[] bytes = bytesRef.bytes; + bytes[off + OPERATION_TYPE_OFFSET] = Translog.Operation.Type.NO_OP.id(); + ByteUtils.writeLongBE(noop.seqNo(), bytes, off + 5); + ByteUtils.writeLongBE(noop.primaryTerm(), bytes, off + 13); + + long variableLengthStart = buffer.position(); + // Write variable length items in header + buffer.writeString(noop.reason()); + int variableLengthSize = (int) (buffer.position() - variableLengthStart); + // The total operation size is the header size + 4 bytes for checksum + int 
sizeOfOperation = FIXED_NO_OP_HEADER_SIZE - Integer.BYTES + variableLengthSize + Integer.BYTES; + ByteUtils.writeIntBE(sizeOfOperation, bytes, off); + } else { + writeSlowNoOpHeader(buffer, noop); + } + } + + private static void writeSlowNoOpHeader(RecyclerBytesStreamOutput buffer, Translog.NoOp noop) throws IOException { + final long start = buffer.position(); + buffer.skip(Integer.BYTES); + buffer.writeByte(Translog.Operation.Type.NO_OP.id()); + // No versioning for no-op + noop.writeHeader(-1, buffer); + final long end = buffer.position(); + // The total operation size is the header size + 4 bytes for checksum + final int operationSize = (int) (end - Integer.BYTES - start) + Integer.BYTES; + buffer.seek(start); + buffer.writeInt(operationSize); + buffer.seek(end); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 1ad32acf2bf92..7591be5a87aca 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -228,14 +228,14 @@ private synchronized void closeWithTragicEvent(final Exception ex) { } /** - * Add the given bytes to the translog with the specified sequence number; returns the location the bytes were written to. + * Add the serialized operation to the translog with the specified sequence number; returns the location the operation was written to. 
* - * @param data the bytes to write + * @param operation the serialized operation to write * @param seqNo the sequence number associated with the operation * @return the location the bytes were written to * @throws IOException if writing to the translog resulted in an I/O exception */ - public Translog.Location add(final BytesReference data, final long seqNo) throws IOException { + public Translog.Location add(final Translog.Serialized operation, final long seqNo) throws IOException { long bufferedBytesBeforeAdd = this.bufferedBytes; if (bufferedBytesBeforeAdd >= forceWriteThreshold) { writeBufferedOps(Long.MAX_VALUE, bufferedBytesBeforeAdd >= forceWriteThreshold * 4); @@ -249,8 +249,8 @@ public Translog.Location add(final BytesReference data, final long seqNo) throws } assert bufferedBytes == buffer.size(); final long offset = totalOffset; - totalOffset += data.length(); - data.writeTo(buffer); + totalOffset += operation.length(); + operation.writeToTranslogBuffer(buffer); assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0; assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0; @@ -262,20 +262,24 @@ public Translog.Location add(final BytesReference data, final long seqNo) throws operationCounter++; - assert assertNoSeqNumberConflict(seqNo, data); + assert assertNoSeqNumberConflict(seqNo, operation); - location = new Translog.Location(generation, offset, data.length()); - operationListener.operationAdded(data, seqNo, location); + location = new Translog.Location(generation, offset, operation.length()); + operationListener.operationAdded(operation, seqNo, location); bufferedBytes = buffer.size(); } return location; } - private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReference data) throws IOException { + private synchronized boolean assertNoSeqNumberConflict(long seqNo, Translog.Serialized serialized) throws IOException { if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) { // nothing to do - } 
else if (seenSequenceNumbers.containsKey(seqNo)) { + return true; + } + + BytesReference data = serialized.toBytesReference(); + if (seenSequenceNumbers.containsKey(seqNo)) { final Tuple previous = seenSequenceNumbers.get(seqNo); if (previous.v1().equals(data) == false) { Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput(), "assertion")); diff --git a/server/src/main/resources/transport/definitions/referable/reordered_translog_operations.csv b/server/src/main/resources/transport/definitions/referable/reordered_translog_operations.csv new file mode 100644 index 0000000000000..ce3efd6a04801 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/reordered_translog_operations.csv @@ -0,0 +1 @@ +9190000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index b947ec1f1d1ce..c13b776b0c3a3 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -esql_resolve_fields_response_created,9189000 +reordered_translog_operations,9190000 diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index cfce840cda9f0..fa951d6473153 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1678,7 +1678,7 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { Translog.Operation op = ops.get(seqno); if (op != null) { assertThat(op, instanceOf(Translog.Index.class)); - assertThat(msg, ((Translog.Index) op).id(), is(in(liveDocs))); + assertThat(msg, Uid.decodeId(((Translog.Index) op).uid()), is(in(liveDocs))); assertEquals(msg, ((Translog.Index) op).source(), B_1); } } else { @@ -1766,7 +1766,7 @@ public void 
testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc Translog.Operation op = ops.get(seqno); if (op != null) { assertThat(op, instanceOf(Translog.Index.class)); - assertThat(msg, ((Translog.Index) op).id(), is(in(liveDocs))); + assertThat(msg, Uid.decodeId(((Translog.Index) op).uid()), is(in(liveDocs))); } } else { Translog.Operation op = ops.get(seqno); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 9aa847c837e96..5b3f964813905 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -12,7 +12,6 @@ import org.apache.lucene.store.ByteArrayDataOutput; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasable; @@ -28,6 +27,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import java.util.zip.CRC32; import static org.hamcrest.Matchers.equalTo; @@ -103,10 +103,11 @@ private Tuple, TranslogWriter> createReadersAndWriter() thr byte[] bytes = new byte[4]; ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); + BytesArray header = new BytesArray(new byte[] { 'h', 'e', 'a', 'd', 'e', 'r' }); for (int ops = randomIntBetween(0, 20); ops > 0; ops--) { out.reset(bytes); out.writeInt(ops); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), ops); + writer.add(Translog.Serialized.create(header, new BytesArray(bytes), new CRC32()), ops); } } return new Tuple<>(readers, writer); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderWriterTests.java 
b/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderWriterTests.java new file mode 100644 index 0000000000000..9e3f521f21522 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogHeaderWriterTests.java @@ -0,0 +1,231 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.translog; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.BytesRefRecycler; + +import java.io.IOException; +import java.util.zip.CRC32; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests to ensure that the multistep optimized TranslogHeaderWriter serialization matches the standard + * StreamOutput version. 
+ */ +public class TranslogHeaderWriterTests extends ESTestCase { + + public void testIndexOperationSerializationMatches() throws IOException { + RecyclerBytesStreamOutput headerOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE); + RecyclerBytesStreamOutput fullOperationOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE); + + for (int i = 0; i < 30; i++) { + // Test both the fast path (single page) and slow page (cross page) + int offset; + if (randomBoolean()) { + offset = 0; + } else { + offset = BytesRefRecycler.NON_RECYCLING_INSTANCE.pageSize() - 1; + } + headerOutput.seek(offset); + fullOperationOutput.seek(offset); + + BytesArray source = new BytesArray(randomByteArrayOfLength(randomIntBetween(50, 4196))); + Translog.Index index = getIndex(source); + + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + BufferedChecksumStreamOutput output = new BufferedChecksumStreamOutput(bytesStreamOutput); + Translog.writeOperationNoSize(output, index); + BytesReference expectedWithoutSize = bytesStreamOutput.bytes(); + + TranslogHeaderWriter.writeIndexHeader(headerOutput, index); + serializationMatches(headerOutput.bytes(), source, offset, fullOperationOutput, expectedWithoutSize); + } + } + + public void testIndexOperationSerializationMatchesPreReorderedOperation() throws IOException { + RecyclerBytesStreamOutput headerOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE); + RecyclerBytesStreamOutput fullOperationOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE); + + for (int i = 0; i < 30; i++) { + // Test both the fast path (single page) and slow page (cross page) + int offset; + if (randomBoolean()) { + offset = 0; + } else { + offset = BytesRefRecycler.NON_RECYCLING_INSTANCE.pageSize() - 1; + } + headerOutput.seek(offset); + fullOperationOutput.seek(offset); + + BytesArray source = new BytesArray(randomByteArrayOfLength(randomIntBetween(50, 
4196))); + Translog.Index index = getIndex(source); + + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + BufferedChecksumStreamOutput output = new BufferedChecksumStreamOutput(bytesStreamOutput); + output.setTransportVersion(TransportVersion.fromId(Translog.REORDERED_TRANSLOG_OPERATIONS.id() - 1)); + Translog.writeOperationNoSize(output, index); + BytesReference expectedWithoutSize = bytesStreamOutput.bytes(); + + TranslogHeaderWriter.writeIndexHeader(headerOutput, index); + operationMatches(headerOutput.bytes(), source, offset, fullOperationOutput, expectedWithoutSize); + } + } + + private static Translog.Index getIndex(BytesReference source) { + String id = randomAlphaOfLength(20); + long seqNo = randomLongBetween(0, Long.MAX_VALUE); + long primaryTerm = randomLongBetween(0, Long.MAX_VALUE); + long version = randomLongBetween(0, Long.MAX_VALUE); + String routing = randomAlphaOfLength(20); + long autoGeneratedIdTimestamp = randomLongBetween(0, Long.MAX_VALUE); + return new Translog.Index(id, seqNo, primaryTerm, version, source, routing, autoGeneratedIdTimestamp); + } + + public void testDeleteOperationSerializationMatches() throws IOException { + RecyclerBytesStreamOutput headerOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE); + RecyclerBytesStreamOutput fullOperationOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE); + + for (int i = 0; i < 30; i++) { + // Test both the fast path (single page) and slow page (cross page) + int offset; + if (randomBoolean()) { + offset = 0; + } else { + offset = BytesRefRecycler.NON_RECYCLING_INSTANCE.pageSize() - 1; + } + headerOutput.seek(offset); + fullOperationOutput.seek(offset); + + Translog.Delete delete = getDelete(); + + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + BufferedChecksumStreamOutput output = new BufferedChecksumStreamOutput(bytesStreamOutput); + Translog.writeOperationNoSize(output, delete); + BytesReference 
expectedWithoutSize = bytesStreamOutput.bytes(); + + TranslogHeaderWriter.writeDeleteHeader(headerOutput, delete); + serializationMatches(headerOutput.bytes(), null, offset, fullOperationOutput, expectedWithoutSize); + } + } + + public void testDeleteOperationSerializationMatchesPreReorderedOperation() throws IOException { + RecyclerBytesStreamOutput headerOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE); + RecyclerBytesStreamOutput fullOperationOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE); + + for (int i = 0; i < 30; i++) { + // Test both the fast path (single page) and slow page (cross page) + int offset; + if (randomBoolean()) { + offset = 0; + } else { + offset = BytesRefRecycler.NON_RECYCLING_INSTANCE.pageSize() - 1; + } + headerOutput.seek(offset); + fullOperationOutput.seek(offset); + + Translog.Delete delete = getDelete(); + + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + BufferedChecksumStreamOutput output = new BufferedChecksumStreamOutput(bytesStreamOutput); + output.setTransportVersion(TransportVersion.fromId(Translog.REORDERED_TRANSLOG_OPERATIONS.id() - 1)); + Translog.writeOperationNoSize(output, delete); + BytesReference expectedWithoutSize = bytesStreamOutput.bytes(); + + TranslogHeaderWriter.writeDeleteHeader(headerOutput, delete); + operationMatches(headerOutput.bytes(), null, offset, fullOperationOutput, expectedWithoutSize); + } + } + + private static Translog.Delete getDelete() { + String id = randomAlphaOfLength(20); + long seqNo = randomLongBetween(0, Long.MAX_VALUE); + long primaryTerm = randomLongBetween(0, Long.MAX_VALUE); + long version = randomLongBetween(0, Long.MAX_VALUE); + Translog.Delete delete = new Translog.Delete(id, seqNo, primaryTerm, version); + return delete; + } + + public void testNoOpOperationSerializationMatches() throws IOException { + RecyclerBytesStreamOutput headerOutput = new 
RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE); + RecyclerBytesStreamOutput fullOperationOutput = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE); + + for (int i = 0; i < 30; i++) { + // Test both the fast path (single page) and slow page (cross page) + int offset; + if (randomBoolean()) { + offset = 0; + } else { + offset = BytesRefRecycler.NON_RECYCLING_INSTANCE.pageSize() - 1; + } + headerOutput.seek(offset); + fullOperationOutput.seek(offset); + + long seqNo = randomLongBetween(0, Long.MAX_VALUE); + long primaryTerm = randomLongBetween(0, Long.MAX_VALUE); + String reason = randomAlphaOfLength(20); + Translog.NoOp noOp = new Translog.NoOp(seqNo, primaryTerm, reason); + + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + BufferedChecksumStreamOutput output = new BufferedChecksumStreamOutput(bytesStreamOutput); + Translog.writeOperationNoSize(output, noOp); + BytesReference expectedWithoutSize = bytesStreamOutput.bytes(); + + TranslogHeaderWriter.writeNoOpHeader(headerOutput, noOp); + serializationMatches(headerOutput.bytes(), null, offset, fullOperationOutput, expectedWithoutSize); + } + } + + private static void serializationMatches( + BytesReference headerBytes, + @Nullable BytesReference source, + int offset, + RecyclerBytesStreamOutput operationOutput, + BytesReference expectedWithoutSize + ) throws IOException { + Translog.Serialized serialized = Translog.Serialized.create( + headerBytes.slice(offset, headerBytes.length() - offset), + source, + new CRC32() + ); + serialized.writeToTranslogBuffer(operationOutput); + + BytesReference actualWithSize = operationOutput.bytes(); + assertThat(actualWithSize.slice(4 + offset, actualWithSize.length() - offset - 4), equalTo(expectedWithoutSize)); + } + + private static void operationMatches( + BytesReference headerBytes, + BytesReference source, + int offset, + RecyclerBytesStreamOutput fullOperationOutput, + BytesReference expectedWithoutSize + ) throws 
IOException { + Translog.Serialized serialized = Translog.Serialized.create( + headerBytes.slice(offset, headerBytes.length() - offset), + source, + new CRC32() + ); + serialized.writeToTranslogBuffer(fullOperationOutput); + + BytesReference actualWithSize = fullOperationOutput.bytes(); + StreamInput actual = actualWithSize.slice(4 + offset, actualWithSize.length() - offset - 4).streamInput(); + + assertThat(Translog.Operation.readOperation(actual), equalTo(Translog.Operation.readOperation(expectedWithoutSize.streamInput()))); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 8032eda2dba1a..62e609c5b9925 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -33,10 +33,10 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.DiskIoBufferPool; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -68,6 +68,7 @@ import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.BytesRefRecycler; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; @@ -147,6 +148,8 @@ @LuceneTestCase.SuppressFileSystems("ExtrasFS") public class TranslogTests extends 
ESTestCase { + private static final BytesArray HEADER = new BytesArray(new byte[] { 'h', 'e', 'a', 'd', 'e', 'r' }); + public static final DiskIoBufferPool RANDOMIZING_IO_BUFFERS = new DiskIoBufferPool() { @Override public ByteBuffer maybeGetDirectIOBuffer() { @@ -399,7 +402,7 @@ public void testSimpleOperations() throws IOException { Translog.Delete delete = (Translog.Delete) snapshot.next(); assertNotNull(delete); - assertThat(delete.id(), equalTo("2")); + assertThat(Uid.decodeId(delete.uid()), equalTo("2")); Translog.NoOp noOp = (Translog.NoOp) snapshot.next(); assertNotNull(noOp); @@ -482,9 +485,9 @@ public void testStats() throws Exception { waitForPositiveAge(); final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(1)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(156L + sourceLength)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(157L + sourceLength)); assertThat(stats.getUncommittedOperations(), equalTo(1)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(101L + sourceLength)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(102L + sourceLength)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } @@ -493,9 +496,9 @@ public void testStats() throws Exception { waitForPositiveAge(); final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(2)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(192L + sourceLength)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(194L + sourceLength)); assertThat(stats.getUncommittedOperations(), equalTo(2)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(137L + sourceLength)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(139L + sourceLength)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } @@ -504,9 +507,9 @@ public void testStats() throws Exception { waitForPositiveAge(); final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), 
equalTo(3)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(228L + sourceLength)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(231L + sourceLength)); assertThat(stats.getUncommittedOperations(), equalTo(3)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(173L + sourceLength)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(176L + sourceLength)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } @@ -515,9 +518,9 @@ public void testStats() throws Exception { waitForPositiveAge(); final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(270L + sourceLength)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(273L + sourceLength)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(215L + sourceLength)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(218L + sourceLength)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } @@ -526,9 +529,9 @@ public void testStats() throws Exception { waitForPositiveAge(); final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(325L + sourceLength)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(328L + sourceLength)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(270L + sourceLength)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(273L + sourceLength)); assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } @@ -538,7 +541,7 @@ public void testStats() throws Exception { stats.writeTo(out); final TranslogStats copy = new TranslogStats(out.bytes().streamInput()); assertThat(copy.estimatedNumberOfOperations(), equalTo(4)); - assertThat(copy.getTranslogSizeInBytes(), equalTo(325L + sourceLength)); + 
assertThat(copy.getTranslogSizeInBytes(), equalTo(328L + sourceLength)); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { builder.startObject(); @@ -553,7 +556,7 @@ public void testStats() throws Exception { "uncommitted_size_in_bytes": %s, "earliest_last_modified_age": %s } - }""", 325L + sourceLength, 270L + sourceLength, stats.getEarliestLastModifiedAge())))); + }""", 328L + sourceLength, 273L + sourceLength, stats.getEarliestLastModifiedAge())))); } } translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(3, Long.MAX_VALUE)); @@ -870,7 +873,7 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable { case INDEX -> { Translog.Index indexOp = (Translog.Index) op; Translog.Index expIndexOp = (Translog.Index) expectedOp; - assertEquals(expIndexOp.id(), indexOp.id()); + assertEquals(expIndexOp.uid(), indexOp.uid()); assertEquals(expIndexOp.routing(), indexOp.routing()); assertEquals(expIndexOp.source(), indexOp.source()); assertEquals(expIndexOp.version(), indexOp.version()); @@ -878,7 +881,7 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable { case DELETE -> { Translog.Delete delOp = (Translog.Delete) op; Translog.Delete expDelOp = (Translog.Delete) expectedOp; - assertEquals(expDelOp.id(), delOp.id()); + assertEquals(expDelOp.uid(), delOp.uid()); assertEquals(expDelOp.version(), delOp.version()); } case NO_OP -> { @@ -1326,6 +1329,8 @@ public void testTranslogWriter() throws IOException { final int numOps = scaledRandomIntBetween(8, 250000); final Set seenSeqNos = new HashSet<>(); boolean opsHaveValidSequenceNumbers = randomBoolean(); + // The size is the header length + data size (int for this test) + the checksum int + int opSize = HEADER.length() + 4 + 4; for (int i = 0; i < numOps; i++) { byte[] bytes = new byte[4]; DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes)); @@ -1338,7 +1343,7 @@ public void testTranslogWriter() throws IOException { if (seqNo != 
SequenceNumbers.UNASSIGNED_SEQ_NO) { seenSeqNos.add(seqNo); } - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), seqNo); + writer.add(new Translog.Serialized(HEADER, new BytesArray(bytes), opSize, 0), seqNo); } assertThat(persistedSeqNos, empty()); writer.sync(); @@ -1349,10 +1354,10 @@ public void testTranslogWriter() throws IOException { ? writer : translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME))); for (int i = 0; i < numOps; i++) { - ByteBuffer buffer = ByteBuffer.allocate(4); - reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i); + ByteBuffer buffer = ByteBuffer.allocate(opSize); + reader.readBytes(buffer, reader.getFirstOperationOffset() + opSize * i); buffer.flip(); - final int value = buffer.getInt(); + final int value = buffer.getInt(HEADER.length()); assertEquals(i, value); } final long minSeqNo = seenSeqNos.stream().min(Long::compareTo).orElse(SequenceNumbers.NO_OPS_PERFORMED); @@ -1363,12 +1368,12 @@ public void testTranslogWriter() throws IOException { byte[] bytes = new byte[4]; DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes)); out.writeInt(2048); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); + writer.add(new Translog.Serialized(HEADER, new BytesArray(bytes), opSize, 0), randomNonNegativeLong()); if (reader instanceof TranslogReader) { - ByteBuffer buffer = ByteBuffer.allocate(4); + ByteBuffer buffer = ByteBuffer.allocate(opSize); try { - reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * numOps); + reader.readBytes(buffer, reader.getFirstOperationOffset() + opSize * numOps); fail("read past EOF?"); } catch (EOFException ex) { // expected @@ -1376,11 +1381,11 @@ public void testTranslogWriter() throws IOException { ((TranslogReader) reader).close(); } else { // live reader! 
- ByteBuffer buffer = ByteBuffer.allocate(4); - final long pos = reader.getFirstOperationOffset() + 4 * numOps; + ByteBuffer buffer = ByteBuffer.allocate(opSize); + final long pos = reader.getFirstOperationOffset() + opSize * numOps; reader.readBytes(buffer, pos); buffer.flip(); - final int value = buffer.getInt(); + final int value = buffer.getInt(HEADER.length()); assertEquals(2048, value); } IOUtils.close(writer); @@ -1459,16 +1464,17 @@ ChannelFactory getChannelFactory() { TranslogWriter writer = translog.getCurrent(); int initialWriteCalls = writeCalls.get(); byte[] bytes = new byte[256]; - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 3); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4); + int opSize = HEADER.length() + bytes.length + 4; + writer.add(new Translog.Serialized(HEADER, new BytesArray(bytes), opSize, 0), 1); + writer.add(new Translog.Serialized(HEADER, new BytesArray(bytes), opSize, 0), 2); + writer.add(new Translog.Serialized(HEADER, new BytesArray(bytes), opSize, 0), 3); + writer.add(new Translog.Serialized(HEADER, new BytesArray(bytes), opSize, 0), 4); assertThat(persistedSeqNos, empty()); assertEquals(initialWriteCalls, writeCalls.get()); if (randomBoolean()) { // Since the buffer is full, this will flush before performing the add. 
- writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5); + writer.add(new Translog.Serialized(HEADER, new BytesArray(bytes), opSize, 0), 5); assertThat(persistedSeqNos, empty()); assertThat(writeCalls.get(), greaterThan(initialWriteCalls)); } else { @@ -1478,7 +1484,7 @@ ChannelFactory getChannelFactory() { assertThat(writeCalls.get(), greaterThan(initialWriteCalls)); // Add after we the read flushed the buffer - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5); + writer.add(new Translog.Serialized(HEADER, new BytesArray(bytes), opSize, 0), 5); } writer.sync(); @@ -1574,10 +1580,11 @@ ChannelFactory getChannelFactory() { ) { TranslogWriter writer = translog.getCurrent(); + int opSize = HEADER.length() + 4 + 4; byte[] bytes = new byte[4]; DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(new byte[4])); out.writeInt(1); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1); + writer.add(new Translog.Serialized(HEADER, new BytesArray(bytes), opSize, 0), 1); assertThat(persistedSeqNos, empty()); startBlocking.set(true); Thread thread = new Thread(() -> { @@ -1591,7 +1598,7 @@ ChannelFactory getChannelFactory() { writeStarted.await(); // Add will not block even though we are currently writing/syncing - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2); + writer.add(new Translog.Serialized(HEADER, new BytesArray(bytes), opSize, 0), 2); blocker.countDown(); // Sync against so that both operations are written @@ -1643,12 +1650,12 @@ public void testTranslogOperationListener() throws IOException { final ArrayList seqNos = new ArrayList<>(); final ArrayList locations = new ArrayList<>(); final ArrayList datas = new ArrayList<>(); - OperationListener listener = (data, seqNo, location) -> { + OperationListener listener = (operation, seqNo, location) -> { seqNos.add(seqNo); locations.add(location); - try (BytesStreamOutput output = new BytesStreamOutput()) { + try 
(RecyclerBytesStreamOutput output = new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE)) { try { - data.writeTo(output); + operation.writeToTranslogBuffer(output); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -1686,13 +1693,14 @@ public void testTranslogOperationListener() throws IOException { } public void testCloseIntoReader() throws IOException { + int opSize = HEADER.length() + 4 + 4; try (TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1)) { final int numOps = randomIntBetween(8, 128); for (int i = 0; i < numOps; i++) { final byte[] bytes = new byte[4]; final DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes)); out.writeInt(i); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); + writer.add(new Translog.Serialized(HEADER, new BytesArray(bytes), opSize, 0), randomNonNegativeLong()); } writer.sync(); final Checkpoint writerCheckpoint = writer.getCheckpoint(); @@ -1703,10 +1711,10 @@ public void testCloseIntoReader() throws IOException { reader = translog.openReader(reader.path(), writerCheckpoint); } for (int i = 0; i < numOps; i++) { - final ByteBuffer buffer = ByteBuffer.allocate(4); - reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i); + final ByteBuffer buffer = ByteBuffer.allocate(opSize); + reader.readBytes(buffer, reader.getFirstOperationOffset() + opSize * i); buffer.flip(); - final int value = buffer.getInt(); + final int value = buffer.getInt(HEADER.length()); assertEquals(i, value); } final Checkpoint readerCheckpoint = reader.getCheckpoint(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java index 33c745de25438..841e7f9f8542d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java +++ 
b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java @@ -19,6 +19,7 @@ import org.elasticsearch.index.mapper.MapperRegistry; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.similarity.SimilarityService; @@ -97,7 +98,12 @@ public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.O final Translog.Index index = (Translog.Index) operation; final Engine.Index engineIndex = IndexShard.prepareIndex( mapperService, - new SourceToParse(index.id(), index.source(), XContentHelper.xContentType(index.source()), index.routing()), + new SourceToParse( + Uid.decodeId(index.uid()), + index.source(), + XContentHelper.xContentType(index.source()), + index.routing() + ), index.seqNo(), index.primaryTerm(), index.version(), @@ -114,7 +120,7 @@ public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.O case DELETE -> { final Translog.Delete delete = (Translog.Delete) operation; return IndexShard.prepareDelete( - delete.id(), + Uid.decodeId(delete.uid()), delete.seqNo(), delete.primaryTerm(), delete.version(), diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 9a6e187e5544e..47c2de146d4fc 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -138,7 +138,7 @@ public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operat case INDEX -> { final Translog.Index index = (Translog.Index) 
operation; operationWithPrimaryTerm = new Translog.Index( - index.id(), + index.uid(), index.seqNo(), primaryTerm, index.version(), @@ -149,7 +149,7 @@ public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operat } case DELETE -> { final Translog.Delete delete = (Translog.Delete) operation; - operationWithPrimaryTerm = new Translog.Delete(delete.id(), delete.seqNo(), primaryTerm, delete.version()); + operationWithPrimaryTerm = new Translog.Delete(delete.uid(), delete.seqNo(), primaryTerm, delete.version()); } case NO_OP -> { final Translog.NoOp noOp = (Translog.NoOp) operation; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java index 78d997ef9d777..7873f5aee6bdd 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.index.shard.ShardId; @@ -65,15 +66,15 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { assertThat(response.getOperations().length, equalTo(3)); Translog.Index operation = (Translog.Index) response.getOperations()[0]; assertThat(operation.seqNo(), equalTo(0L)); - assertThat(operation.id(), equalTo("1")); + assertThat(Uid.decodeId(operation.uid()), equalTo("1")); operation = (Translog.Index) response.getOperations()[1]; assertThat(operation.seqNo(), equalTo(1L)); - assertThat(operation.id(), equalTo("2")); + assertThat(Uid.decodeId(operation.uid()), equalTo("2")); operation = 
(Translog.Index) response.getOperations()[2]; assertThat(operation.seqNo(), equalTo(2L)); - assertThat(operation.id(), equalTo("3")); + assertThat(Uid.decodeId(operation.uid()), equalTo("3")); prepareIndex("index").setId("3").setSource("{}", XContentType.JSON).get(); prepareIndex("index").setId("4").setSource("{}", XContentType.JSON).get(); @@ -90,15 +91,15 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { assertThat(response.getOperations().length, equalTo(3)); operation = (Translog.Index) response.getOperations()[0]; assertThat(operation.seqNo(), equalTo(3L)); - assertThat(operation.id(), equalTo("3")); + assertThat(Uid.decodeId(operation.uid()), equalTo("3")); operation = (Translog.Index) response.getOperations()[1]; assertThat(operation.seqNo(), equalTo(4L)); - assertThat(operation.id(), equalTo("4")); + assertThat(Uid.decodeId(operation.uid()), equalTo("4")); operation = (Translog.Index) response.getOperations()[2]; assertThat(operation.seqNo(), equalTo(5L)); - assertThat(operation.id(), equalTo("5")); + assertThat(Uid.decodeId(operation.uid()), equalTo("5")); } public void testMissingOperations() throws Exception {