diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index c97fdab14..33d540363 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -46,9 +46,7 @@ import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD; import io.aiven.kafka.tieredstorage.security.RsaEncryptionProvider; import io.aiven.kafka.tieredstorage.storage.BytesRange; -import io.aiven.kafka.tieredstorage.storage.ObjectDeleter; -import io.aiven.kafka.tieredstorage.storage.ObjectFetcher; -import io.aiven.kafka.tieredstorage.storage.ObjectUploader; +import io.aiven.kafka.tieredstorage.storage.StorageBackend; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import io.aiven.kafka.tieredstorage.transform.BaseTransformChunkEnumeration; import io.aiven.kafka.tieredstorage.transform.CompressionChunkEnumeration; @@ -78,9 +76,6 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote. private final Executor executor = new ForkJoinPool(); - private ObjectFetcher fetcher; - private ObjectUploader uploader; - private ObjectDeleter deleter; private boolean compressionEnabled; private boolean compressionHeuristic; private boolean encryptionEnabled; @@ -92,6 +87,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote. private ObjectKey objectKey; private SegmentManifestProvider segmentManifestProvider; + private StorageBackend storage; public RemoteStorageManager() { this(Time.SYSTEM); @@ -107,9 +103,7 @@ public RemoteStorageManager() { public void configure(final Map configs) { Objects.requireNonNull(configs, "configs must not be null"); final RemoteStorageManagerConfig config = new RemoteStorageManagerConfig(configs); - fetcher = config.storage(); - uploader = config.storage(); - deleter = config.storage(); + storage = config.storage(); objectKey = new ObjectKey(config.keyPrefix()); encryptionEnabled = config.encryptionEnabled(); if (encryptionEnabled) { @@ -120,7 +114,7 @@ public void configure(final Map configs) { aesEncryptionProvider = new AesEncryptionProvider(); } chunkManager = new ChunkManager( - fetcher, + storage, objectKey, aesEncryptionProvider, config.chunkCache() @@ -136,7 +130,7 @@ public void configure(final Map configs) { objectKey, config.segmentManifestCacheSize(), config.segmentManifestCacheRetention(), - fetcher, + storage, mapper, executor); } @@ -231,7 +225,7 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet throws IOException, StorageBackendException { final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG); try (final var sis = transformFinisher.toInputStream()) { - uploader.upload(sis, fileKey); + storage.upload(sis, fileKey); } } @@ -241,7 +235,7 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta throws StorageBackendException, IOException { final String key = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType)); try (index) { - uploader.upload(index, key); + storage.upload(index, key); } } @@ -252,7 +246,7 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST); try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) { - uploader.upload(manifestContent, manifestFileKey); + storage.upload(manifestContent, manifestFileKey); } } @@ -291,7 +285,7 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet final IndexType indexType) throws RemoteStorageException { try { final String key = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType)); - return fetcher.fetch(key); + return storage.fetch(key); } catch (final StorageBackendException e) { // TODO: should be aligned with upstream implementation if (indexType == TRANSACTION) { @@ -309,7 +303,7 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment try { for (final ObjectKey.Suffix suffix : ObjectKey.Suffix.values()) { final String key = objectKey.key(remoteLogSegmentMetadata, suffix); - deleter.delete(key); + storage.delete(key); } } catch (final StorageBackendException e) { throw new RemoteStorageException(e);