diff --git a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java index 5e4da2d6d..a64d9ffd0 100644 --- a/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java +++ b/core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java @@ -57,7 +57,6 @@ import io.aiven.kafka.tieredstorage.fetch.KeyNotFoundRuntimeException; import io.aiven.kafka.tieredstorage.fetch.cache.DiskChunkCache; import io.aiven.kafka.tieredstorage.fetch.cache.MemoryChunkCache; -import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1; import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder; import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex; import io.aiven.kafka.tieredstorage.manifest.serde.EncryptionSerdeModule; @@ -480,16 +479,15 @@ void testTransformingIndexes(final boolean encryption) { "storage.root", targetDir.toString(), "encryption.enabled", Boolean.toString(encryption) )); - final SegmentEncryptionMetadataV1 encryptionMetadata; + final DataKeyAndAAD maybeEncryptionKey; if (encryption) { config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID); config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID); config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".public.key.file", publicKeyPem.toString()); config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".private.key.file", privateKeyPem.toString()); - final var dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD(); - encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad); + maybeEncryptionKey = aesEncryptionProvider.createDataKeyAndAAD(); } else { - encryptionMetadata = null; + maybeEncryptionKey = null; } rsm.configure(config); @@ -499,7 +497,7 @@ void testTransformingIndexes(final boolean encryption) { IndexType.OFFSET, new ByteArrayInputStream(bytes), bytes.length, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); assertThat(is).isNotEmpty(); @@ -511,21 +509,21 @@ void testTransformingIndexes(final boolean encryption) { IndexType.TIMESTAMP, new ByteArrayInputStream(bytes), bytes.length, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); rsm.transformIndex( IndexType.LEADER_EPOCH, new ByteArrayInputStream(bytes), bytes.length, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); rsm.transformIndex( IndexType.PRODUCER_SNAPSHOT, new ByteArrayInputStream(bytes), bytes.length, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); final var index = segmentIndexBuilder.build(); @@ -545,14 +543,15 @@ void testTransformingEmptyIndexes(final boolean encryption) { "storage.root", targetDir.toString(), "encryption.enabled", Boolean.toString(encryption) )); - SegmentEncryptionMetadataV1 encryptionMetadata = null; + final DataKeyAndAAD maybeEncryptionKey; if (encryption) { config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID); config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID); config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".public.key.file", publicKeyPem.toString()); config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".private.key.file", privateKeyPem.toString()); - final var dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD(); - encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad); + maybeEncryptionKey = aesEncryptionProvider.createDataKeyAndAAD(); + } else { + maybeEncryptionKey = null; } rsm.configure(config); @@ -561,7 +560,7 @@ void testTransformingEmptyIndexes(final boolean encryption) { IndexType.OFFSET, InputStream.nullInputStream(), 0, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); assertThat(is).isEmpty(); @@ -572,21 +571,21 @@ void testTransformingEmptyIndexes(final boolean encryption) { IndexType.TIMESTAMP, InputStream.nullInputStream(), 0, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); rsm.transformIndex( IndexType.LEADER_EPOCH, InputStream.nullInputStream(), 0, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); rsm.transformIndex( IndexType.PRODUCER_SNAPSHOT, InputStream.nullInputStream(), 0, - encryptionMetadata, + maybeEncryptionKey, segmentIndexBuilder ); final var index = segmentIndexBuilder.build(); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index b96484044..ddd2ebf0f 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -57,7 +57,6 @@ import io.aiven.kafka.tieredstorage.fetch.index.MemorySegmentIndexesCache; import io.aiven.kafka.tieredstorage.fetch.index.SegmentIndexesCache; import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata; -import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1; import io.aiven.kafka.tieredstorage.manifest.SegmentIndex; import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1; import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1Builder; @@ -223,36 +222,46 @@ public Optional copyLogSegmentData(final RemoteLogSegmentMetadat final long startedMs = time.milliseconds(); try { - SegmentEncryptionMetadataV1 encryptionMetadata = null; final boolean requiresCompression = requiresCompression(logSegmentData); - - final ChunkIndex chunkIndex; - try (final InputStream logSegmentInputStream = Files.newInputStream(logSegmentData.logSegment())) { - TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration( - logSegmentInputStream, chunkSize); - if (requiresCompression) { - transformEnum = new CompressionChunkEnumeration(transformEnum); - } - if (encryptionEnabled) { - final DataKeyAndAAD dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD(); - transformEnum = new EncryptionChunkEnumeration( - transformEnum, - () -> aesEncryptionProvider.encryptionCipher(dataKeyAndAAD)); - encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad); - } - final TransformFinisher transformFinisher = - new TransformFinisher(transformEnum, remoteLogSegmentMetadata.segmentSizeInBytes()); - uploadSegmentLog(remoteLogSegmentMetadata, transformFinisher, customMetadataBuilder); - chunkIndex = transformFinisher.chunkIndex(); + final DataKeyAndAAD maybeEncryptionKey; + if (encryptionEnabled) { + maybeEncryptionKey = aesEncryptionProvider.createDataKeyAndAAD(); + } else { + maybeEncryptionKey = null; } + // upload segment + final ChunkIndex chunkIndex = uploadSegmentLog( + remoteLogSegmentMetadata, + logSegmentData, + requiresCompression, + maybeEncryptionKey, + customMetadataBuilder + ); + + // upload indexes final SegmentIndexesV1 segmentIndexes = uploadIndexes( - remoteLogSegmentMetadata, logSegmentData, encryptionMetadata, customMetadataBuilder); - final SegmentManifest segmentManifest = new SegmentManifestV1( - chunkIndex, segmentIndexes, requiresCompression, encryptionMetadata, remoteLogSegmentMetadata); - uploadManifest(remoteLogSegmentMetadata, segmentManifest, customMetadataBuilder); + remoteLogSegmentMetadata, + logSegmentData, + maybeEncryptionKey, + customMetadataBuilder + ); + // upload manifest + uploadManifest( + remoteLogSegmentMetadata, + chunkIndex, + segmentIndexes, + requiresCompression, + maybeEncryptionKey, + customMetadataBuilder); } catch (final Exception e) { + try { + // best effort on removing orphan files + tryDeleteSegmentObjects(remoteLogSegmentMetadata); + } catch (final Exception ignored) { + // ignore all exceptions + } throw new RemoteStorageException(e); } @@ -267,10 +276,81 @@ public Optional copyLogSegmentData(final RemoteLogSegmentMetadat return customMetadata; } - private SegmentIndexesV1 uploadIndexes( + boolean requiresCompression(final LogSegmentData logSegmentData) { + boolean requiresCompression = false; + if (compressionEnabled) { + if (compressionHeuristic) { + try { + final File segmentFile = logSegmentData.logSegment().toFile(); + final boolean alreadyCompressed = SegmentCompressionChecker.check(segmentFile); + requiresCompression = !alreadyCompressed; + } catch (final InvalidRecordBatchException e) { + // Log and leave value as false to upload uncompressed. + log.warn("Failed to check compression on log segment: {}", logSegmentData.logSegment(), e); + } + } else { + requiresCompression = true; + } + } + return requiresCompression; + } + + ChunkIndex uploadSegmentLog( + final RemoteLogSegmentMetadata remoteLogSegmentMetadata, + final LogSegmentData logSegmentData, + final boolean requiresCompression, + final DataKeyAndAAD maybeEncryptionKey, + final SegmentCustomMetadataBuilder customMetadataBuilder + ) throws IOException, StorageBackendException { + final var fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG); + + try (final var logSegmentInputStream = Files.newInputStream(logSegmentData.logSegment())) { + final var transformEnum = transformation(logSegmentInputStream, requiresCompression, maybeEncryptionKey); + final var transformFinisher = new TransformFinisher( + transformEnum, + remoteLogSegmentMetadata.segmentSizeInBytes() + ); + + try (final var sis = transformFinisher.toInputStream()) { + final var bytes = uploader.upload(sis, fileKey); + metrics.recordObjectUpload( + remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(), + ObjectKeyFactory.Suffix.LOG, + bytes + ); + customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.LOG, bytes); + + log.debug("Uploaded segment log for {}, size: {}", remoteLogSegmentMetadata, bytes); + } + return transformFinisher.chunkIndex(); + } + } + + private TransformChunkEnumeration transformation( + final InputStream logSegmentInputStream, + final boolean requiresCompression, + final DataKeyAndAAD maybeEncryptionKey + ) { + TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration( + logSegmentInputStream, + chunkSize + ); + if (requiresCompression) { + transformEnum = new CompressionChunkEnumeration(transformEnum); + } + if (encryptionEnabled) { + transformEnum = new EncryptionChunkEnumeration( + transformEnum, + () -> aesEncryptionProvider.encryptionCipher(maybeEncryptionKey) + ); + } + return transformEnum; + } + + SegmentIndexesV1 uploadIndexes( final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final LogSegmentData segmentData, - final SegmentEncryptionMetadataV1 encryptionMeta, + final DataKeyAndAAD maybeEncryptionKey, final SegmentCustomMetadataBuilder customMetadataBuilder ) throws IOException, RemoteStorageException, StorageBackendException { final List indexes = new ArrayList<>(IndexType.values().length); @@ -281,7 +361,7 @@ private SegmentIndexesV1 uploadIndexes( IndexType.OFFSET, closableInputStreamHolder.add(Files.newInputStream(segmentData.offsetIndex())), indexSize(segmentData.offsetIndex()), - encryptionMeta, + maybeEncryptionKey, segmentIndexBuilder ); indexes.add(offsetIndex); @@ -289,7 +369,7 @@ private SegmentIndexesV1 uploadIndexes( IndexType.TIMESTAMP, closableInputStreamHolder.add(Files.newInputStream(segmentData.timeIndex())), indexSize(segmentData.timeIndex()), - encryptionMeta, + maybeEncryptionKey, segmentIndexBuilder ); indexes.add(timeIndex); @@ -297,7 +377,7 @@ private SegmentIndexesV1 uploadIndexes( IndexType.PRODUCER_SNAPSHOT, closableInputStreamHolder.add(Files.newInputStream(segmentData.producerSnapshotIndex())), indexSize(segmentData.producerSnapshotIndex()), - encryptionMeta, + maybeEncryptionKey, segmentIndexBuilder ); indexes.add(producerSnapshotIndex); @@ -305,7 +385,7 @@ private SegmentIndexesV1 uploadIndexes( IndexType.LEADER_EPOCH, closableInputStreamHolder.add(new ByteBufferInputStream(segmentData.leaderEpochIndex())), segmentData.leaderEpochIndex().remaining(), - encryptionMeta, + maybeEncryptionKey, segmentIndexBuilder ); indexes.add(leaderEpoch); @@ -314,7 +394,7 @@ private SegmentIndexesV1 uploadIndexes( IndexType.TRANSACTION, closableInputStreamHolder.add(Files.newInputStream(segmentData.transactionIndex().get())), indexSize(segmentData.transactionIndex().get()), - encryptionMeta, + maybeEncryptionKey, segmentIndexBuilder ); indexes.add(transactionIndex); @@ -361,56 +441,19 @@ private Optional buildCustomMetadata(final SegmentCustomMetadata } } - boolean requiresCompression(final LogSegmentData logSegmentData) { - boolean requiresCompression = false; - if (compressionEnabled) { - if (compressionHeuristic) { - try { - final File segmentFile = logSegmentData.logSegment().toFile(); - final boolean alreadyCompressed = SegmentCompressionChecker.check(segmentFile); - requiresCompression = !alreadyCompressed; - } catch (final InvalidRecordBatchException e) { - // Log and leave value as false to upload uncompressed. - log.warn("Failed to check compression on log segment: {}", logSegmentData.logSegment(), e); - } - } else { - requiresCompression = true; - } - } - return requiresCompression; - } - - private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, - final TransformFinisher transformFinisher, - final SegmentCustomMetadataBuilder customMetadataBuilder) - throws IOException, StorageBackendException { - final ObjectKey fileKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.LOG); - try (final var sis = transformFinisher.toInputStream()) { - final var bytes = uploader.upload(sis, fileKey); - metrics.recordObjectUpload( - remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition().topicPartition(), - ObjectKeyFactory.Suffix.LOG, - bytes - ); - customMetadataBuilder.addUploadResult(ObjectKeyFactory.Suffix.LOG, bytes); - - log.debug("Uploaded segment log for {}, size: {}", remoteLogSegmentMetadata, bytes); - } - } - InputStream transformIndex(final IndexType indexType, final InputStream index, final int size, - final SegmentEncryptionMetadata encryptionMetadata, + final DataKeyAndAAD maybeEncryptionKey, final SegmentIndexesV1Builder segmentIndexBuilder) { log.debug("Transforming index {} with size {}", indexType, size); if (size > 0) { TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(index, size); if (encryptionEnabled) { - final var dataKeyAndAAD = new DataKeyAndAAD(encryptionMetadata.dataKey(), encryptionMetadata.aad()); transformEnum = new EncryptionChunkEnumeration( transformEnum, - () -> aesEncryptionProvider.encryptionCipher(dataKeyAndAAD)); + () -> aesEncryptionProvider.encryptionCipher(maybeEncryptionKey) + ); } final var transformFinisher = new TransformFinisher(transformEnum, size); final var inputStream = transformFinisher.nextElement(); @@ -430,10 +473,20 @@ private Chunk singleChunk(final ChunkIndex chunkIndex) { return chunks.get(0); } - private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, - final SegmentManifest segmentManifest, - final SegmentCustomMetadataBuilder customMetadataBuilder) - throws StorageBackendException, IOException { + void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata, + final ChunkIndex chunkIndex, + final SegmentIndexesV1 segmentIndexes, + final boolean requiresCompression, + final DataKeyAndAAD maybeEncryptionKey, + final SegmentCustomMetadataBuilder customMetadataBuilder + ) throws StorageBackendException, IOException { + final var segmentManifestBuilder = SegmentManifestV1.newBuilder(chunkIndex, segmentIndexes) + .withRlsm(remoteLogSegmentMetadata) + .withCompressionEnabled(requiresCompression); + if (maybeEncryptionKey != null) { + segmentManifestBuilder.withEncryptionKey(maybeEncryptionKey); + } + final SegmentManifest segmentManifest = segmentManifestBuilder.build(); final String manifest = mapper.writeValueAsString(segmentManifest); final ObjectKey manifestObjectKey = objectKeyFactory.key(remoteLogSegmentMetadata, ObjectKeyFactory.Suffix.MANIFEST); @@ -607,10 +660,7 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment final long startedMs = time.milliseconds(); try { - final Set keys = Arrays.stream(ObjectKeyFactory.Suffix.values()) - .map(s -> objectKeyFactory.key(remoteLogSegmentMetadata, s)) - .collect(Collectors.toSet()); - deleter.delete(keys); + tryDeleteSegmentObjects(remoteLogSegmentMetadata); } catch (final Exception e) { metrics.recordSegmentDeleteError(remoteLogSegmentMetadata.remoteLogSegmentId() .topicIdPartition().topicPartition()); @@ -624,6 +674,15 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment log.info("Deleting log segment data for completed successfully {}", remoteLogSegmentMetadata); } + private void tryDeleteSegmentObjects( + final RemoteLogSegmentMetadata remoteLogSegmentMetadata + ) throws StorageBackendException { + final Set keys = Arrays.stream(ObjectKeyFactory.Suffix.values()) + .map(s -> objectKeyFactory.key(remoteLogSegmentMetadata, s)) + .collect(Collectors.toSet()); + deleter.delete(keys); + } + @Override public void close() { metrics.close(); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentEncryptionMetadataV1.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentEncryptionMetadataV1.java index 26bcc3765..b4594f778 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentEncryptionMetadataV1.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentEncryptionMetadataV1.java @@ -21,6 +21,8 @@ import java.util.Arrays; import java.util.Objects; +import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD; + import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -37,6 +39,10 @@ public SegmentEncryptionMetadataV1(@JsonProperty(value = "dataKey", required = t this.aad = Objects.requireNonNull(aad, "aad cannot be null"); } + public SegmentEncryptionMetadataV1(final DataKeyAndAAD dataKeyAndAAD) { + this(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad); + } + @Override public int ivSize() { return IV_SIZE; diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1.java b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1.java index 80b93e187..92e0f8c9f 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1.java @@ -22,6 +22,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex; +import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; @@ -44,7 +45,7 @@ public SegmentManifestV1( this(chunkIndex, segmentIndexes, compression, encryption, null); } - public SegmentManifestV1(final ChunkIndex chunkIndex, + private SegmentManifestV1(final ChunkIndex chunkIndex, final SegmentIndexesV1 segmentIndexes, final boolean compression, final SegmentEncryptionMetadataV1 encryption, @@ -58,6 +59,13 @@ public SegmentManifestV1(final ChunkIndex chunkIndex, this.remoteLogSegmentMetadata = remoteLogSegmentMetadata; } + public static Builder newBuilder( + final ChunkIndex chunkIndex, + final SegmentIndexesV1 segmentIndexes + ) { + return new Builder(chunkIndex, segmentIndexes); + } + @Override @JsonProperty("chunkIndex") public ChunkIndex chunkIndex() { @@ -129,4 +137,46 @@ public String toString() { + ", encryption=" + encryption + ")"; } + + public static class Builder { + final ChunkIndex chunkIndex; + final SegmentIndexesV1 segmentIndexes; + boolean compression = false; + SegmentEncryptionMetadataV1 encryptionMetadata = null; + RemoteLogSegmentMetadata rlsm = null; + + public Builder( + final ChunkIndex chunkIndex, + final SegmentIndexesV1 segmentIndexes + ) { + this.chunkIndex = chunkIndex; + this.segmentIndexes = segmentIndexes; + } + + public Builder withCompressionEnabled(final boolean requiresCompression) { + this.compression = requiresCompression; + return this; + } + + public Builder withEncryptionMetadata(final SegmentEncryptionMetadataV1 encryptionMetadata) { + this.encryptionMetadata = Objects.requireNonNull(encryptionMetadata, "encryptionMetadata cannot be null"); + return this; + } + + public Builder withEncryptionKey(final DataKeyAndAAD dataKeyAndAAD) { + this.encryptionMetadata = new SegmentEncryptionMetadataV1( + Objects.requireNonNull(dataKeyAndAAD, "dataKeyAndAAD cannot be null") + ); + return this; + } + + public Builder withRlsm(final RemoteLogSegmentMetadata rlsm) { + this.rlsm = Objects.requireNonNull(rlsm, "remoteLogSegmentMetadata cannot be null"); + return this; + } + + public SegmentManifestV1 build() { + return new SegmentManifestV1(chunkIndex, segmentIndexes, compression, encryptionMetadata, rlsm); + } + } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java index 9ef568e6f..7c75183f0 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java @@ -18,15 +18,19 @@ import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.Map; +import java.util.Optional; import java.util.stream.Stream; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.log.remote.storage.LogSegmentData; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteStorageException; @@ -36,6 +40,7 @@ import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -45,8 +50,12 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; class RemoteStorageManagerTest { @@ -221,6 +230,85 @@ void fetchSegmentNonInterruptionExceptionWhenGettingSegment( .hasRootCauseInstanceOf(exceptionClass); } + @Test + void deleteObjectsWhenUploadFails( + @TempDir final Path partitionDir + ) throws IOException, StorageBackendException, RemoteStorageException { + // given a sample local segment to be uploaded + final var segmentPath = Files.createFile(partitionDir.resolve("0000.log")); + final var segmentContent = "test"; + Files.writeString(segmentPath, segmentContent); + final var indexPath = Files.createFile(partitionDir.resolve("0000.index")); + final var timeIndexPath = Files.createFile(partitionDir.resolve("0000.timeindex")); + final var producerSnapshotPath = Files.createFile(partitionDir.resolve("0000.snapshot")); + final var logSegmentData = new LogSegmentData( + segmentPath, + indexPath, + timeIndexPath, + Optional.empty(), + producerSnapshotPath, + ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8)) + ); + + final var remoteLogSegmentMetadata = new RemoteLogSegmentMetadata( + REMOTE_SEGMENT_ID, 0, 1L, + 0, 0, 0, segmentContent.length(), Map.of(0, 0L)); + + final var remotePartitionPath = targetDir.resolve(TOPIC_ID_PARTITION.topic() + "-" + TOPIC_ID) + .resolve(String.valueOf(TOPIC_ID_PARTITION.partition())); + + final var config = Map.of( + "chunk.size", "1", + "storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage", + "storage.root", targetDir.toString() + ); + rsm.configure(config); + rsm = spy(rsm); + + // when first upload fails + doThrow(IOException.class).when(rsm).uploadSegmentLog(any(), any(), anyBoolean(), any(), any()); + + assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData)) + .isInstanceOf(RemoteStorageException.class) + .hasRootCauseInstanceOf(IOException.class); + + // then no files stored in remote + assertThat(remotePartitionPath).doesNotExist(); + + // fallback to real method + doCallRealMethod().when(rsm).uploadSegmentLog(any(), any(), anyBoolean(), any(), any()); + + // when second upload fails + doThrow(IOException.class).when(rsm).uploadIndexes(any(), any(), any(), any()); + + assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData)) + .isInstanceOf(RemoteStorageException.class) + .hasRootCauseInstanceOf(IOException.class); + + // then no files stored in remote + assertThat(remotePartitionPath).doesNotExist(); + + // fallback to real method + doCallRealMethod().when(rsm).uploadIndexes(any(), any(), any(), any()); + + // when third upload fails + doThrow(IOException.class).when(rsm).uploadManifest(any(), any(), any(), anyBoolean(), any(), any()); + + assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData)) + .isInstanceOf(RemoteStorageException.class) + .hasRootCauseInstanceOf(IOException.class); + + // then no files stored in remote + assertThat(remotePartitionPath).doesNotExist(); + + // fallback to real method + doCallRealMethod().when(rsm).uploadManifest(any(), any(), any(), anyBoolean(), any(), any()); + + // when all good + rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData); + assertThat(Files.list(remotePartitionPath)).hasSize(3); + } + static Stream provideNonInterruptionExceptions() { return Stream.of( arguments(null, Exception.class), diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/DefaultChunkManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/DefaultChunkManagerTest.java index 04763fdab..3a4d727f0 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/DefaultChunkManagerTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/DefaultChunkManagerTest.java @@ -62,7 +62,8 @@ class DefaultChunkManagerTest extends AesKeyAwareTest { void testGetChunk() throws Exception { final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, 10, 10); - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, null, null); + final SegmentManifest manifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES) + .build(); final ChunkManager chunkManager = new DefaultChunkManager(storage, null); when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream("0123456789".getBytes())); @@ -88,7 +89,9 @@ void testGetChunkWithEncryption() throws Exception { new ByteArrayInputStream(encrypted)); final var encryption = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad); - final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, encryption, null); + final var manifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES) + .withEncryptionMetadata(encryption) + .build(); final ChunkManager chunkManager = new DefaultChunkManager(storage, aesEncryptionProvider); assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); @@ -108,7 +111,9 @@ void testGetChunkWithCompression() throws Exception { when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream(compressed)); - final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, true, null, null); + final var manifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES) + .withCompressionEnabled(true) + .build(); final ChunkManager chunkManager = new DefaultChunkManager(storage, null); assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchChunkEnumerationSourceInputStreamClosingTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchChunkEnumerationSourceInputStreamClosingTest.java index 7671c7eaf..2ec5e591a 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchChunkEnumerationSourceInputStreamClosingTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchChunkEnumerationSourceInputStreamClosingTest.java @@ -70,8 +70,8 @@ class FetchChunkEnumerationSourceInputStreamClosingTest { .add(IndexType.LEADER_EPOCH, 1) .add(IndexType.TRANSACTION, 1) .build(); - static final SegmentManifest SEGMENT_MANIFEST = new SegmentManifestV1( - CHUNK_INDEX, SEGMENT_INDEXES, false, null, null); + static final SegmentManifest SEGMENT_MANIFEST = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES) + .build(); TestObjectFetcher fetcher; diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchChunkEnumerationTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchChunkEnumerationTest.java index e7337b011..1a22a15f9 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchChunkEnumerationTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/FetchChunkEnumerationTest.java @@ -53,7 +53,7 @@ class FetchChunkEnumerationTest { .add(IndexType.LEADER_EPOCH, 1) .add(IndexType.TRANSACTION, 1) .build(); - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, segmentIndexesV1, false, null, null); + final SegmentManifest manifest = SegmentManifestV1.newBuilder(chunkIndex, segmentIndexesV1).build(); static final byte[] CHUNK_CONTENT = "0123456789".getBytes(); static final ObjectKey SEGMENT_KEY = new TestObjectKey("topic/segment"); @@ -64,7 +64,7 @@ class FetchChunkEnumerationTest { @Test void failsWhenLargerStartPosition() { // Given - final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, segmentIndexesV1, false, null, null); + final SegmentManifest manifest = SegmentManifestV1.newBuilder(chunkIndex, segmentIndexesV1).build(); // When final int from = 1000; final int to = from + 1; diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/ChunkCacheTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/ChunkCacheTest.java index 262801b9a..cf8ac5af2 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/ChunkCacheTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/ChunkCacheTest.java @@ -86,8 +86,11 @@ class ChunkCacheTest { .add(IndexType.TRANSACTION, 1) .build(); - private static final SegmentManifest SEGMENT_MANIFEST = - new SegmentManifestV1(FIXED_SIZE_CHUNK_INDEX, SEGMENT_INDEXES, false, null, null); + private static final SegmentManifest SEGMENT_MANIFEST = SegmentManifestV1.newBuilder( + FIXED_SIZE_CHUNK_INDEX, + SEGMENT_INDEXES + ) + .build(); private static final String TEST_EXCEPTION_MESSAGE = "test_message"; private static final String SEGMENT_KEY = "topic/segment"; private static final ObjectKey SEGMENT_OBJECT_KEY = () -> SEGMENT_KEY; diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/DiskChunkCacheMetricsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/DiskChunkCacheMetricsTest.java index 1c769a606..34de08115 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/DiskChunkCacheMetricsTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/fetch/cache/DiskChunkCacheMetricsTest.java @@ -56,16 +56,16 @@ class DiskChunkCacheMetricsTest { TimeUnit.SECONDS.convert(new MetricConfig().timeWindowMs(), TimeUnit.MILLISECONDS); static final SegmentManifest SEGMENT_MANIFEST = - new SegmentManifestV1( - new FixedSizeChunkIndex(10, 30, 10, 10), - SegmentIndexesV1.builder() - .add(RemoteStorageManager.IndexType.OFFSET, 1) - .add(RemoteStorageManager.IndexType.TIMESTAMP, 1) - .add(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, 1) - .add(RemoteStorageManager.IndexType.LEADER_EPOCH, 1) - .add(RemoteStorageManager.IndexType.TRANSACTION, 1) - .build(), - false, null, null); + SegmentManifestV1.newBuilder( + new FixedSizeChunkIndex(10, 30, 10, 10), + SegmentIndexesV1.builder() + .add(RemoteStorageManager.IndexType.OFFSET, 1) + .add(RemoteStorageManager.IndexType.TIMESTAMP, 1) + .add(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, 1) + .add(RemoteStorageManager.IndexType.LEADER_EPOCH, 1) + .add(RemoteStorageManager.IndexType.TRANSACTION, 1) + .build()) + .build(); static final ObjectKey OBJECT_KEY_PATH = () -> "topic/segment"; diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java index fe73cf1ad..fd6f2bc56 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java @@ -115,7 +115,7 @@ void shouldReturnAndCache() throws StorageBackendException, IOException { when(storage.fetch(key)) .thenReturn(new ByteArrayInputStream(MANIFEST.getBytes())); final var chunkIndex = new FixedSizeChunkIndex(100, 1000, 110, 110); - final var expectedManifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, null, null); + final var expectedManifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES).build(); assertThat(provider.get(key)).isEqualTo(expectedManifest); verify(storage).fetch(key); assertThat(provider.get(key)).isEqualTo(expectedManifest); @@ -155,7 +155,7 @@ void shouldNotPoisonCacheWithFailedFutures() .hasMessage("test"); final var chunkIndex = new FixedSizeChunkIndex(100, 1000, 110, 110); - final var expectedManifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, null, null); + final var expectedManifest = SegmentManifestV1.newBuilder(chunkIndex, SEGMENT_INDEXES).build(); await().atMost(Duration.ofMillis(50)) .pollInterval(Duration.ofMillis(5)) diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1BuilderTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1BuilderTest.java new file mode 100644 index 000000000..7a48638d9 --- /dev/null +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1BuilderTest.java @@ -0,0 +1,142 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.manifest; + +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; + +import java.util.Map; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; + +import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; +import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class SegmentManifestV1BuilderTest { + static final FixedSizeChunkIndex CHUNK_INDEX = + new FixedSizeChunkIndex(100, 1000, 110, 110); + static final SecretKey DATA_KEY = new SecretKeySpec(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, "AES"); + static final byte[] AAD = {10, 11, 12, 13}; + static final SegmentIndexesV1 SEGMENT_INDEXES = new SegmentIndexesV1Builder() + .add(RemoteStorageManager.IndexType.OFFSET, 1) + .add(RemoteStorageManager.IndexType.TIMESTAMP, 1) + .add(RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT, 1) + .add(RemoteStorageManager.IndexType.LEADER_EPOCH, 1) + .add(RemoteStorageManager.IndexType.TRANSACTION, 1) + .build(); + static final RemoteLogSegmentMetadata REMOTE_LOG_SEGMENT_METADATA = new RemoteLogSegmentMetadata( + new RemoteLogSegmentId( + new TopicIdPartition(Uuid.fromString("lZ6vvmajTWKDBUTV6SQAtQ"), 42, "topic1"), + Uuid.fromString("adh9f8BMS4anaUnD8KWfWg") + ), + 0, + 1000L, + 1000000000L, + 2, + 2000000000L, + 100500, + Map.of(0, 100L, 1, 200L, 2, 300L) + ); + + @Test + void minimal() { + final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES).build(); + assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX); + assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES); + assertThat(manifest.compression()).isFalse(); + assertThat(manifest.encryption()).isEmpty(); + assertThat(manifest.remoteLogSegmentMetadata()).isNull(); + } + + @Test + void withRlsm() { + final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES) + .withRlsm(REMOTE_LOG_SEGMENT_METADATA) + .build(); + assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX); + assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES); + assertThat(manifest.compression()).isFalse(); + assertThat(manifest.encryption()).isEmpty(); + assertThat(manifest.remoteLogSegmentMetadata()).isEqualTo(REMOTE_LOG_SEGMENT_METADATA); + } + + @Test + void withCompressionEnabled() { + final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES) + .withCompressionEnabled(true) + .build(); + assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX); + assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES); + assertThat(manifest.compression()).isTrue(); + assertThat(manifest.encryption()).isEmpty(); + assertThat(manifest.remoteLogSegmentMetadata()).isNull(); + } + + @Test + void withCompressionDisabled() { + final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES) + .withCompressionEnabled(false) + .build(); + assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX); + assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES); + assertThat(manifest.compression()).isFalse(); + assertThat(manifest.encryption()).isEmpty(); + assertThat(manifest.remoteLogSegmentMetadata()).isNull(); + } + + @Test + void withEncryptionKey() { + final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES) + .withEncryptionKey(new DataKeyAndAAD(DATA_KEY, AAD)) + .build(); + assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX); + assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES); + assertThat(manifest.compression()).isFalse(); + assertThat(manifest.encryption()).isPresent(); + manifest.encryption().ifPresent(segmentEncryptionMetadata -> { + assertThat(segmentEncryptionMetadata.dataKey()).isEqualTo(DATA_KEY); + assertThat(segmentEncryptionMetadata.aad()).isEqualTo(AAD); + }); + assertThat(manifest.remoteLogSegmentMetadata()).isNull(); + } + + @Test + void full() { + final var manifest = SegmentManifestV1.newBuilder(CHUNK_INDEX, SEGMENT_INDEXES) + .withCompressionEnabled(true) + .withEncryptionKey(new DataKeyAndAAD(DATA_KEY, AAD)) + .withRlsm(REMOTE_LOG_SEGMENT_METADATA) + .build(); + assertThat(manifest.chunkIndex()).isEqualTo(CHUNK_INDEX); + assertThat(manifest.segmentIndexes()).isEqualTo(SEGMENT_INDEXES); + assertThat(manifest.compression()).isTrue(); + assertThat(manifest.encryption()).isPresent(); + manifest.encryption().ifPresent(segmentEncryptionMetadata -> { + assertThat(segmentEncryptionMetadata.dataKey()).isEqualTo(DATA_KEY); + assertThat(segmentEncryptionMetadata.aad()).isEqualTo(AAD); + }); + assertThat(manifest.remoteLogSegmentMetadata()).isEqualTo(REMOTE_LOG_SEGMENT_METADATA); + } +} diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1SerdeTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1SerdeTest.java index 545f39488..b68642a82 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1SerdeTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestV1SerdeTest.java @@ -147,8 +147,10 @@ void init() { @Test void withEncryption() throws JsonProcessingException { - final SegmentManifest manifest = new SegmentManifestV1(INDEX, SEGMENT_INDEXES, false, - new SegmentEncryptionMetadataV1(DATA_KEY, AAD), REMOTE_LOG_SEGMENT_METADATA); + final SegmentManifest manifest = SegmentManifestV1.newBuilder(INDEX, SEGMENT_INDEXES) + .withRlsm(REMOTE_LOG_SEGMENT_METADATA) + .withEncryptionMetadata(new SegmentEncryptionMetadataV1(DATA_KEY, AAD)) + .build(); final String jsonStr = mapper.writeValueAsString(manifest); @@ -174,7 +176,9 @@ void withEncryption() throws JsonProcessingException { @Test void withoutEncryption() throws JsonProcessingException { - final var manifest = new SegmentManifestV1(INDEX, SEGMENT_INDEXES, false, null, REMOTE_LOG_SEGMENT_METADATA); + final var manifest = SegmentManifestV1.newBuilder(INDEX, SEGMENT_INDEXES) + .withRlsm(REMOTE_LOG_SEGMENT_METADATA) + .build(); final String jsonStr = mapper.writeValueAsString(manifest); @@ -188,8 +192,9 @@ void withoutEncryption() throws JsonProcessingException { @Test void withoutTxnIndex() throws JsonProcessingException { - final var manifest = new SegmentManifestV1(INDEX, SEGMENT_INDEXES_WITHOUT_TXN_INDEX, - false, null, REMOTE_LOG_SEGMENT_METADATA); + final var manifest = SegmentManifestV1.newBuilder(INDEX, SEGMENT_INDEXES_WITHOUT_TXN_INDEX) + .withRlsm(REMOTE_LOG_SEGMENT_METADATA) + .build(); final String jsonStr = mapper.writeValueAsString(manifest);