diff --git a/core/src/main/scala/kafka/log/streamaspect/ClientWrapper.java b/core/src/main/scala/kafka/log/streamaspect/ClientWrapper.java index da6e85d1d7..9a694e1de9 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ClientWrapper.java +++ b/core/src/main/scala/kafka/log/streamaspect/ClientWrapper.java @@ -189,6 +189,11 @@ public CompletableFuture createAndOpenStream(CreateStreamOptions options return failureHandle(streamClient.createAndOpenStream(options).thenApplyAsync(rst -> rst, streamManagerCallbackExecutors)); } + @Override + public CompletableFuture preWarmStream(long streamId) { + return streamClient.preWarmStream(streamId); + } + @Override public CompletableFuture openStream(long streamId, OpenStreamOptions options) { return failureHandle(streamClient.openStream(streamId, options).thenApplyAsync(rst -> rst, streamManagerCallbackExecutors)); diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala index c614b2f31c..696e1aa1b3 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala @@ -709,6 +709,7 @@ object ElasticLog extends Logging { stream } else { val metaStreamId = Unpooled.wrappedBuffer(value.get()).readLong() + client.streamClient().preWarmStream(metaStreamId) val awaitCostMs = awaitStreamReadyForOpen(openStreamChecker, topicId.get, topicPartition.partition(), metaStreamId, leaderEpoch, logIdent = logIdent) // open partition meta stream val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.builder().epoch(leaderEpoch).tags(streamTags).build()) diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogMeta.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogMeta.java index caf2e018f1..b11a8ba7af 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogMeta.java +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogMeta.java @@ -41,7 +41,7 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class ElasticLogMeta { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final ObjectWriter WRITER = OBJECT_MAPPER.writer(); + private static final ObjectWriter WRITER = OBJECT_MAPPER.writerFor(ElasticLogMeta.class); private static final ObjectReader READER = OBJECT_MAPPER.readerFor(ElasticLogMeta.class); private static final Logger LOGGER = LoggerFactory.getLogger(ElasticLogMeta.class); diff --git a/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java b/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java index 75074f5b77..319605b03b 100644 --- a/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java +++ b/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java @@ -162,6 +162,11 @@ public CompletableFuture createAndOpenStream(CreateStreamOptions createS return CompletableFuture.completedFuture(new StreamImpl(streamIdAlloc.incrementAndGet())); } + @Override + public CompletableFuture preWarmStream(long streamId) { + return CompletableFuture.completedFuture(null); + } + @Override public CompletableFuture openStream(long streamId, OpenStreamOptions openStreamOptions) { return CompletableFuture.completedFuture(new StreamImpl(streamId)); diff --git a/s3stream/src/main/java/com/automq/stream/api/StreamClient.java b/s3stream/src/main/java/com/automq/stream/api/StreamClient.java index 8ac8672c83..36210cf90d 100644 --- a/s3stream/src/main/java/com/automq/stream/api/StreamClient.java +++ b/s3stream/src/main/java/com/automq/stream/api/StreamClient.java @@ -34,6 +34,14 @@ public interface StreamClient { */ CompletableFuture createAndOpenStream(CreateStreamOptions options); + /** + * PreWarmStream + * Expect to preWarm metaStream data cache before real open + * @param streamId metaStreamId + * @return preWarm result {@link CompletableFuture} + */ + CompletableFuture preWarmStream(long streamId); + /** * Open stream. * diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java index 353b37306b..f77acb3d60 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java @@ -23,9 +23,11 @@ import com.automq.stream.api.CreateStreamOptions; import com.automq.stream.api.FetchResult; import com.automq.stream.api.OpenStreamOptions; +import com.automq.stream.api.ReadOptions; import com.automq.stream.api.RecordBatch; import com.automq.stream.api.Stream; import com.automq.stream.api.StreamClient; +import com.automq.stream.s3.cache.ReadDataBlock; import com.automq.stream.s3.compact.StreamObjectCompactor; import com.automq.stream.s3.context.AppendContext; import com.automq.stream.s3.context.FetchContext; @@ -33,6 +35,7 @@ import com.automq.stream.s3.metadata.StreamState; import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.StreamOperationStats; +import com.automq.stream.s3.model.StreamRecordBatch; import com.automq.stream.s3.network.NetworkBandwidthLimiter; import com.automq.stream.s3.objects.ObjectManager; import com.automq.stream.s3.operator.ObjectStorage; @@ -134,6 +137,22 @@ public CompletableFuture createAndOpenStream(CreateStreamOptions options }); } + public CompletableFuture preWarmStream(long streamId) { + return this.streamManager.getStreams(List.of(streamId)).thenCompose(meta -> { + if (meta.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + FetchContext fetchContext = new FetchContext(); + ReadOptions readOptions = ReadOptions.builder().prioritizedRead(true).build(); + fetchContext.setReadOptions(readOptions); + + StreamMetadata streamMetadata = meta.get(0); + CompletableFuture read = this.storage.read(fetchContext, streamId, streamMetadata.startOffset(), streamMetadata.endOffset(), 64 * 1024); + return read.thenAccept(readDataBlock -> readDataBlock.getRecords().forEach(StreamRecordBatch::release)); + }); + } + @Override public CompletableFuture openStream(long streamId, OpenStreamOptions openStreamOptions) { return runInLock(() -> {