diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index e4415b5e5..94d685f37 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -20,6 +20,7 @@ + diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index 47e39cb27..45f13241b 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -153,7 +153,7 @@ public void configure(final Map configs) { } final ChunkManagerFactory chunkManagerFactory = new ChunkManagerFactory(); chunkManagerFactory.configure(configs); - chunkManager = chunkManagerFactory.initChunkManager(fetcher, aesEncryptionProvider); + chunkManager = chunkManagerFactory.initChunkManager(fetcher, aesEncryptionProvider, config.useNewMode()); chunkSize = config.chunkSize(); compressionEnabled = config.compressionEnabled(); compressionHeuristic = config.compressionHeuristicEnabled(); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactory.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactory.java index cc8c2c4da..90c1eb69c 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactory.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactory.java @@ -33,8 +33,10 @@ public void configure(final Map configs) { } public ChunkManager initChunkManager(final ObjectFetcher fileFetcher, - final AesEncryptionProvider aesEncryptionProvider) { - final DefaultChunkManager defaultChunkManager = new DefaultChunkManager(fileFetcher, aesEncryptionProvider); + final AesEncryptionProvider aesEncryptionProvider, + final boolean useNewMode) { + final DefaultChunkManager defaultChunkManager = + new DefaultChunkManager(fileFetcher, aesEncryptionProvider, useNewMode); if (config.cacheClass() != null) { try { final ChunkCache chunkCache = config diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java index ac219eee5..c64888916 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java @@ -16,9 +16,15 @@ package io.aiven.kafka.tieredstorage.chunkmanager; +import java.io.ByteArrayInputStream; +import java.io.FilterInputStream; +import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; import io.aiven.kafka.tieredstorage.Chunk; import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata; @@ -33,13 +39,21 @@ import io.aiven.kafka.tieredstorage.transform.DetransformChunkEnumeration; import io.aiven.kafka.tieredstorage.transform.DetransformFinisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class DefaultChunkManager implements ChunkManager { + private static final Logger log = LoggerFactory.getLogger(DefaultChunkManager.class); + private final ObjectFetcher fetcher; private final AesEncryptionProvider aesEncryptionProvider; + private final boolean useNewMode; - public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvider aesEncryptionProvider) { + public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvider aesEncryptionProvider, + final boolean useNewMode) { this.fetcher = fetcher; this.aesEncryptionProvider = aesEncryptionProvider; + this.useNewMode = useNewMode; } /** @@ -50,8 +64,9 @@ public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvi public InputStream getChunk(final ObjectKey objectKey, final SegmentManifest manifest, final int chunkId) throws StorageBackendException { final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId); - - final InputStream chunkContent = fetcher.fetch(objectKey, chunk.range()); + final InputStream chunkContent = useNewMode + ? getChunkContentNewMode(objectKey, chunk) + : getChunkContentOldMode(objectKey, chunk); DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk)); final Optional encryptionMetadata = manifest.encryption(); @@ -68,4 +83,130 @@ public InputStream getChunk(final ObjectKey objectKey, final SegmentManifest man final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum); return detransformFinisher.toInputStream(); } + + private InputStream getChunkContentOldMode(final ObjectKey objectKey, + final Chunk chunk) throws StorageBackendException { + return fetcher.fetch(objectKey, chunk.range()); + } + + private final PersistentObjectInputStreamCache streamCache = new PersistentObjectInputStreamCache(); + private final AtomicInteger requestCounter = new AtomicInteger(0); + + private InputStream getChunkContentNewMode(final ObjectKey objectKey, + final Chunk chunk) throws StorageBackendException { + final int requestId = requestCounter.getAndIncrement(); + log.error("[{}] I need chunk: {}", requestId, chunk); + + try (final var streamHandler = streamCache.borrowOrCreate(requestId, objectKey, chunk)) { + final var stream = streamHandler.inputStream; + + if (chunk.equals(stream.currentChunk)) { + log.error("[{}] Chunk cached", requestId); + return new ByteArrayInputStream(stream.currentChunkContent); + } + + log.error("[{}] Reading chunk content", requestId); + final byte[] chunkBytes = stream.readChunk(chunk); + log.error("[{}] Read chunk content", requestId); + return new ByteArrayInputStream(chunkBytes); + } catch (final Exception e) { + throw new StorageBackendException("error", e); + } + } + + private static class PersistentObjectInputStream extends FilterInputStream { + public int position = 0; + public Chunk currentChunk = null; + public byte[] currentChunkContent = null; + + private PersistentObjectInputStream(final InputStream in) { + super(in); + } + + private byte[] readChunk(final Chunk chunk) throws StorageBackendException { + if (chunk.transformedPosition != position) { + throw new IllegalArgumentException("Invalid chunk " + chunk + ", current position: " + position); + } + + currentChunk = chunk; + try { + final int size = chunk.range().size(); + currentChunkContent = in.readNBytes(size); + if (currentChunkContent.length != size) { + throw new StorageBackendException( + "Expected " + size + " bytes for chunk " + chunk + " but got " + currentChunkContent.length); + } + position += size; + return currentChunkContent; + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + private class PersistentObjectInputStreamCache { + private final Object lock = new Object(); + private final HashMap> persistentStreams = new HashMap<>(); + + StreamHandler borrowOrCreate(final int requestId, + final ObjectKey objectKey, + final Chunk chunk) throws StorageBackendException { + synchronized (lock) { + final List streams = persistentStreams.get(objectKey); + log.error("[{}] Streams now: {}", requestId, persistentStreams); + final int from = chunk.range().from; + if (streams == null) { + log.error("[{}] Opening new stream", requestId); + return new StreamHandler(requestId, objectKey, + new PersistentObjectInputStream(fetcher.getContinuousStream(objectKey, from))); + } + + for (int i = 0; i < streams.size(); i++) { + final var stream = streams.get(i); + if (stream.currentChunk.equals(chunk)) { + log.error("[{}] Stream cached", requestId); + streams.remove(i); + return new StreamHandler(requestId, objectKey, stream); + } + } + + for (int i = 0; i < streams.size(); i++) { + final var stream = streams.get(i); + if (stream.position == from) { + log.error("[{}] Stream cached", requestId); + streams.remove(i); + return new StreamHandler(requestId, objectKey, stream); + } + } + + log.error("[{}] Opening new stream", requestId); + return new StreamHandler(requestId, objectKey, + new PersistentObjectInputStream(fetcher.getContinuousStream(objectKey, from))); + } + } + + class StreamHandler implements AutoCloseable { + private final int requestId; + private final ObjectKey objectKey; + public final PersistentObjectInputStream inputStream; + + private StreamHandler(final int requestId, + final ObjectKey objectKey, + final PersistentObjectInputStream inputStream) { + this.requestId = requestId; + this.objectKey = objectKey; + this.inputStream = inputStream; + } + + @Override + public void close() throws Exception { + synchronized (lock) { + log.error("[{}] Returning stream for {}", requestId, objectKey); + final var list = persistentStreams.computeIfAbsent(objectKey, k -> new ArrayList<>()); + list.add(inputStream); + log.error("[{}] Streams now: {}", requestId, persistentStreams); + } + } + } + } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java index cf295ba72..11506b55e 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java @@ -42,6 +42,8 @@ public class RemoteStorageManagerConfig extends AbstractConfig { private static final String STORAGE_PREFIX = "storage."; + private static final String STORAGE_USE_NEW_MODE_CONFIG = STORAGE_PREFIX + "use.new.mode"; + private static final String STORAGE_BACKEND_CLASS_CONFIG = STORAGE_PREFIX + "backend.class"; private static final String STORAGE_BACKEND_CLASS_DOC = "The storage backend implementation class"; @@ -101,6 +103,14 @@ public class RemoteStorageManagerConfig extends AbstractConfig { // TODO checkers + CONFIG.define( + STORAGE_USE_NEW_MODE_CONFIG, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.LOW, + "" + ); + CONFIG.define( STORAGE_BACKEND_CLASS_CONFIG, ConfigDef.Type.CLASS, @@ -208,6 +218,10 @@ public class RemoteStorageManagerConfig extends AbstractConfig { CUSTOM_METADATA_FIELDS_INCLUDE_DOC); } + public boolean useNewMode() { + return getBoolean(STORAGE_USE_NEW_MODE_CONFIG); + } + /** * Internal config for encryption. * diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryTest.java index 99108780d..1ded38c66 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryTest.java @@ -53,7 +53,8 @@ public static Stream cachingChunkManagers() { @Test void defaultChunkManager() { chunkManagerFactory.configure(Map.of()); - final ChunkManager chunkManager = chunkManagerFactory.initChunkManager(null, null); + final ChunkManager chunkManager = + chunkManagerFactory.initChunkManager(null, null, false); assertThat(chunkManager).isInstanceOf(DefaultChunkManager.class); } @@ -68,7 +69,8 @@ void cachingChunkManagers(final Class> cls) { ) ); try (final MockedConstruction ignored = mockConstruction(cls)) { - final ChunkManager chunkManager = chunkManagerFactory.initChunkManager(null, null); + final ChunkManager chunkManager = + chunkManagerFactory.initChunkManager(null, null, false); assertThat(chunkManager).isInstanceOf(cls); verify((ChunkCache) chunkManager).configure(Map.of( "class", cls, @@ -85,9 +87,10 @@ void failedInitialization() { (cachingChunkManager, context) -> { throw new InvocationTargetException(null); })) { - assertThatThrownBy(() -> chunkManagerFactory.initChunkManager(null, null)) - .isInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(ReflectiveOperationException.class); + assertThatThrownBy(() -> + chunkManagerFactory.initChunkManager(null, null, false)) + .isInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(ReflectiveOperationException.class); } } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java index fbf702929..891e7888d 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java @@ -63,7 +63,7 @@ void testGetChunk() throws Exception { final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, 10, 10); final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, null, null); - final ChunkManager chunkManager = new DefaultChunkManager(storage, null); + final ChunkManager chunkManager = new DefaultChunkManager(storage, null, false); when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream("0123456789".getBytes())); @@ -89,7 +89,7 @@ void testGetChunkWithEncryption() throws Exception { final var encryption = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad); final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, encryption, null); - final ChunkManager chunkManager = new DefaultChunkManager(storage, aesEncryptionProvider); + final ChunkManager chunkManager = new DefaultChunkManager(storage, aesEncryptionProvider, false); assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); @@ -109,7 +109,7 @@ void testGetChunkWithCompression() throws Exception { .thenReturn(new ByteArrayInputStream(compressed)); final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, true, null, null); - final ChunkManager chunkManager = new DefaultChunkManager(storage, null); + final ChunkManager chunkManager = new DefaultChunkManager(storage, null, false); assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java index de15cd8c8..b695739c5 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java @@ -89,7 +89,7 @@ void test(final Map config, final BytesRange range) throws StorageBackendException, IOException { final ChunkManagerFactory chunkManagerFactory = new ChunkManagerFactory(); chunkManagerFactory.configure(config); - final ChunkManager chunkManager = chunkManagerFactory.initChunkManager(fetcher, null); + final ChunkManager chunkManager = chunkManagerFactory.initChunkManager(fetcher, null, false); final var is = new FetchChunkEnumeration(chunkManager, OBJECT_KEY, SEGMENT_MANIFEST, range) .toInputStream(); if (readFully) { diff --git a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectFetcher.java b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectFetcher.java index 7c672ceeb..fcbb353ef 100644 --- a/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectFetcher.java +++ b/storage/core/src/main/java/io/aiven/kafka/tieredstorage/storage/ObjectFetcher.java @@ -31,4 +31,8 @@ public interface ObjectFetcher { * @param range range with inclusive start/end positions */ InputStream fetch(ObjectKey key, BytesRange range) throws StorageBackendException; + + default InputStream getContinuousStream(ObjectKey key, int from) throws StorageBackendException { + throw new IllegalStateException("not implemented"); + } } diff --git a/storage/gcs/src/main/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorage.java b/storage/gcs/src/main/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorage.java index 91728a62f..6798b36c8 100644 --- a/storage/gcs/src/main/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorage.java +++ b/storage/gcs/src/main/java/io/aiven/kafka/tieredstorage/storage/gcs/GcsStorage.java @@ -117,6 +117,31 @@ public InputStream fetch(final ObjectKey key, final BytesRange range) throws Sto } } + @Override + public InputStream getContinuousStream(final ObjectKey key, final int from) throws StorageBackendException { + try { + final Blob blob = getBlob(key); + + if (from >= blob.getSize()) { + throw new InvalidRangeException("Range start position " + from + + " is outside file content. file size = " + blob.getSize()); + } + + final ReadChannel reader = blob.reader(); + reader.seek(from); + return Channels.newInputStream(reader); + } catch (final IOException e) { + throw new StorageBackendException("Failed to fetch " + key, e); + } catch (final StorageException e) { + // https://cloud.google.com/storage/docs/json_api/v1/status-codes#416_Requested_Range_Not_Satisfiable + if (e.getCode() == 416) { + throw new InvalidRangeException("Invalid from TODO " + from, e); + } + + throw new StorageBackendException("Failed to fetch " + key, e); + } + } + private Blob getBlob(final ObjectKey key) throws KeyNotFoundException { // Unfortunately, it seems Google will do two a separate (HEAD-like) call to get blob metadata. // Since the blobs are immutable in tiered storage, we can consider caching them locally diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java index 5f5d1b2b5..30c2fb1aa 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java @@ -105,6 +105,30 @@ public InputStream fetch(final ObjectKey key, final BytesRange range) throws Sto } } + @Override + public InputStream getContinuousStream(final ObjectKey key, final int from) throws StorageBackendException { + try { + final GetObjectRequest getRequest = GetObjectRequest.builder() + .bucket(bucketName) + .key(key.value()) + .build(); + final var result = s3Client.getObject(getRequest); + final long skipped = result.skip(from); + if (skipped != from) { + throw new StorageBackendException("Failed to seek to position " + from + " in " + key); + } + return result; + } catch (final AwsServiceException e) { + if (e.statusCode() == 404) { + throw new KeyNotFoundException(this, key, e); + } + + throw new StorageBackendException("Failed to fetch " + key, e); + } catch (final IOException e) { + throw new StorageBackendException("Failed to fetch " + key, e); + } + } + @Override public String toString() { return "S3Storage{"