From a12b947a1cbeab3fbe56416f0974e330d1618565 Mon Sep 17 00:00:00 2001 From: lifepuzzlefun Date: Mon, 28 Jul 2025 17:15:22 +0800 Subject: [PATCH 1/7] feat(metadata): extract stream range index by lazy load Object --- .../s3/metadata/StreamMetadataManager.java | 120 +++- .../metadata/S3StreamsMetadataImageTest.java | 549 ++++++++++++++++++ .../kafka/image/S3StreamsMetadataImage.java | 418 +++++++++++-- .../image/S3StreamsMetadataImageTest.java | 14 +- .../index/lazy/StreamSetObjectRangeIndex.java | 186 ++++++ 5 files changed, 1239 insertions(+), 48 deletions(-) create mode 100644 core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java create mode 100644 s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java diff --git a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java index 4de980b6cf..c093116861 100644 --- a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java @@ -19,6 +19,7 @@ package kafka.log.stream.s3.metadata; +import com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex; import kafka.server.BrokerServer; import org.apache.kafka.image.MetadataDelta; @@ -37,11 +38,13 @@ import com.automq.stream.s3.ObjectReader; import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory; import com.automq.stream.s3.index.LocalStreamRangeIndexCache; +import com.automq.stream.s3.cache.LRUCache; import com.automq.stream.s3.metadata.ObjectUtils; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3StreamConstant; import com.automq.stream.s3.metadata.StreamMetadata; import com.automq.stream.s3.metadata.StreamOffsetRange; +import com.google.common.collect.Sets; import com.automq.stream.s3.objects.ObjectAttributes; import com.automq.stream.s3.operator.ObjectStorage; import com.automq.stream.s3.operator.ObjectStorage.ReadOptions; @@ -49,10 +52,12 @@ import com.automq.stream.utils.FutureUtil; import com.automq.stream.utils.Threads; +import org.apache.orc.util.BloomFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -67,6 +72,7 @@ import io.netty.util.concurrent.DefaultThreadFactory; import static com.automq.stream.utils.FutureUtil.exec; +import static kafka.log.stream.s3.metadata.StreamMetadataManager.DefaultRangeGetter.STREAM_ID_BLOOM_FILTER; public class StreamMetadataManager implements InRangeObjectsFetcher, MetadataPublisher { private static final Logger LOGGER = LoggerFactory.getLogger(StreamMetadataManager.class); @@ -78,6 +84,8 @@ public class StreamMetadataManager implements InRangeObjectsFetcher, MetadataPub private final LocalStreamRangeIndexCache indexCache; private final Map streamMetadataListeners = new ConcurrentHashMap<>(); + private Set streamSetObjectIds = Collections.emptySet(); + public StreamMetadataManager(BrokerServer broker, int nodeId, ObjectReaderFactory objectReaderFactory, LocalStreamRangeIndexCache indexCache) { this.nodeId = nodeId; @@ -98,6 +106,7 @@ public String name() { @Override public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) { Set changedStreams; + Set streamSetObjectIds = this.streamSetObjectIds; synchronized (this) { if 
(newImage.highestOffsetAndEpoch().equals(this.metadataImage.highestOffsetAndEpoch())) { return; @@ -105,9 +114,15 @@ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, Loader this.metadataImage = newImage; changedStreams = delta.getOrCreateStreamsMetadataDelta().changedStreams(); } + this.streamSetObjectIds = Collections.unmodifiableSet(getStreamSetObjectIds()); + + // update streamBloomFilter + Set sets = Sets.difference(this.streamSetObjectIds, streamSetObjectIds); + sets.forEach(STREAM_ID_BLOOM_FILTER::removeObject); + // retry all pending tasks retryPendingTasks(); - this.indexCache.asyncPrune(this::getStreamSetObjectIds); + this.indexCache.asyncPrune(() -> streamSetObjectIds); notifyMetadataListeners(changedStreams); } @@ -339,9 +354,74 @@ public void close() { } } - private static class DefaultRangeGetter implements S3StreamsMetadataImage.RangeGetter { + public static class StreamIdBloomFilter { + public static final double DEFAULT_FPP = 0.01; + private final LRUCache cache = new LRUCache<>(); + private final long maxBloomFilterSize; + private long cacheSize = 0; + + public StreamIdBloomFilter(long maxBloomFilterCacheSize) { + this.maxBloomFilterSize = maxBloomFilterCacheSize; + } + + public synchronized void maintainCacheSize() { + while (cacheSize > maxBloomFilterSize) { + Map.Entry entry = cache.pop(); + if (entry != null) { + cacheSize -= Long.BYTES + entry.getValue().sizeInBytes(); + } + } + } + + public synchronized boolean mightContain(long objectId, long streamId) { + BloomFilter bloomFilter = cache.get(objectId); + if (bloomFilter == null) { + return true; // treat as exist + } + + cache.touchIfExist(objectId); + return bloomFilter.testLong(streamId); + } + + public synchronized void removeObject(long objectId) { + BloomFilter filter = cache.get(objectId); + if (cache.remove(objectId)) { + cacheSize -= Long.BYTES + filter.sizeInBytes(); + } + } + + public synchronized void update(long objectId, List streamOffsetRanges) { + if (cache.containsKey(objectId)) { + return; + } + + BloomFilter bloomFilter = new BloomFilter(streamOffsetRanges.size(), DEFAULT_FPP); + + streamOffsetRanges.forEach((range) -> bloomFilter.addLong(range.streamId())); + cache.put(objectId, bloomFilter); + cacheSize += Long.BYTES + bloomFilter.sizeInBytes(); + + maintainCacheSize(); + } + + public synchronized long sizeInBytes() { + return this.cacheSize; + } + + public synchronized int objectNum() { + return this.cache.size(); + } + + public synchronized void clear() { + cache.clear(); + } + } + + public static class DefaultRangeGetter implements S3StreamsMetadataImage.RangeGetter { private final S3ObjectsImage objectsImage; private final ObjectReaderFactory objectReaderFactory; + public static final StreamIdBloomFilter STREAM_ID_BLOOM_FILTER = new StreamIdBloomFilter(20 * 1024 * 1024); + private S3StreamsMetadataImage.GetObjectsContext getObjectsContext; public DefaultRangeGetter(S3ObjectsImage objectsImage, ObjectReaderFactory objectReaderFactory) { @@ -350,16 +430,46 @@ public DefaultRangeGetter(S3ObjectsImage objectsImage, } @Override - public CompletableFuture> find(long objectId, long streamId) { + public void attachGetObjectsContext(S3StreamsMetadataImage.GetObjectsContext ctx) { + this.getObjectsContext = ctx; + } + + public static void updateIndex(ObjectReader reader, Long nodeId, Long streamId) { + reader.basicObjectInfo().thenAccept(info -> { + Long objectId = reader.metadata().objectId(); + List streamOffsetRanges = info.indexBlock().streamOffsetRanges(); + + 
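+            // Record every stream contained in this object: the bloom filter lets later
+            // find() calls skip objects that cannot hold the requested stream, and the
+            // lazy range index answers future startOffset -> objectId lookups without
+            // another object read.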
STREAM_ID_BLOOM_FILTER.update(objectId, streamOffsetRanges); + + StreamSetObjectRangeIndex.getInstance().updateIndex(objectId, nodeId, streamId, streamOffsetRanges); + }).whenComplete((v, e) -> reader.release()); + } + + @Override + public CompletableFuture> find(long objectId, long streamId, long nodeId, long orderId) { S3Object s3Object = objectsImage.getObjectMetadata(objectId); if (s3Object == null) { return FutureUtil.failedFuture(new IllegalArgumentException("Cannot find object metadata for object: " + objectId)); } + + boolean mightContain = STREAM_ID_BLOOM_FILTER.mightContain(objectId, streamId); + if (!mightContain) { + getObjectsContext.bloomFilterSkipSSOCount++; + return CompletableFuture.completedFuture(Optional.empty()); + } + + getObjectsContext.searchSSOStreamOffsetRangeCount++; // The reader will be release after the find operation @SuppressWarnings("resource") ObjectReader reader = objectReaderFactory.get(new S3ObjectMetadata(objectId, s3Object.getObjectSize(), s3Object.getAttributes())); - CompletableFuture> cf = reader.basicObjectInfo().thenApply(info -> info.indexBlock().findStreamOffsetRange(streamId)); - cf.whenComplete((rst, ex) -> reader.release()); + CompletableFuture> cf = reader.basicObjectInfo() + .thenApply(info -> info.indexBlock().findStreamOffsetRange(streamId)); + cf.whenCompleteAsync((rst, ex) -> { + if (rst.isEmpty()) { + getObjectsContext.searchSSORangeEmpty.add(1); + } + updateIndex(reader, nodeId, streamId); + }, StreamSetObjectRangeIndex.UPDATE_INDEX_THREAD_POOL); return cf; } diff --git a/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java new file mode 100644 index 0000000000..3d7881de20 --- /dev/null +++ b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java @@ -0,0 +1,549 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.stream.s3.metadata; + +import com.automq.stream.s3.ByteBufAlloc; +import com.automq.stream.s3.DataBlockIndex; +import com.automq.stream.s3.ObjectReader; +import com.automq.stream.s3.cache.blockcache.DefaultObjectReaderFactory; +import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory; +import com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex; +import com.automq.stream.s3.metadata.S3ObjectMetadata; +import com.automq.stream.s3.metadata.S3ObjectType; +import com.automq.stream.s3.metadata.S3StreamConstant; +import com.automq.stream.s3.metadata.StreamOffsetRange; +import com.automq.stream.s3.metadata.StreamState; +import com.automq.stream.s3.operator.MemoryObjectStorage; +import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; +import org.apache.kafka.common.metadata.S3StreamRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.image.DeltaList; +import org.apache.kafka.image.NodeS3StreamSetObjectMetadataImage; +import org.apache.kafka.image.RegistryRef; +import org.apache.kafka.image.S3ObjectsImage; +import org.apache.kafka.image.S3StreamMetadataImage; +import org.apache.kafka.image.S3StreamsMetadataImage; +import org.apache.kafka.image.TopicIdPartition; +import org.apache.kafka.metadata.stream.InRangeObjects; +import org.apache.kafka.metadata.stream.RangeMetadata; +import org.apache.kafka.metadata.stream.S3Object; +import org.apache.kafka.metadata.stream.S3ObjectState; +import org.apache.kafka.metadata.stream.S3StreamObject; +import org.apache.kafka.metadata.stream.S3StreamSetObject; +import org.apache.kafka.timeline.TimelineHashMap; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + + +@Tag("S3Unit") +public class S3StreamsMetadataImageTest { + private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamsMetadataImageTest.class); + + static class NodeData { + private final List streamSetObjects; + + public NodeData(int nodeId, List streamSetObjects) { + this.streamSetObjects = streamSetObjects.stream() + .map(sso -> + new S3StreamSetObject(sso.objectId(), sso.nodeId(), Bytes.EMPTY, sso.orderId(), sso.dataTimeInMs())) + .collect(Collectors.toList()); + this.streamSetObjects.sort(Comparator.comparingLong(S3StreamSetObject::orderId)); + } + + public List getStreamSetObjects() { + return streamSetObjects; + } + } + + static class GeneratedStreamMetadata implements S3StreamsMetadataImage.RangeGetter { + private final List streamObjects; + private final Map nodeData; + private final Map> ssoRanges; + private final StreamMetadataManager.DefaultRangeGetter rangeGetter; + private final Random r; + + public GeneratedStreamMetadata(Random r, List streamObjects, Map nodeData, Map> ssoRanges) { + RegistryRef registryRef = new RegistryRef(); + this.r = r; + TimelineHashMap object = new TimelineHashMap<>(registryRef.registry(), 0); + ssoRanges.keySet().forEach(id -> object.put(id, new S3Object(id, 0, 0, S3ObjectState.COMMITTED, 
0)));
+            S3ObjectsImage image = new S3ObjectsImage(10000, object, registryRef);
+            MemoryObjectStorage memoryObjectStorage = new MemoryObjectStorage();
+            ObjectReaderFactory factory = new DefaultObjectReaderFactory(memoryObjectStorage) {
+                @Override
+                public synchronized ObjectReader get(S3ObjectMetadata metadata) {
+                    return new ObjectReader() {
+                        @Override
+                        public S3ObjectMetadata metadata() {
+                            return metadata;
+                        }
+
+                        @Override
+                        public String objectKey() {
+                            return "";
+                        }
+
+                        @Override
+                        public CompletableFuture basicObjectInfo() {
+                            return CompletableFuture.completedFuture(new BasicObjectInfo(0, null) {
+                                @Override
+                                public IndexBlock indexBlock() {
+                                    return new IndexBlock(metadata, ByteBufAlloc.byteBuffer(10)) {
+                                        @Override
+                                        public Optional findStreamOffsetRange(long streamId) {
+                                            List streamOffsetRanges = GeneratedStreamMetadata.this.getSsoRanges().get(metadata.objectId());
+                                            return streamOffsetRanges.stream()
+                                                .filter(range -> range.streamId() == streamId).findAny();
+                                        }
+
+                                        @Override
+                                        public List streamOffsetRanges() {
+                                            return GeneratedStreamMetadata.this.getSsoRanges().get(metadata.objectId());
+                                        }
+                                    };
+                                }
+                            });
+                        }
+
+                        @Override
+                        public CompletableFuture read(ReadOptions readOptions, DataBlockIndex block) {
+                            return null;
+                        }
+
+                        @Override
+                        public ObjectReader retain() {
+                            return null;
+                        }
+
+                        @Override
+                        public ObjectReader release() {
+                            return null;
+                        }
+
+                        @Override
+                        public void close() {
+
+                        }
+
+                        @Override
+                        public CompletableFuture size() {
+                            return null;
+                        }
+                    };
+                }
+            };
+            StreamMetadataManager.DefaultRangeGetter rangeGetter = new StreamMetadataManager.DefaultRangeGetter(image, factory);
+
+            this.rangeGetter = rangeGetter;
+            this.streamObjects = streamObjects;
+            this.nodeData = nodeData;
+            this.ssoRanges = ssoRanges;
+        }
+
+        public List getStreamObjects() {
+            return streamObjects;
+        }
+
+        public Map getNodeData() {
+            return new TreeMap<>(nodeData);
+        }
+
+        public Map> getSsoRanges() {
+            return ssoRanges;
+        }
+
+        private final Object DUMMY_OBJECT = new Object();
+        public ConcurrentHashMap accessed = new ConcurrentHashMap<>();
+
+        public S3StreamsMetadataImage.GetObjectsContext getObjectsContext;
+
+        @Override
+        public void attachGetObjectsContext(S3StreamsMetadataImage.GetObjectsContext ctx) {
+            this.rangeGetter.attachGetObjectsContext(ctx);
+            this.getObjectsContext = ctx;
+        }
+
+        @Override
+        public CompletableFuture> find(long objectId, long streamId, long nodeId, long orderId) {
+            return this.rangeGetter.find(objectId, streamId, nodeId, orderId)
+                .thenCompose(
+                    r -> {
+                        // containsKey, not contains: ConcurrentHashMap.contains() tests values, not keys
+                        if (accessed.containsKey(objectId)) {
+                            return CompletableFuture.completedFuture(r);
+                        }
+
+                        accessed.put(objectId, DUMMY_OBJECT);
+                        if (this.r.nextDouble() > 0.75) {
+                            return new CompletableFuture>()
+                                .completeOnTimeout(r, 1, TimeUnit.MILLISECONDS);
+                        } else {
+                            return CompletableFuture.completedFuture(r);
+                        }
+                    }
+                );
+        }
+
+        @Override
+        public CompletableFuture readNodeRangeIndex(long nodeId) {
+            return CompletableFuture.failedFuture(new IllegalStateException("Not implemented"));
+        }
+    }
+
+    public static class GeneratorResult {
+        long seed;
+        S3StreamsMetadataImage image;
+        GeneratedStreamMetadata generatedStreamMetadata;
+
+        public GeneratorResult(long seed, S3StreamsMetadataImage image, GeneratedStreamMetadata generatedStreamMetadata) {
+            this.seed = seed;
+            this.image = image;
+            this.generatedStreamMetadata = generatedStreamMetadata;
+        }
+    }
+
+    public static S3StreamsMetadataImageTest.GeneratorResult generate(
+        long randomSeed,
+        Random random,
+        long streamId, long totalLength, int
numNodes, int maxSegmentSize, + double streamSetObjectProbability, double nodeMigrationProbability, int maxStreamPerSSO, double SSOContainsStreamProbability) { + + List streamObjects = new ArrayList<>(); + Map> nodeObjectsMap = new HashMap<>(); + Map> notStreamNodeObjectsMap = new HashMap<>(); + + List rangeMetadatas = new ArrayList<>(); + + long currentOffset = 0L; + int nextObjectId = 0; + int rangeIndex = 0; + long nextEpoch = 0; + + int currentNodeId = numNodes <= 0 ? -1 : random.nextInt(numNodes); + + RangeMetadata rangeMetadata = new RangeMetadata(streamId, nextEpoch, rangeIndex, + currentOffset, currentOffset, currentNodeId); + while (currentOffset < totalLength) { + if (random.nextDouble() < SSOContainsStreamProbability) { + notStreamNodeObjectsMap.computeIfAbsent(currentNodeId, k -> new ArrayList<>()) + .add(new S3StreamSetObject(nextObjectId, random.nextInt(numNodes), new ArrayList<>(), nextObjectId)); + nextObjectId++; + } + + long segmentLen = random.nextInt(maxSegmentSize - 20 + 1) + 20; + long startOffset = currentOffset; + long endOffset = Math.min(startOffset + segmentLen, totalLength); + + if (random.nextDouble() < streamSetObjectProbability) { + + if (numNodes > 1 && random.nextDouble() < nodeMigrationProbability) { + int newNodeId; + do { + newNodeId = random.nextInt(numNodes); + } while (newNodeId == currentNodeId); + currentNodeId = newNodeId; + } + + int nodeId = currentNodeId; + + if (rangeMetadata.nodeId() != nodeId) { + // set endOffset + rangeMetadata = new RangeMetadata(rangeMetadata.streamId(), rangeMetadata.epoch(), rangeIndex, rangeMetadata.startOffset(), startOffset, rangeMetadata.nodeId()); + rangeMetadatas.add(rangeMetadata); + rangeIndex++; + + // create new rangeMetadata + rangeMetadata = new RangeMetadata(rangeMetadata.streamId(), rangeMetadata.epoch() + 1, rangeIndex, + rangeMetadata.endOffset(), endOffset, nodeId); + } + + int orderId = nextObjectId; + S3StreamSetObject sso = new S3StreamSetObject(nextObjectId, nodeId, List.of(new StreamOffsetRange(streamId, startOffset, endOffset)), orderId); + + nodeObjectsMap.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(sso); + } else { + S3StreamObject so = new S3StreamObject(nextObjectId, streamId, startOffset, endOffset); + streamObjects.add(so); + } + + nextObjectId++; + currentOffset = endOffset; + } + + if (rangeMetadata.startOffset() != totalLength) { + rangeMetadata = new RangeMetadata(rangeMetadata.streamId(), rangeMetadata.epoch(), rangeIndex, rangeMetadata.startOffset(), totalLength, rangeMetadata.nodeId()); + rangeMetadatas.add(rangeMetadata); + } + + Map finalNodeData = new HashMap<>(); + for (int nodeId : nodeObjectsMap.keySet()) { + finalNodeData.put(nodeId, new NodeData(nodeId, nodeObjectsMap.get(nodeId))); + } + + RegistryRef ref = new RegistryRef(); + + DeltaList streamObject = new DeltaList(); + streamObjects.forEach(streamObject::add); + S3StreamMetadataImage streamsMetadataImage = new S3StreamMetadataImage(streamId, 1, StreamState.OPENED, + new S3StreamRecord.TagCollection(), + 0, rangeMetadatas, streamObject); + + TimelineHashMap streamMetadataImageTimelineHashMap = + new TimelineHashMap<>(ref.registry(), 10000); + streamMetadataImageTimelineHashMap.put(streamId, streamsMetadataImage); + + TimelineHashMap nodeMetadataMap = + new TimelineHashMap<>(ref.registry(), 10000); + + Map> ranges = new HashMap<>(); + nodeObjectsMap.entrySet().forEach(entry -> { + Integer nodeId = entry.getKey(); + List objects = entry.getValue(); + + DeltaList s3StreamSetObjectDeltaList = new DeltaList<>(); + for 
(S3StreamSetObject object : objects) { + S3StreamsMetadataImageTest.StreamSetObjectRange streamSetObjectRange = fillRandomStreamRangeInfo(streamId, random, maxStreamPerSSO, object); + ranges.put(object.objectId(), streamSetObjectRange.sfrs); + s3StreamSetObjectDeltaList.add(streamSetObjectRange.s3StreamSetObject); + } + + List notStreamNodeObjects = notStreamNodeObjectsMap.get(nodeId); + if (notStreamNodeObjects != null && !notStreamNodeObjects.isEmpty()) { + notStreamNodeObjects.forEach(sso -> { + S3StreamsMetadataImageTest.StreamSetObjectRange streamSetObjectRange = fillRandomStreamRangeInfo(streamId, random, maxStreamPerSSO, sso); + ranges.put(sso.objectId(), streamSetObjectRange.sfrs); + s3StreamSetObjectDeltaList.add(streamSetObjectRange.s3StreamSetObject); + }); + } + + NodeS3StreamSetObjectMetadataImage image = + new NodeS3StreamSetObjectMetadataImage(nodeId, 2, s3StreamSetObjectDeltaList); + + nodeMetadataMap.put(nodeId, image); + }); + + + TimelineHashMap> partition2streams = + new TimelineHashMap<>(ref.registry(), 10000); + + TimelineHashMap stream2partition = + new TimelineHashMap<>(ref.registry(), 10000); + + TimelineHashMap streamEndOffsets = + new TimelineHashMap<>(ref.registry(), 10000); + + streamEndOffsets.put(streamId, currentOffset); + + + S3StreamsMetadataImage image = new S3StreamsMetadataImage(streamId + 1, ref, streamMetadataImageTimelineHashMap, + nodeMetadataMap, partition2streams, stream2partition, streamEndOffsets); + + GeneratedStreamMetadata generatedStreamMetadata = new GeneratedStreamMetadata(random, streamObjects, finalNodeData, ranges); + + LOGGER.info("total stream object num: {}", streamObjects.size()); + LOGGER.info("total stream set object num: {}", notStreamNodeObjectsMap.values().stream().mapToInt(List::size).sum() + + nodeObjectsMap.values().stream().mapToInt(List::size).sum()); + LOGGER.info("total range num: {}", rangeMetadatas.size()); + LOGGER.info("node2sso num: "); + + nodeObjectsMap.entrySet().forEach(entry -> { + LOGGER.info("nodeId: {}", entry.getKey()); + LOGGER.info("sso num: {}", entry.getValue().size() + notStreamNodeObjectsMap.getOrDefault(entry.getKey(), List.of()).size()); + }); + return new S3StreamsMetadataImageTest.GeneratorResult(randomSeed, image, generatedStreamMetadata); + } + + public static List getS3ObjectMetadata(long streamId, GeneratedStreamMetadata metadata) { + List allObjects = new ArrayList<>(); + for (S3StreamObject so : metadata.getStreamObjects()) { + List ranges = Collections.singletonList(new StreamOffsetRange(so.streamId(), so.startOffset(), so.endOffset())); + allObjects.add(new S3ObjectMetadata(so.objectId(), S3ObjectType.STREAM, ranges, + S3StreamConstant.INVALID_TS, S3StreamConstant.INVALID_TS, S3StreamConstant.INVALID_OBJECT_SIZE, + S3StreamConstant.INVALID_ORDER_ID)); + } + for (Map.Entry entry : metadata.getNodeData().entrySet()) { + for (S3StreamSetObject sso : entry.getValue().getStreamSetObjects()) { + allObjects.add(new S3ObjectMetadata(sso.objectId(), S3ObjectType.STREAM_SET, find(metadata.getSsoRanges().get(sso.objectId()), streamId), + sso.dataTimeInMs(), sso.dataTimeInMs(), sso.orderId(), sso.nodeId())); + } + } + return allObjects; + } + + public static List find(List list, long streamId) { + return list.stream().filter(r -> r.streamId() == streamId).collect(Collectors.toList()); + } + + public static class StreamSetObjectRange { + S3StreamSetObject s3StreamSetObject; + List sfrs; + + public StreamSetObjectRange(S3StreamSetObject s3StreamSetObject, List sfrs) { + this.s3StreamSetObject = 
s3StreamSetObject; + this.sfrs = sfrs; + } + } + + public static S3StreamsMetadataImageTest.StreamSetObjectRange fillRandomStreamRangeInfo(long streamId, Random r, int maxStreamPerSso, S3StreamSetObject s3StreamSetObject) { + S3StreamSetObject object = new S3StreamSetObject(s3StreamSetObject.objectId(), s3StreamSetObject.nodeId(), Bytes.EMPTY, s3StreamSetObject.orderId(), s3StreamSetObject.dataTimeInMs()); + + Set generatedStream = new HashSet<>(); + generatedStream.add(streamId); + List sfrs = new ArrayList<>(s3StreamSetObject.offsetRangeList()); + for (int i = 0; i < maxStreamPerSso; i++) { + long startOffset = Math.abs(r.nextLong()); + long gStreamId; + do { + gStreamId = Math.abs(r.nextInt((int) streamId * 2)); + } while (generatedStream.contains(gStreamId)); + sfrs.add(new StreamOffsetRange(gStreamId, startOffset, + startOffset + Math.abs(r.nextInt(101024)))); + } + + Collections.sort(sfrs); + + return new S3StreamsMetadataImageTest.StreamSetObjectRange(object, sfrs); + } + + public static void check(long now, List seedList, int startOffset, int endOffset, List allObjects, InRangeObjects inRangeObjects, long costMs) { + LOGGER.info("=====check=========================="); + LOGGER.info("seed is {}", now); + LOGGER.info("seedList is seedList: {}", seedList); + LOGGER.info("startOffset is {}", startOffset); + LOGGER.info("endOffset is {}", endOffset); + LOGGER.info("cost={}ms", costMs); + try { + Preconditions.checkArgument(allObjects.size() == inRangeObjects.objects().size(), "result size not equal: allObjects: " + allObjects.size() + ", result: " + inRangeObjects.objects().size()); + + for (int i = 0; i < allObjects.size(); i++) { + S3ObjectMetadata metadata = allObjects.get(i); + S3ObjectMetadata metadata1 = inRangeObjects.objects().get(i); + + Assertions.assertEquals(metadata.objectId(), metadata1.objectId(), "objectId not equal at " + i); + Assertions.assertEquals(metadata.getType(), metadata1.getType(), "object Type not equal at " + i); + Assertions.assertEquals(metadata.startOffset(), metadata1.startOffset(), "startOffset not equal at " + i); + Assertions.assertEquals(metadata.endOffset(), metadata1.endOffset(), "endOffset not equal at " + i); + } + } catch (IllegalArgumentException e) { + Assertions.fail(e); + } + + } + + public static List slice(List allObjects, long startOffset, long endOffset, int limit) { + ArrayList ans = new ArrayList<>(); + for (S3ObjectMetadata object : allObjects) { + if (object.endOffset() > startOffset && object.startOffset() < endOffset && ans.size() < limit) { + ans.add(object); + } + } + + return ans; + } + + @Test + public void testGetObjectsResult() { + Random random = new Random(); + ArrayList seedList = new ArrayList<>(); + StreamSetObjectRangeIndex.ENABLED = true; + + + StreamSetObjectRangeIndex.getInstance().clear(); + + StreamMetadataManager.DefaultRangeGetter.STREAM_ID_BLOOM_FILTER.clear(); + long STREAM_ID = 420000L; + long TOTAL_STREAM_LENGTH = 50000L; + int NUMBER_OF_NODES = 10; + int MAX_SEGMENT_SIZE = 100; + double STREAM_SET_OBJECT_PROBABILITY = 0.7; + double NODE_MIGRATION_PROBABILITY = 0.2; + double SSONotContainsStreamProbability = 0.5; + + long now = System.currentTimeMillis(); + seedList.add(now); + random.setSeed(now); + + S3StreamsMetadataImageTest.GeneratorResult generatorResult = generate(now, random, + STREAM_ID, TOTAL_STREAM_LENGTH, NUMBER_OF_NODES, MAX_SEGMENT_SIZE, + STREAM_SET_OBJECT_PROBABILITY, NODE_MIGRATION_PROBABILITY, + 10000, SSONotContainsStreamProbability + ); + + List allObjects = getS3ObjectMetadata(STREAM_ID, 
generatorResult.generatedStreamMetadata); + + allObjects.sort(Comparator.comparingLong(S3ObjectMetadata::startOffset)); + + List originResult = allObjects; + + CompletableFuture objects = null; + objects = generatorResult.image.getObjects(STREAM_ID, 0, TOTAL_STREAM_LENGTH, Integer.MAX_VALUE, generatorResult.generatedStreamMetadata); + InRangeObjects inRangeObjects = null; + try { + inRangeObjects = objects.get(); + } catch (Exception e) { + LOGGER.error("seed is {}", now); + LOGGER.error("seedList is seedList: {}", seedList); + Assertions.fail(e); + } + System.out.println(generatorResult.generatedStreamMetadata.getObjectsContext.dumpStatistics()); + + check(now, seedList, 0, (int) TOTAL_STREAM_LENGTH, originResult, inRangeObjects, 0); + + + int limit = 4; + for (int i = 0; i < 1000; i++) { + long startMs = System.nanoTime(); + int startOffset = random.nextInt((int) TOTAL_STREAM_LENGTH); + int endOffset = random.nextInt(startOffset, (int) TOTAL_STREAM_LENGTH); + + objects = generatorResult.image.getObjects(STREAM_ID, startOffset, endOffset, limit, generatorResult.generatedStreamMetadata); + + try { + inRangeObjects = objects.get(); + } catch (Exception e) { + LOGGER.error("seed is {}", now); + LOGGER.error("seedList is seedList: {}", seedList); + Assertions.fail(e); + } + + long costMs = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startMs, TimeUnit.NANOSECONDS); + check(now, seedList, startOffset, endOffset, slice(originResult, startOffset, endOffset, limit), inRangeObjects, costMs); + } + } +} diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 928689b177..c24d5cdbe8 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -34,6 +34,8 @@ import com.automq.stream.s3.exceptions.ObjectNotExistException; import com.automq.stream.s3.index.LocalStreamRangeIndexCache; import com.automq.stream.s3.index.NodeRangeIndexCache; +import com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex; +import com.google.common.annotations.VisibleForTesting; import com.automq.stream.s3.metadata.ObjectUtils; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3ObjectType; @@ -59,7 +61,9 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; import io.netty.buffer.ByteBuf; @@ -146,9 +150,10 @@ public void write(ImageWriter writer, ImageWriterOptions options) { } } + @VisibleForTesting public CompletableFuture getObjects(long streamId, long startOffset, long endOffset, int limit, RangeGetter rangeGetter) { - return getObjects(streamId, startOffset, endOffset, limit, rangeGetter, null); + return getObjects(streamId, startOffset, endOffset, limit, rangeGetter, LocalStreamRangeIndexCache.create()); } /** @@ -166,6 +171,7 @@ public CompletableFuture getObjects(long streamId, long startOff long startTimeNanos = System.nanoTime(); GetObjectsContext ctx = new GetObjectsContext(streamId, startOffset, endOffset, limit, rangeGetter, indexCache); try { + rangeGetter.attachGetObjectsContext(ctx); getObjects0(ctx); } catch (Throwable e) { ctx.cf.completeExceptionally(e); @@ -181,6 +187,16 @@ public CompletableFuture 
getObjects(long streamId, long startOff return ctx.cf; } + private boolean readEndOffset(long streamId, long endOffset) { + boolean readEndOffset = endOffset == -1L; + OptionalLong optionalLong = streamEndOffset(streamId); + if (optionalLong.isPresent()) { + readEndOffset = readEndOffset || optionalLong.getAsLong() == endOffset; + } + + return readEndOffset; + } + @SuppressWarnings({"checkstyle:cyclomaticcomplexity", "checkstyle:npathcomplexity"}) void getObjects0(GetObjectsContext ctx) { long streamId = ctx.streamId; @@ -198,6 +214,8 @@ void getObjects0(GetObjectsContext ctx) { } List objects = new LinkedList<>(); + ctx.readEndOffset = readEndOffset(streamId, endOffset); + // floor value < 0 means that all stream objects' ranges are greater than startOffset int streamObjectIndex = Math.max(0, stream.floorStreamObjectIndex(startOffset)); @@ -206,7 +224,7 @@ void getObjects0(GetObjectsContext ctx) { int lastRangeIndex = -1; int streamSetObjectIndex = 0; fillObjects(ctx, stream, objects, lastRangeIndex, streamObjectIndex, streamObjects, streamSetObjectIndex, - null, null); + null, 0, null); } private void fillObjects( @@ -218,10 +236,11 @@ private void fillObjects( List streamObjects, int streamSetObjectIndex, List streamSetObjects, + int loadedStreamSetObjectIndex, NodeS3StreamSetObjectMetadataImage node ) { exec(() -> fillObjects0(ctx, stream, objects, lastRangeIndex, streamObjectIndex, streamObjects, - streamSetObjectIndex, streamSetObjects, node), ctx.cf, LOGGER, "fillObjects"); + streamSetObjectIndex, streamSetObjects, loadedStreamSetObjectIndex, node), ctx.cf, LOGGER, "fillObjects"); } void fillObjects0( @@ -233,9 +252,10 @@ void fillObjects0( List streamObjects, int streamSetObjectIndex, List streamSetObjects, + int loadedStreamSetObjectIndex, NodeS3StreamSetObjectMetadataImage node ) { - ctx.record(objects.size(), lastRangeIndex, streamObjectIndex, streamSetObjectIndex); + ctx.record(objects.size(), lastRangeIndex, streamObjectIndex, streamSetObjectIndex, loadedStreamSetObjectIndex, node); long nextStartOffset = ctx.nextStartOffset; boolean firstTimeSearchInSSO = true; int finalStartSearchIndex = streamSetObjectIndex; @@ -244,6 +264,7 @@ void fillObjects0( // try to find consistent stream objects for (; streamObjectIndex < streamObjects.size(); streamObjectIndex++) { + ctx.checkSOCount++; S3StreamObject streamObject = streamObjects.get(streamObjectIndex); if (streamObject.startOffset() != nextStartOffset) { //noinspection StatementWithEmptyBody @@ -266,6 +287,7 @@ void fillObjects0( } if (streamSetObjects == null) { + ctx.searchNodeCount++; int rangeIndex = stream.getRangeContainsOffset(nextStartOffset); // 1. can not find the range containing nextStartOffset, or // 2. the range is the same as the last one, which means the nextStartOffset does not move on. 
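+            // searchNodeCount (incremented above) counts how many times this request had to
+            // resolve the range/node owning nextStartOffset; it is reported together with
+            // the other per-request counters by GetObjectsContext#dumpStatistics().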
@@ -289,6 +311,7 @@ void fillObjects0( final int finalStreamObjectIndex = streamObjectIndex; final List finalStreamSetObjects = streamSetObjects; final NodeS3StreamSetObjectMetadataImage finalNode = node; + final long nodeId = range.nodeId(); startSearchIndexCf.whenComplete((index, ex) -> { if (ex != null) { if (!(FutureUtil.cause(ex) instanceof ObjectNotExistException)) { @@ -298,20 +321,27 @@ void fillObjects0( } // load stream set object index int finalIndex = index; - loadStreamSetObjectInfo(ctx, finalStreamSetObjects, index).thenAccept(v -> { - ctx.nextStartOffset = finalNextStartOffset; - fillObjects(ctx, stream, objects, finalLastRangeIndex, finalStreamObjectIndex, streamObjects, - finalIndex, finalStreamSetObjects, finalNode); - }).exceptionally(exception -> { - ctx.cf.completeExceptionally(exception); - return null; - }); + if (finalIndex != 0) { + ctx.ssoIndexHelpCount++; + } else { + ctx.ssoNoIndex++; + } + long startTimeNs = System.nanoTime(); + loadStreamSetObjectInfo(ctx, nodeId, finalStreamSetObjects, index, ctx.loadStreamSetObjectInfo / 5 + 5) + .thenAccept(finalEnsureLoadedIndexExclusive -> { + ctx.nextStartOffset = finalNextStartOffset; + ctx.waitLoadSSOCostMs.add(TimerUtil.timeElapsedSince(startTimeNs, TimeUnit.NANOSECONDS)); + fillObjects(ctx, stream, objects, finalLastRangeIndex, finalStreamObjectIndex, streamObjects, finalIndex, finalStreamSetObjects, finalEnsureLoadedIndexExclusive, finalNode); + }).exceptionally(exception -> { + ctx.cf.completeExceptionally(exception); + return null; + }); }); return; } - final int streamSetObjectsSize = streamSetObjects.size(); - for (; streamSetObjectIndex < streamSetObjectsSize; streamSetObjectIndex++) { + for (; streamSetObjectIndex < loadedStreamSetObjectIndex; streamSetObjectIndex++) { + ctx.checkSSOCount++; S3StreamSetObject streamSetObject = streamSetObjects.get(streamSetObjectIndex); StreamOffsetRange streamOffsetRange = findStreamInStreamSetObject(ctx, streamSetObject).orElse(null); // skip the stream set object not containing the stream or the range is before the nextStartOffset @@ -340,8 +370,46 @@ void fillObjects0( break; } } - // case 1. streamSetObjectIndex >= streamSetObjects.size(), which means we have reached the end of the stream set objects. - // case 2. objects.size() == roundStartObjectSize, which means we have not found any new object in this round. 
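+        // All stream set object info loaded so far has been consumed but objects remain:
+        // re-query the sparse index in case it lets us jump past objects that cannot
+        // contain the stream (counted as fastSkipIndexCount), then load the next batch.
+        // The wait budget (ctx.loadStreamSetObjectInfo / 5 + 5) grows with each round,
+        // so later rounds load progressively larger batches.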
+ + if (streamSetObjectIndex == loadedStreamSetObjectIndex && loadedStreamSetObjectIndex < streamSetObjects.size()) { + // check if index can fast skip + CompletableFuture startSearchIndexCf = getStartSearchIndex(node, nextStartOffset, ctx); + + NodeS3StreamSetObjectMetadataImage finalNode1 = node; + List finalStreamSetObjects1 = streamSetObjects; + long finalNextStartOffset1 = nextStartOffset; + int finalLastRangeIndex1 = lastRangeIndex; + int finalStreamObjectIndex1 = streamObjectIndex; + + startSearchIndexCf.whenComplete((index, ex) -> { + if (ex != null) { + if (!(FutureUtil.cause(ex) instanceof ObjectNotExistException)) { + LOGGER.error("Failed to get start search index", ex); + } + index = 0; + } + + if (index > loadedStreamSetObjectIndex) { + ctx.fastSkipIndexCount++; + } + + // load stream set object index + int finalIndex = Math.max(index, loadedStreamSetObjectIndex); + long startTimeNs = System.nanoTime(); + loadStreamSetObjectInfo(ctx, finalNode1.getNodeId(), finalStreamSetObjects1, finalIndex, ctx.loadStreamSetObjectInfo / 5 + 5) + .thenAccept(newLoadedIndexExclusive -> { + ctx.nextStartOffset = finalNextStartOffset1; + ctx.waitLoadSSOCostMs.add(TimerUtil.timeElapsedSince(startTimeNs, TimeUnit.NANOSECONDS)); + fillObjects(ctx, stream, objects, finalLastRangeIndex1, finalStreamObjectIndex1, streamObjects, + finalIndex, finalStreamSetObjects1, newLoadedIndexExclusive, finalNode1); + }).exceptionally(exception -> { + ctx.cf.completeExceptionally(exception); + return null; + }); + }); + return; + } + if (streamSetObjectIndex >= streamSetObjects.size() || objects.size() == roundStartObjectSize) { // move to the next range // This can ensure that we can break the loop. @@ -420,11 +488,7 @@ private CompletableFuture getStartSearchIndex0(NodeS3StreamSetObjectMet if (node == null) { return CompletableFuture.completedFuture(0); } - // search in compact cache first - int index = node.floorStreamSetObjectIndex(ctx.streamId, startOffset); - if (index > 0) { - return CompletableFuture.completedFuture(index); - } + // search in sparse index ctx.isFromSparseIndex = true; return getStartStreamSetObjectId(node.getNodeId(), startOffset, ctx) @@ -432,7 +496,11 @@ private CompletableFuture getStartSearchIndex0(NodeS3StreamSetObjectMet int startIndex = -1; if (objectId >= 0) { startIndex = findStartSearchIndex(objectId, node.orderList()); - MetadataStats.getInstance().getRangeIndexHitCountStats().add(MetricsLevel.INFO, 1); + if (startIndex < 0) { + StreamSetObjectRangeIndex.getInstance().invalid(node.getNodeId(), ctx.streamId, startOffset, objectId); + } else { + MetadataStats.getInstance().getRangeIndexHitCountStats().add(MetricsLevel.INFO, 1); + } } else { MetadataStats.getInstance().getRangeIndexMissCountStats().add(MetricsLevel.INFO, 1); } @@ -457,6 +525,10 @@ private CompletableFuture getStartSearchIndex0(NodeS3StreamSetObjectMet * should be invalidated, so we can refresh the index from object storage next time */ private CompletableFuture getStartStreamSetObjectId(int nodeId, long startOffset, GetObjectsContext ctx) { + if (StreamSetObjectRangeIndex.ENABLED) { + return StreamSetObjectRangeIndex.getInstance().searchObjectId(nodeId, ctx.streamId, startOffset); + } + if (ctx.indexCache != null && ctx.indexCache.nodeId() == nodeId) { return ctx.indexCache.searchObjectId(ctx.streamId, startOffset); } @@ -474,14 +546,22 @@ private CompletableFuture getStartStreamSetObjectId(int nodeId, long start /** * Load the stream set object range info is missing * - * @return async load + * @return async ensure 
that the stream set object info up to the returned index (exclusive) has been loaded
      */
-    private CompletableFuture loadStreamSetObjectInfo(GetObjectsContext ctx,
-        List streamSetObjects,
-        int startSearchIndex) {
+    private CompletableFuture loadStreamSetObjectInfo(GetObjectsContext ctx,
+        long nodeId,
+        List streamSetObjects,
+        int startSearchIndex,
+        int maxWaitInfoNum
+    ) {
         return exec(() -> {
+            ctx.loadStreamSetObjectInfo++;
             final int streamSetObjectsSize = streamSetObjects.size();
             List> loadIndexCfList = new LinkedList<>();
+
+            int currentWaitNum = maxWaitInfoNum;
+            int lastLoadedIndex = startSearchIndex;
+
             for (int i = startSearchIndex; i < streamSetObjectsSize; i++) {
                 S3StreamSetObject streamSetObject = streamSetObjects.get(i);
                 if (streamSetObject.ranges().length != 0) {
@@ -490,19 +570,108 @@ private CompletableFuture loadStreamSetObjectInfo(GetObjectsContext ctx,
                 if (ctx.object2range.containsKey(streamSetObject.objectId())) {
                     continue;
                 }
-                loadIndexCfList.add(
-                    ctx.rangeGetter
-                        .find(streamSetObject.objectId(), ctx.streamId)
-                        .thenAccept(range -> ctx.object2range.put(streamSetObject.objectId(), range))
-                );
+
+                CompletableFuture cf = ctx.rangeGetter
+                    .find(streamSetObject.objectId(), ctx.streamId, nodeId, streamSetObject.orderId())
+                    .thenAccept(range -> ctx.object2range.put(streamSetObject.objectId(), range));
+
+                if (!cf.isDone() && currentWaitNum > 0) {
+                    ctx.waitSSOLoadCount++;
+                    currentWaitNum--;
+                }
+
+                loadIndexCfList.add(cf);
+
+                if (currentWaitNum > 0) {
+                    lastLoadedIndex = i;
+                } else {
+                    break;
+                }
             }
+
             if (loadIndexCfList.isEmpty()) {
-                return CompletableFuture.completedFuture(null);
+                // no need to load any index
+                return CompletableFuture.completedFuture(streamSetObjectsSize);
             }
-            return CompletableFuture.allOf(loadIndexCfList.toArray(new CompletableFuture[0]));
+
+            if (ctx.readEndOffset) {
+                preLoadEndOffsetStreamSetObject(ctx, nodeId, streamSetObjects);
+            } else {
+                preLoadMiddleOffsetStreamSetObject(ctx, nodeId, streamSetObjects, lastLoadedIndex, maxWaitInfoNum);
+            }
+
+            int finalLastLoadedIndex = lastLoadedIndex;
+            return CompletableFuture.allOf(loadIndexCfList.toArray(new CompletableFuture[0]))
+                .thenApply(v -> finalLastLoadedIndex + 1);
         }, LOGGER, "loadStreamSetObjectInfo");
     }
 
+    private void preLoadMiddleOffsetStreamSetObject(GetObjectsContext ctx,
+        long nodeId,
+        List streamSetObjects,
+        int lastLoadedIndex,
+        int maxWaitInfoNum) {
+        int streamSetObjectsSize = streamSetObjects.size();
+        int remainingObjects = streamSetObjectsSize - lastLoadedIndex;
+        if (!ctx.readEndOffset && ctx.maxPreLoadSSORound < 5 && remainingObjects > maxWaitInfoNum) {
+            double preLoadStep = (double) remainingObjects / (5 - ctx.maxPreLoadSSORound);
+
+            List preLoadPos = new ArrayList<>();
+
+            for (int i = 1; i <= 4; i++) {
+                int loadIndex = lastLoadedIndex + (int) (preLoadStep * i);
+
+                if (loadIndex <= lastLoadedIndex || loadIndex >= streamSetObjectsSize) {
+                    continue;
+                }
+
+                preLoadPos.add(loadIndex);
+            }
+
+            preLoadStreamSetObject(ctx, nodeId, streamSetObjects, preLoadPos);
+
+            ctx.maxPreLoadSSORound++;
+        }
+    }
+
+    private void preLoadStreamSetObject(GetObjectsContext ctx, long nodeId, List streamSetObjects, List indexProvider) {
+        indexProvider.forEach(index -> {
+            S3StreamSetObject streamSetObject = streamSetObjects.get(index);
+            if (streamSetObject.ranges().length != 0) {
+                return;
+            }
+            if (ctx.object2range.containsKey(streamSetObject.objectId())) {
+                return;
+            }
+
+            ctx.preLoadSSOCount++;
+            ctx.rangeGetter
+                .find(streamSetObject.objectId(), ctx.streamId, nodeId, streamSetObject.orderId())
+
.thenAccept(range -> { + ctx.object2range.put(streamSetObject.objectId(), range); + PreLoadHistory preLoadHistory = new PreLoadHistory(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ctx.startTime), nodeId, index, range.orElse(null)); + ctx.recordPreload(preLoadHistory); + }); + }); + } + + private void preLoadEndOffsetStreamSetObject(GetObjectsContext ctx, long nodeId, List streamSetObjects) { + int lastLoadedCount = ctx.lastObjectLoaded(nodeId); + int streamSetObjectsSize = streamSetObjects.size(); + if (ctx.readEndOffset && lastLoadedCount < 5) { + ctx.incLastObjectLoaded(nodeId); + + List preLoadPos = new ArrayList<>(); + for (int i = streamSetObjectsSize - 5 * lastLoadedCount; i < streamSetObjectsSize - 5 && i >= 0; i++) { + preLoadPos.add(i); + } + + preLoadStreamSetObject(ctx, nodeId, streamSetObjects, preLoadPos); + } + } + private Optional findStreamInStreamSetObject(GetObjectsContext ctx, S3StreamSetObject object) { if (object.ranges().length == 0) { return ctx.object2range.get(object.objectId()); @@ -702,19 +871,130 @@ public ReferenceCounted touch(Object o) { return this; } - static class GetObjectsContext { + public static class GetObjectDebugInfo { + private final long nextStartOffset; + private final int objectSize; + private final int lastRangeIndex; + private final int streamObjectIndex; + private final int streamSetObjectIndex; + private final int loadedStreamSetObjectIndex; + private final int ssoSize; + private final int nodeId; + private final long id; + + public GetObjectDebugInfo(long id, long nextStartOffset, + int objectSize, + int lastRangeIndex, + int streamObjectIndex, + int streamSetObjectIndex, + int loadedStreamSetObjectIndex, + int ssoSize, + int nodeId) { + this.id = id; + this.nextStartOffset = nextStartOffset; + this.objectSize = objectSize; + this.lastRangeIndex = lastRangeIndex; + this.streamObjectIndex = streamObjectIndex; + this.streamSetObjectIndex = streamSetObjectIndex; + this.loadedStreamSetObjectIndex = loadedStreamSetObjectIndex; + this.ssoSize = ssoSize; + this.nodeId = nodeId; + } + + @Override + public String toString() { + return "GetObjectDebugInfo{" + + "id=" + id + + ", nextStartOffset=" + nextStartOffset + + ", objectSize=" + objectSize + + ", lastRangeIndex=" + lastRangeIndex + + ", streamObjectIndex=" + streamObjectIndex + + ", streamSetObjectIndex=" + streamSetObjectIndex + + ", loadedStreamSetObjectIndex=" + loadedStreamSetObjectIndex + + ", ssoSize=" + ssoSize + + ", nodeId=" + nodeId + + '}'; + } + } + + public static class PreLoadHistory { + long nodeId; + long index; + StreamOffsetRange streamOffsetRange; + long id; + + public PreLoadHistory(long id, long nodeId, long index, StreamOffsetRange streamOffsetRange) { + this.id = id; + this.nodeId = nodeId; + this.index = index; + this.streamOffsetRange = streamOffsetRange; + } + + @Override + public String toString() { + return "PreLoadHistory{" + + "id=" + id + + ", nodeId=" + nodeId + + ", index=" + index + + ", streamOffsetRange=" + streamOffsetRange + + '}'; + } + } + + public static class GetObjectsContext { final long streamId; + public List range; + boolean readEndOffset; long startOffset; long nextStartOffset; long endOffset; int limit; boolean isFromSparseIndex; + int maxPreLoadSSORound = 1; RangeGetter rangeGetter; LocalStreamRangeIndexCache indexCache; CompletableFuture cf = new CompletableFuture<>(); Map> object2range = new ConcurrentHashMap<>(); - List debugContext = new ArrayList<>(); + List debugContext = new ArrayList<>(); + final ConcurrentLinkedQueue 
preloadHistory = new ConcurrentLinkedQueue<>();
+
+        public int checkSOCount;
+        public int checkSSOCount;
+
+        public int searchNodeCount;
+        public int searchSSOStreamOffsetRangeCount;
+        public LongAdder searchSSORangeEmpty = new LongAdder();
+        public int bloomFilterSkipSSOCount;
+
+        public int ssoIndexHelpCount;
+        public int ssoNoIndex;
+
+        public int waitSSOLoadCount;
+        public LongAdder waitLoadSSOCostMs = new LongAdder();
+        public int preLoadSSOCount;
+
+        public int fastSkipIndexCount;
+        public int loadStreamSetObjectInfo;
+
+        public String dumpStatistics() {
+            return String.format("GetObjectsContext{streamId=%d, startOffset=%d, nextStartOffset=%d, endOffset=%d, limit=%d,%n" +
+                    "checkSOCount=%d, checkSSOCount=%d, searchNodeCount=%d, searchSSOStreamOffsetRangeCount=%d, searchSSORangeEmpty=%d,%n" +
+                    "bloomFilterSkipSSOCount=%d, ssoNoIndex=%d, ssoIndexHelpCount=%d, waitSSOLoadCount=%d, waitLoadSSOCostMs=%d%n" +
+                    "loadStreamSetObjectInfo=%d, preLoadSSOCount=%d, fastSkipIndexCount=%d}",
+                streamId, startOffset, nextStartOffset, endOffset, limit,
+                checkSOCount, checkSSOCount,
+                searchNodeCount, searchSSOStreamOffsetRangeCount, searchSSORangeEmpty.sum(),
+                bloomFilterSkipSSOCount, ssoNoIndex,
+                ssoIndexHelpCount,
+                waitSSOLoadCount,
+                TimeUnit.MILLISECONDS.convert(waitLoadSSOCostMs.sum(), TimeUnit.NANOSECONDS),
+                loadStreamSetObjectInfo,
+                preLoadSSOCount,
+                fastSkipIndexCount);
+        }
+
+        private final long startTime = System.nanoTime();
 
         GetObjectsContext(long streamId, long startOffset, long endOffset, int limit,
             RangeGetter rangeGetter, LocalStreamRangeIndexCache indexCache) {
@@ -726,22 +1006,78 @@ static class GetObjectsContext {
             this.rangeGetter = rangeGetter;
             this.indexCache = indexCache;
             this.isFromSparseIndex = false;
+            cf.whenComplete((v, e) -> {
+                long cost = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+
+                if (cost > 500 || fastSkipIndexCount > 3) {
+                    LOGGER.info("get objects streamId={}, startOffset={}, endOffset={}, limit={} cost={}ms readEndOffset={}, debugContext={} \n" +
+                            "ranges={}\n" +
+                            "preload={}\n" +
+                            "Metrics: checkSOCount={}, checkSSOCount={}, searchNodeCount={}, searchSSORangeCount={}, \n" +
+                            "searchSSORangeEmpty={}, bloomFilterSkipSSOCount={}, ssoIndexHelpCount={}, \n" +
+                            "ssoNoIndex={}, waitSSOLoadCount={}, waitLoadSSOCostMs={}, preLoadSSOCount={}, \n" +
+                            "fastSkipIndexCount={}, loadStreamSetObjectInfo={}",
+                        streamId, startOffset, endOffset, limit, cost, readEndOffset, debugContext,
+                        range,
+                        this.preloadHistory,
+                        this.checkSOCount, this.checkSSOCount, this.searchNodeCount, this.searchSSOStreamOffsetRangeCount,
+                        this.searchSSORangeEmpty.sum(), this.bloomFilterSkipSSOCount, this.ssoIndexHelpCount,
+                        this.ssoNoIndex, this.waitSSOLoadCount, TimeUnit.NANOSECONDS.toMillis(this.waitLoadSSOCostMs.sum()),
+                        this.preLoadSSOCount, this.fastSkipIndexCount, this.loadStreamSetObjectInfo);
+                }
+            });
+        }
+
+        public List getDebugContext() {
+            return debugContext;
         }
 
-        public void record(int objectSize, int lastRangeIndex, int streamObjectIndex, int streamSetObjectIndex) {
-            debugContext.add(String.format("nextStartOffset=%d, objectSize=%d, lastRangeIndex=%d, streamObjectIndex=%d, streamSetObjectIndex=%d", nextStartOffset,
-                objectSize, lastRangeIndex, streamObjectIndex, streamSetObjectIndex));
-            if (debugContext.size() > 20) {
+        public void recordPreload(PreLoadHistory history) {
+            this.preloadHistory.add(history);
+        }
+
+        public void recordRange(List range) {
+            this.range = range;
+        }
+
+        public void record(int objectSize, int lastRangeIndex, int streamObjectIndex, int streamSetObjectIndex, int
loadedStreamSetObjectIndex, NodeS3StreamSetObjectMetadataImage node) { + int nodeId = -1; + int ssoSize = -1; + if (node != null) { + nodeId = node.getNodeId(); + ssoSize = node.orderList().size(); + } + + debugContext.add(new GetObjectDebugInfo(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.startTime), nextStartOffset, + objectSize, + lastRangeIndex, + streamObjectIndex, + streamSetObjectIndex, + loadedStreamSetObjectIndex, + ssoSize, + nodeId)); + + if (debugContext.size() > 500) { LOGGER.error("GetObjects may has endless loop: streamId={}, startOffset={}, endOffset={}, limit={}, debugContext={}", streamId, startOffset, endOffset, limit, debugContext); Runtime.getRuntime().halt(1); } } + + private final ConcurrentHashMap lastObjectLoadedSet = new ConcurrentHashMap<>(); + + public int lastObjectLoaded(long nodeId) { + return lastObjectLoadedSet.getOrDefault(nodeId, 1); + } + + public void incLastObjectLoaded(long nodeId) { + this.lastObjectLoadedSet.put(nodeId, lastObjectLoadedSet.getOrDefault(nodeId, 1) + 1); + } } public interface RangeGetter { - CompletableFuture> find(long objectId, long streamId); - + void attachGetObjectsContext(GetObjectsContext ctx); + CompletableFuture> find(long objectId, long streamId, long nodeId, long orderId); CompletableFuture readNodeRangeIndex(long nodeId); } diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index 0085585670..4f6c426cb3 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -99,7 +99,12 @@ public class S3StreamsMetadataImageTest { private static final long GB = 1024 * MB; private final RangeGetter defaultRangeGetter = new RangeGetter() { @Override - public CompletableFuture> find(long objectId, long streamId) { + public void attachGetObjectsContext(S3StreamsMetadataImage.GetObjectsContext ctx) { + + } + + @Override + public CompletableFuture> find(long objectId, long streamId, long nodeId, long orderId) { return FutureUtil.failedFuture(new UnsupportedOperationException()); } @@ -188,7 +193,12 @@ private void testToImageAndBack(S3StreamsMetadataImage image) { private RangeGetter buildMemoryRangeGetter() { return new RangeGetter() { @Override - public CompletableFuture> find(long objectId, long streamId) { + public void attachGetObjectsContext(S3StreamsMetadataImage.GetObjectsContext ctx) { + + } + + @Override + public CompletableFuture> find(long objectId, long streamId, long nodeId, long orderId) { if (objectId == 0) { return CompletableFuture.completedFuture(Optional.of(new StreamOffsetRange(STREAM0, 100L, 120L))); } else if (objectId == 1) { diff --git a/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java b/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java new file mode 100644 index 0000000000..98cc1ee9a1 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java @@ -0,0 +1,186 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.automq.stream.s3.index.lazy;
+
+import com.automq.stream.s3.metadata.StreamOffsetRange;
+import com.automq.stream.utils.ThreadUtils;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.Striped;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+public class StreamSetObjectRangeIndex {
+    public static boolean ENABLED = System.getenv().containsKey("AUTOMQ_STREAM_SET_RANGE_INDEX_ENABLED");
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamSetObjectRangeIndex.class);
+    private static volatile StreamSetObjectRangeIndex instance = null;
+
+    public static final ExecutorService UPDATE_INDEX_THREAD_POOL = Executors.newSingleThreadExecutor(
+        ThreadUtils.createThreadFactory("StreamSetObjectRangeIndex", true));
+
+    private final Object DUMMY_OBJECT = new Object();
+    private final ConcurrentHashMap> streamOffsetIndexMap =
+        new ConcurrentHashMap<>(10240);
+    private final Cache expireCache;
+
+    private final Cache lastReaderUpdateTime = CacheBuilder.newBuilder()
+        .maximumSize(20000)
+        .expireAfterWrite(Duration.ofMinutes(1))
+        .build();
+
+    private final Striped lock = Striped.lock(64);
+
+    public StreamSetObjectRangeIndex(int maxSize, long expireTimeMs, Ticker ticker) {
+        expireCache = CacheBuilder.newBuilder()
+            .ticker(ticker)
+            .maximumSize(maxSize)
+            .expireAfterWrite(Duration.ofMillis(expireTimeMs))
+            .removalListener(notification -> {
+                if (notification.getKey() != null) {
+                    streamOffsetIndexMap.remove(notification.getKey());
+                }
+            })
+            .build();
+    }
+
+    public static StreamSetObjectRangeIndex getInstance() {
+        if (instance == null) {
+            // double-checked locking on this class (not NodeRangeIndexCache) for the singleton
+            synchronized (StreamSetObjectRangeIndex.class) {
+                if (instance == null) {
+                    instance = new StreamSetObjectRangeIndex(20 * 10000,
+                        TimeUnit.MINUTES.toMillis(10), Ticker.systemTicker());
+                }
+            }
+        }
+        return instance;
+    }
+
+    public void updateIndex(Long objectId, Long nodeId, Long streamId,
+        List streamOffsetRanges) {
+        if (!ENABLED) {
+            return;
+        }
+
+        if (lastReaderUpdateTime.getIfPresent(objectId) != null) {
+            return;
+        } else {
+            lastReaderUpdateTime.put(objectId, System.currentTimeMillis());
+        }
+
+        try {
+            touch(streamId);
+            updateIndex(objectId, streamOffsetRanges);
+        } catch (Exception e) {
+            LOGGER.error("Failed to update index for reader: {}, nodeId: {}, streamId: {}",
+                objectId, nodeId, streamId, e);
+        }
+    }
+
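+    // Ensure the stream has an entry in expireCache: entries expire a fixed time after
+    // the last write, and the removal listener in the constructor then drops the
+    // stream's offset map from streamOffsetIndexMap, bounding memory use.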
+    public void touch(Long streamId) {
+        try {
+            expireCache.get(streamId, () -> DUMMAY_OBJECT);
+        } catch (Exception ignored) {
+
+        }
+    }
+
+    @VisibleForTesting
+    public void clear() {
+        streamOffsetIndexMap.clear();
+    }
+
+    public void updateIndex(Long objectId, List<StreamOffsetRange> streamOffsetRanges) {
+        for (StreamOffsetRange streamOffsetRange : streamOffsetRanges) {
+            Long streamId = streamOffsetRange.streamId();
+            Long startOffset = streamOffsetRange.startOffset();
+
+            withLock(streamId, () -> {
+                TreeMap<Long, Long> offsetMap = streamOffsetIndexMap.computeIfAbsent(streamId, k -> new TreeMap<>());
+                offsetMap.put(startOffset, objectId);
+                touch(streamId);
+            });
+        }
+    }
+
+    private <T> T withLockReturn(Long streamId, Callable<T> callable) {
+        Lock l = lock.get(streamId);
+        try {
+            l.lock();
+            return callable.call();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            l.unlock();
+        }
+    }
+
+    private void withLock(Long streamId, Runnable runnable) {
+        Lock l = lock.get(streamId);
+        try {
+            l.lock();
+            runnable.run();
+        } finally {
+            l.unlock();
+        }
+    }
+
+    public void invalid(int nodeId, long streamId, long startOffset, Long objectId) {
+        TreeMap<Long, Long> longLongTreeMap = streamOffsetIndexMap.get(streamId);
+        if (longLongTreeMap == null) {
+            return;
+        }
+
+        withLock(streamId, () -> {
+            longLongTreeMap.remove(startOffset, objectId);
+        });
+    }
+
+    public CompletableFuture<Long> searchObjectId(int nodeId, long streamId, long startOffset) {
+        TreeMap<Long, Long> offsetMap = streamOffsetIndexMap.get(streamId);
+        if (offsetMap == null) {
+            return CompletableFuture.completedFuture(-1L);
+        }
+
+        return withLockReturn(streamId, () -> {
+            Map.Entry<Long, Long> entry = offsetMap.floorEntry(startOffset);
+            if (entry == null) {
+                return CompletableFuture.completedFuture(-1L);
+            }
+
+            touch(streamId);
+
+            return CompletableFuture.completedFuture(entry.getValue());
+        });
+    }
+}

From 48c23235cc2b5b6071ae82c6e012b2781640221c Mon Sep 17 00:00:00 2001
From: lifepuzzlefun
Date: Mon, 28 Jul 2025 18:01:38 +0800
Subject: [PATCH 2/7] feat(metadata): extract stream range index by lazy load
 Object

fix checkstyle
---
 .../s3/metadata/StreamMetadataManager.java    |  9 +-
 .../metadata/S3StreamsMetadataImageTest.java  | 74 +++++++++--------
 .../kafka/image/S3StreamsMetadataImage.java   | 82 ++++++++++---------
 .../s3/index/LocalStreamRangeIndexCache.java  |  2 +-
 .../index/lazy/StreamSetObjectRangeIndex.java |  9 +-
 5 files changed, 94 insertions(+), 82 deletions(-)

diff --git a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java
index c093116861..a0be028cee 100644
--- a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java
+++ b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java
@@ -16,10 +16,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
*/ - package kafka.log.stream.s3.metadata; -import com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex; import kafka.server.BrokerServer; import org.apache.kafka.image.MetadataDelta; @@ -36,21 +34,22 @@ import org.apache.kafka.metadata.stream.S3StreamSetObject; import com.automq.stream.s3.ObjectReader; +import com.automq.stream.s3.cache.LRUCache; import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory; import com.automq.stream.s3.index.LocalStreamRangeIndexCache; -import com.automq.stream.s3.cache.LRUCache; +import com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex; import com.automq.stream.s3.metadata.ObjectUtils; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3StreamConstant; import com.automq.stream.s3.metadata.StreamMetadata; import com.automq.stream.s3.metadata.StreamOffsetRange; -import com.google.common.collect.Sets; import com.automq.stream.s3.objects.ObjectAttributes; import com.automq.stream.s3.operator.ObjectStorage; import com.automq.stream.s3.operator.ObjectStorage.ReadOptions; import com.automq.stream.s3.streams.StreamMetadataListener; import com.automq.stream.utils.FutureUtil; import com.automq.stream.utils.Threads; +import com.google.common.collect.Sets; import org.apache.orc.util.BloomFilter; import org.slf4j.Logger; @@ -397,7 +396,7 @@ public synchronized void update(long objectId, List streamOff BloomFilter bloomFilter = new BloomFilter(streamOffsetRanges.size(), DEFAULT_FPP); - streamOffsetRanges.forEach((range) -> bloomFilter.addLong(range.streamId())); + streamOffsetRanges.forEach(range -> bloomFilter.addLong(range.streamId())); cache.put(objectId, bloomFilter); cacheSize += Long.BYTES + bloomFilter.sizeInBytes(); diff --git a/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java index 3d7881de20..9ba0759c61 100644 --- a/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java +++ b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java @@ -18,20 +18,6 @@ */ package kafka.log.stream.s3.metadata; -import com.automq.stream.s3.ByteBufAlloc; -import com.automq.stream.s3.DataBlockIndex; -import com.automq.stream.s3.ObjectReader; -import com.automq.stream.s3.cache.blockcache.DefaultObjectReaderFactory; -import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory; -import com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex; -import com.automq.stream.s3.metadata.S3ObjectMetadata; -import com.automq.stream.s3.metadata.S3ObjectType; -import com.automq.stream.s3.metadata.S3StreamConstant; -import com.automq.stream.s3.metadata.StreamOffsetRange; -import com.automq.stream.s3.metadata.StreamState; -import com.automq.stream.s3.operator.MemoryObjectStorage; -import com.google.common.base.Preconditions; -import io.netty.buffer.ByteBuf; import org.apache.kafka.common.metadata.S3StreamRecord; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.image.DeltaList; @@ -48,6 +34,21 @@ import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3StreamSetObject; import org.apache.kafka.timeline.TimelineHashMap; + +import com.automq.stream.s3.ByteBufAlloc; +import com.automq.stream.s3.DataBlockIndex; +import com.automq.stream.s3.ObjectReader; +import com.automq.stream.s3.cache.blockcache.DefaultObjectReaderFactory; +import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory; +import 
com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex; +import com.automq.stream.s3.metadata.S3ObjectMetadata; +import com.automq.stream.s3.metadata.S3ObjectType; +import com.automq.stream.s3.metadata.S3StreamConstant; +import com.automq.stream.s3.metadata.StreamOffsetRange; +import com.automq.stream.s3.metadata.StreamState; +import com.automq.stream.s3.operator.MemoryObjectStorage; +import com.google.common.base.Preconditions; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -70,6 +71,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import io.netty.buffer.ByteBuf; + @Tag("S3Unit") public class S3StreamsMetadataImageTest { @@ -188,7 +191,7 @@ public Map> getSsoRanges() { return ssoRanges; } - private Object DUMMY_OBJECT = new Object(); + private static final Object DUMMY_OBJECT = new Object(); public ConcurrentHashMap accessed = new ConcurrentHashMap<>(); public S3StreamsMetadataImage.GetObjectsContext getObjectsContext; @@ -238,11 +241,13 @@ public GeneratorResult(long seed, S3StreamsMetadataImage image, GeneratedStreamM } } + @SuppressWarnings("NPathComplexity") public static S3StreamsMetadataImageTest.GeneratorResult generate( long randomSeed, Random random, long streamId, long totalLength, int numNodes, int maxSegmentSize, - double streamSetObjectProbability, double nodeMigrationProbability, int maxStreamPerSSO, double SSOContainsStreamProbability) { + double streamSetObjectProbability, double nodeMigrationProbability, int maxStreamPerSSO, + double streamSetObjectNotContainsStreamProbability) { List streamObjects = new ArrayList<>(); Map> nodeObjectsMap = new HashMap<>(); @@ -260,7 +265,7 @@ public static S3StreamsMetadataImageTest.GeneratorResult generate( RangeMetadata rangeMetadata = new RangeMetadata(streamId, nextEpoch, rangeIndex, currentOffset, currentOffset, currentNodeId); while (currentOffset < totalLength) { - if (random.nextDouble() < SSOContainsStreamProbability) { + if (random.nextDouble() < streamSetObjectNotContainsStreamProbability) { notStreamNodeObjectsMap.computeIfAbsent(currentNodeId, k -> new ArrayList<>()) .add(new S3StreamSetObject(nextObjectId, random.nextInt(numNodes), new ArrayList<>(), nextObjectId)); nextObjectId++; @@ -481,38 +486,37 @@ public static List slice(List allObjects, lo public void testGetObjectsResult() { Random random = new Random(); ArrayList seedList = new ArrayList<>(); - StreamSetObjectRangeIndex.ENABLED = true; - + StreamSetObjectRangeIndex.enabled = true; StreamSetObjectRangeIndex.getInstance().clear(); StreamMetadataManager.DefaultRangeGetter.STREAM_ID_BLOOM_FILTER.clear(); - long STREAM_ID = 420000L; - long TOTAL_STREAM_LENGTH = 50000L; - int NUMBER_OF_NODES = 10; - int MAX_SEGMENT_SIZE = 100; - double STREAM_SET_OBJECT_PROBABILITY = 0.7; - double NODE_MIGRATION_PROBABILITY = 0.2; - double SSONotContainsStreamProbability = 0.5; + long streamId = 420000L; + long totalStreamLength = 50000L; + int numberOfNodes = 10; + int maxSegmentSize = 100; + double streamSetObjectProbability = 0.7; + double nodeMigrationProbability = 0.2; + double streamSetObjectNotContainsStreamProbability = 0.5; long now = System.currentTimeMillis(); seedList.add(now); random.setSeed(now); S3StreamsMetadataImageTest.GeneratorResult generatorResult = generate(now, random, - STREAM_ID, TOTAL_STREAM_LENGTH, NUMBER_OF_NODES, MAX_SEGMENT_SIZE, - STREAM_SET_OBJECT_PROBABILITY, NODE_MIGRATION_PROBABILITY, - 10000, SSONotContainsStreamProbability + streamId, 
totalStreamLength, numberOfNodes, maxSegmentSize, + streamSetObjectProbability, nodeMigrationProbability, + 10000, streamSetObjectNotContainsStreamProbability ); - List allObjects = getS3ObjectMetadata(STREAM_ID, generatorResult.generatedStreamMetadata); + List allObjects = getS3ObjectMetadata(streamId, generatorResult.generatedStreamMetadata); allObjects.sort(Comparator.comparingLong(S3ObjectMetadata::startOffset)); List originResult = allObjects; CompletableFuture objects = null; - objects = generatorResult.image.getObjects(STREAM_ID, 0, TOTAL_STREAM_LENGTH, Integer.MAX_VALUE, generatorResult.generatedStreamMetadata); + objects = generatorResult.image.getObjects(streamId, 0, totalStreamLength, Integer.MAX_VALUE, generatorResult.generatedStreamMetadata); InRangeObjects inRangeObjects = null; try { inRangeObjects = objects.get(); @@ -523,16 +527,16 @@ public void testGetObjectsResult() { } System.out.println(generatorResult.generatedStreamMetadata.getObjectsContext.dumpStatistics()); - check(now, seedList, 0, (int) TOTAL_STREAM_LENGTH, originResult, inRangeObjects, 0); + check(now, seedList, 0, (int) totalStreamLength, originResult, inRangeObjects, 0); int limit = 4; for (int i = 0; i < 1000; i++) { long startMs = System.nanoTime(); - int startOffset = random.nextInt((int) TOTAL_STREAM_LENGTH); - int endOffset = random.nextInt(startOffset, (int) TOTAL_STREAM_LENGTH); + int startOffset = random.nextInt((int) totalStreamLength); + int endOffset = random.nextInt(startOffset, (int) totalStreamLength); - objects = generatorResult.image.getObjects(STREAM_ID, startOffset, endOffset, limit, generatorResult.generatedStreamMetadata); + objects = generatorResult.image.getObjects(streamId, startOffset, endOffset, limit, generatorResult.generatedStreamMetadata); try { inRangeObjects = objects.get(); diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index c24d5cdbe8..c8d32f6a76 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -35,7 +35,6 @@ import com.automq.stream.s3.index.LocalStreamRangeIndexCache; import com.automq.stream.s3.index.NodeRangeIndexCache; import com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex; -import com.google.common.annotations.VisibleForTesting; import com.automq.stream.s3.metadata.ObjectUtils; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3ObjectType; @@ -44,6 +43,7 @@ import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.MetadataStats; import com.automq.stream.utils.FutureUtil; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -372,41 +372,8 @@ void fillObjects0( } if (streamSetObjectIndex == loadedStreamSetObjectIndex && loadedStreamSetObjectIndex < streamSetObjects.size()) { - // check if index can fast skip - CompletableFuture startSearchIndexCf = getStartSearchIndex(node, nextStartOffset, ctx); - - NodeS3StreamSetObjectMetadataImage finalNode1 = node; - List finalStreamSetObjects1 = streamSetObjects; - long finalNextStartOffset1 = nextStartOffset; - int finalLastRangeIndex1 = lastRangeIndex; - int finalStreamObjectIndex1 = streamObjectIndex; - - startSearchIndexCf.whenComplete((index, ex) -> { - if (ex != null) { - if (!(FutureUtil.cause(ex) instanceof ObjectNotExistException)) { - 
LOGGER.error("Failed to get start search index", ex); - } - index = 0; - } - - if (index > loadedStreamSetObjectIndex) { - ctx.fastSkipIndexCount++; - } - - // load stream set object index - int finalIndex = Math.max(index, loadedStreamSetObjectIndex); - long startTimeNs = System.nanoTime(); - loadStreamSetObjectInfo(ctx, finalNode1.getNodeId(), finalStreamSetObjects1, finalIndex, ctx.loadStreamSetObjectInfo / 5 + 5) - .thenAccept(newLoadedIndexExclusive -> { - ctx.nextStartOffset = finalNextStartOffset1; - ctx.waitLoadSSOCostMs.add(TimerUtil.timeElapsedSince(startTimeNs, TimeUnit.NANOSECONDS)); - fillObjects(ctx, stream, objects, finalLastRangeIndex1, finalStreamObjectIndex1, streamObjects, - finalIndex, finalStreamSetObjects1, newLoadedIndexExclusive, finalNode1); - }).exceptionally(exception -> { - ctx.cf.completeExceptionally(exception); - return null; - }); - }); + loadMoreStreamSetObjects(ctx, stream, objects, nextStartOffset, lastRangeIndex, streamSetObjects, + streamObjectIndex, loadedStreamSetObjectIndex, streamObjects, node); return; } @@ -418,6 +385,47 @@ void fillObjects0( } } + private void loadMoreStreamSetObjects(GetObjectsContext ctx, + S3StreamMetadataImage stream, + List objects, + long nextStartOffset, + int lastRangeIndex, + List streamSetObjects, + int streamObjectIndex, + int loadedStreamSetObjectIndex, + List streamObjects, + NodeS3StreamSetObjectMetadataImage node) { + // check if index can fast skip + CompletableFuture startSearchIndexCf = getStartSearchIndex(node, nextStartOffset, ctx); + + startSearchIndexCf.whenComplete((index, ex) -> { + if (ex != null) { + if (!(FutureUtil.cause(ex) instanceof ObjectNotExistException)) { + LOGGER.error("Failed to get start search index", ex); + } + index = 0; + } + + if (index > loadedStreamSetObjectIndex) { + ctx.fastSkipIndexCount++; + } + + // load stream set object index + int finalIndex = Math.max(index, loadedStreamSetObjectIndex); + long startTimeNs = System.nanoTime(); + loadStreamSetObjectInfo(ctx, node.getNodeId(), streamSetObjects, finalIndex, ctx.loadStreamSetObjectInfo / 5 + 5) + .thenAccept(loadedIndexExclusive -> { + ctx.nextStartOffset = nextStartOffset; + ctx.waitLoadSSOCostMs.add(TimerUtil.timeElapsedSince(startTimeNs, TimeUnit.NANOSECONDS)); + fillObjects(ctx, stream, objects, lastRangeIndex, streamObjectIndex, streamObjects, + finalIndex, streamSetObjects, loadedIndexExclusive, node); + }).exceptionally(exception -> { + ctx.cf.completeExceptionally(exception); + return null; + }); + }); + } + private void completeWithSanityCheck(GetObjectsContext ctx, List objects) { try { sanityCheck(ctx, objects); @@ -525,7 +533,7 @@ private CompletableFuture getStartSearchIndex0(NodeS3StreamSetObjectMet * should be invalidated, so we can refresh the index from object storage next time */ private CompletableFuture getStartStreamSetObjectId(int nodeId, long startOffset, GetObjectsContext ctx) { - if (StreamSetObjectRangeIndex.ENABLED) { + if (StreamSetObjectRangeIndex.enabled) { return StreamSetObjectRangeIndex.getInstance().searchObjectId(nodeId, ctx.streamId, startOffset); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/index/LocalStreamRangeIndexCache.java b/s3stream/src/main/java/com/automq/stream/s3/index/LocalStreamRangeIndexCache.java index 89961be160..9b0de4154f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/index/LocalStreamRangeIndexCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/index/LocalStreamRangeIndexCache.java @@ -81,7 +81,7 @@ public class LocalStreamRangeIndexCache 
implements S3StreamClient.StreamLifeCycl private long lastUploadTime = 0L; private LocalStreamRangeIndexCache() { - + } public static LocalStreamRangeIndexCache create() { diff --git a/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java b/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java index 98cc1ee9a1..ccf020bb1c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java +++ b/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java @@ -26,6 +26,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.util.concurrent.Striped; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,14 +43,14 @@ import java.util.concurrent.locks.Lock; public class StreamSetObjectRangeIndex { - public static boolean ENABLED = System.getenv().containsKey("AUTOMQ_STREAM_SET_RANGE_INDEX_ENABLED"); + public static boolean enabled = System.getenv().containsKey("AUTOMQ_STREAM_SET_RANGE_INDEX_ENABLED"); private static final Logger LOGGER = LoggerFactory.getLogger(StreamSetObjectRangeIndex.class); private static volatile StreamSetObjectRangeIndex instance = null; public static final ExecutorService UPDATE_INDEX_THREAD_POOL = Executors.newSingleThreadExecutor( ThreadUtils.createThreadFactory("StreamSetObjectRangeIndex", true)); - private final Object DUMMAY_OBJECT = new Object(); + private static final Object DUMMAY_OBJECT = new Object(); private final ConcurrentHashMap> streamOffsetIndexMap = new ConcurrentHashMap<>(10240); private final Cache expireCache; @@ -66,7 +67,7 @@ public StreamSetObjectRangeIndex(int maxSize, long expireTimeMs, Ticker ticker) .ticker(ticker) .maximumSize(maxSize) .expireAfterWrite(Duration.ofMillis(expireTimeMs)) - .removalListener( notification -> { + .removalListener(notification -> { if (notification.getKey() != null) { streamOffsetIndexMap.remove(notification.getKey()); } @@ -88,7 +89,7 @@ public static StreamSetObjectRangeIndex getInstance() { public void updateIndex(Long objectId, Long nodeId, Long streamId, List streamOffsetRanges) { - if (!ENABLED) { + if (!enabled) { return; } From c6abb790d678dd101a6f109465d9e149eb5f8983 Mon Sep 17 00:00:00 2001 From: lifepuzzlefun Date: Mon, 28 Jul 2025 18:40:41 +0800 Subject: [PATCH 3/7] feat(metadata): extract stream range index by lazy load Object fix spotbugs --- .../metadata/S3StreamsMetadataImageTest.java | 41 +++++++++++++++---- .../kafka/image/S3StreamsMetadataImage.java | 2 +- .../index/lazy/StreamSetObjectRangeIndex.java | 4 +- 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java index 9ba0759c61..d5ccdfeb47 100644 --- a/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java +++ b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java @@ -55,6 +55,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -78,7 +80,7 @@ public class S3StreamsMetadataImageTest { private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamsMetadataImageTest.class); - static class NodeData { + private static class NodeData { private final List streamSetObjects; 
 public NodeData(int nodeId, List<S3StreamSetObject> streamSetObjects) {
@@ -94,7 +96,7 @@ public List<S3StreamSetObject> getStreamSetObjects() {
         }
     }
 
-    static class GeneratedStreamMetadata implements S3StreamsMetadataImage.RangeGetter {
+    private static class GeneratedStreamMetadata implements S3StreamsMetadataImage.RangeGetter {
         private final List<S3StreamObject> streamObjects;
         private final Map<Integer, NodeData> nodeData;
         private final Map<Long, List<StreamOffsetRange>> ssoRanges;
@@ -192,7 +194,7 @@ public Map<Long, List<StreamOffsetRange>> getSsoRanges() {
         }
 
         private static final Object DUMMY_OBJECT = new Object();
-        public ConcurrentHashMap<Long, Object> accessed = new ConcurrentHashMap<>();
+        private ConcurrentHashMap<Long, Object> accessed = new ConcurrentHashMap<>();
 
         public S3StreamsMetadataImage.GetObjectsContext getObjectsContext;
 
@@ -317,8 +319,8 @@ public static S3StreamsMetadataImageTest.GeneratorResult generate(
         }
 
         Map<Integer, NodeData> finalNodeData = new HashMap<>();
-        for (int nodeId : nodeObjectsMap.keySet()) {
-            finalNodeData.put(nodeId, new NodeData(nodeId, nodeObjectsMap.get(nodeId)));
+        for (Map.Entry<Integer, List<S3StreamSetObject>> entry : nodeObjectsMap.entrySet()) {
+            finalNodeData.put(entry.getKey(), new NodeData(entry.getKey(), entry.getValue()));
         }
 
         RegistryRef ref = new RegistryRef();
@@ -432,13 +434,13 @@ public static S3StreamsMetadataImageTest.StreamSetObjectRange fillRandomStreamRa
             generatedStream.add(streamId);
             List<StreamOffsetRange> sfrs = new ArrayList<>(s3StreamSetObject.offsetRangeList());
             for (int i = 0; i < maxStreamPerSso; i++) {
-                long startOffset = Math.abs(r.nextLong());
+                long startOffset = r.nextLong(0, Long.MAX_VALUE / 2);
                 long gStreamId;
                 do {
-                    gStreamId = Math.abs(r.nextInt((int) streamId * 2));
+                    gStreamId = r.nextInt(0,(int) streamId * 2);
                 } while (generatedStream.contains(gStreamId));
                 sfrs.add(new StreamOffsetRange(gStreamId, startOffset,
-                    startOffset + Math.abs(r.nextInt(101024))));
+                    startOffset + r.nextLong(0,1024)));
             }
 
             Collections.sort(sfrs);
@@ -482,11 +484,32 @@ public static List<S3ObjectMetadata> slice(List<S3ObjectMetadata> allObjects, lo
         return ans;
     }
 
+    private static void enableStreamSetObjectRangeIndex() {
+        try {
+            Class<?> clazz = StreamSetObjectRangeIndex.class;
+            // 2. Look up the ENABLED field
+            Field field = clazz.getDeclaredField("ENABLED");
+
+            // 3. Make the field accessible even though it is private
+            field.setAccessible(true);
+
+            // 4. Strip the final modifier from the field
+            Field modifiersField = Field.class.getDeclaredField("modifiers");
+            modifiersField.setAccessible(true);
+            modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+
+            // 5. Set the new value on the field
+            field.set(null, true);
+        } catch (Exception e) {
+            Assertions.fail(e);
+        }
+    }
+
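The reflection trick added above is JDK-sensitive: java.lang.reflect.Field's own "modifiers" field is filtered from reflection from JDK 12 onward, so getDeclaredField("modifiers") throws on newer runtimes. PATCH 5 further down replaces the hack with a plain accessor; a sketch of test wiring against that later accessor (JUnit 5 assumed, class and method names illustrative):

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex;

abstract class RangeIndexTestBase {
    @BeforeEach
    void enableRangeIndex() {
        // setEnabled(...) is the accessor PATCH 5 introduces below.
        StreamSetObjectRangeIndex.setEnabled(true);
        StreamSetObjectRangeIndex.getInstance().clear();
    }

    @AfterEach
    void resetRangeIndex() {
        // Reset the global flag so it does not leak into unrelated suites.
        StreamSetObjectRangeIndex.setEnabled(false);
    }
}
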
     @Test
     public void testGetObjectsResult() {
         Random random = new Random();
         ArrayList<Long> seedList = new ArrayList<>();
-        StreamSetObjectRangeIndex.enabled = true;
+        enableStreamSetObjectRangeIndex();
 
         StreamSetObjectRangeIndex.getInstance().clear();
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
index c8d32f6a76..34f6cfce4c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
@@ -533,7 +533,7 @@ private CompletableFuture<Integer> getStartSearchIndex0(NodeS3StreamSetObjectMet
      * should be invalidated, so we can refresh the index from object storage next time
      */
     private CompletableFuture<Long> getStartStreamSetObjectId(int nodeId, long startOffset, GetObjectsContext ctx) {
-        if (StreamSetObjectRangeIndex.enabled) {
+        if (StreamSetObjectRangeIndex.ENABLED) {
             return StreamSetObjectRangeIndex.getInstance().searchObjectId(nodeId, ctx.streamId, startOffset);
         }
 
diff --git a/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java b/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java
index ccf020bb1c..ed43a2bc33 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java
@@ -43,7 +43,7 @@
 import java.util.concurrent.locks.Lock;
 
 public class StreamSetObjectRangeIndex {
-    public static boolean enabled = System.getenv().containsKey("AUTOMQ_STREAM_SET_RANGE_INDEX_ENABLED");
+    public static final boolean ENABLED = System.getenv().containsKey("AUTOMQ_STREAM_SET_RANGE_INDEX_ENABLED");
     private static final Logger LOGGER = LoggerFactory.getLogger(StreamSetObjectRangeIndex.class);
     private static volatile StreamSetObjectRangeIndex instance = null;
 
@@ -89,7 +89,7 @@ public static StreamSetObjectRangeIndex getInstance() {
 
     public void updateIndex(Long objectId, Long nodeId, Long streamId,
         List<StreamOffsetRange> streamOffsetRanges) {
-        if (!enabled) {
+        if (!ENABLED) {
             return;
         }

From 50e74ec93de011b8c74ae1ee458e576b3f3f2e7d Mon Sep 17 00:00:00 2001
From: lifepuzzlefun
Date: Mon, 28 Jul 2025 20:10:40 +0800
Subject: [PATCH 4/7] feat(metadata): extract stream range index by lazy load
 Object

fix spotbugs
---
 .../log/stream/s3/metadata/S3StreamsMetadataImageTest.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java
index d5ccdfeb47..d41871a27c 100644
--- a/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java
+++ b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java
@@ -437,10 +437,10 @@ public static S3StreamsMetadataImageTest.StreamSetObjectRange fillRandomStreamRa
             long startOffset = r.nextLong(0, Long.MAX_VALUE / 2);
             long gStreamId;
             do {
-                gStreamId = r.nextInt(0,(int) streamId * 2);
+                gStreamId = r.nextInt(0, (int) streamId * 2);
             } while (generatedStream.contains(gStreamId));
             sfrs.add(new StreamOffsetRange(gStreamId, startOffset,
-                startOffset + r.nextLong(0,1024)));
+                startOffset + r.nextLong(0, 1024)));
         }
 
             Collections.sort(sfrs);
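Before PATCH 5, it is worth restating what the searchObjectId(nodeId, ctx.streamId, startOffset) call shown in the hunks above actually computes: a TreeMap floor lookup on the per-stream startOffset to objectId map built in PATCH 1, with -1L signalling a miss. A self-contained model of that lookup (offsets and object ids are illustrative):

import java.util.Map;
import java.util.TreeMap;

public class FloorLookupDemo {
    public static void main(String[] args) {
        // startOffset -> objectId, mirroring one stream's entry in streamOffsetIndexMap.
        TreeMap<Long, Long> index = new TreeMap<>();
        index.put(0L, 10L);    // object 10 first covers this stream at offset 0
        index.put(500L, 11L);  // object 11 takes over at offset 500
        index.put(900L, 12L);  // object 12 at offset 900

        // floorEntry(650) returns the 500 -> 11 entry: a scan for offset 650
        // can start at object 11 instead of walking forward from object 10.
        Map.Entry<Long, Long> hit = index.floorEntry(650L);
        System.out.println(hit == null ? -1L : hit.getValue()); // prints 11
    }
}
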
From 2f36aa0749010533b15aa6909c190a44b4565560 Mon Sep 17 00:00:00 2001
From: lifepuzzlefun
Date: Tue, 29 Jul 2025 10:55:54 +0800
Subject: [PATCH 5/7] feat(metadata): extract stream range index by lazy load
 Object

fix spotbugs
---
 .../metadata/S3StreamsMetadataImageTest.java  | 19 +------------------
 .../kafka/image/S3StreamsMetadataImage.java   |  2 +-
 .../index/lazy/StreamSetObjectRangeIndex.java | 10 +++++++++-
 3 files changed, 11 insertions(+), 20 deletions(-)

diff --git a/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java
index d41871a27c..fe8fb86a1f 100644
--- a/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java
+++ b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java
@@ -485,24 +485,7 @@ public static List<S3ObjectMetadata> slice(List<S3ObjectMetadata> allObjects, lo
     }
 
     private static void enableStreamSetObjectRangeIndex() {
-        try {
-            Class<?> clazz = StreamSetObjectRangeIndex.class;
-            // 2. Look up the ENABLED field
-            Field field = clazz.getDeclaredField("ENABLED");
-
-            // 3. Make the field accessible even though it is private
-            field.setAccessible(true);
-
-            // 4. Strip the final modifier from the field
-            Field modifiersField = Field.class.getDeclaredField("modifiers");
-            modifiersField.setAccessible(true);
-            modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-
-            // 5. Set the new value on the field
-            field.set(null, true);
-        } catch (Exception e) {
-            Assertions.fail(e);
-        }
+        StreamSetObjectRangeIndex.setEnabled(true);
     }
 
     @Test
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
index 34f6cfce4c..ed6807bafb 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
@@ -533,7 +533,7 @@ private CompletableFuture<Integer> getStartSearchIndex0(NodeS3StreamSetObjectMet
      * should be invalidated, so we can refresh the index from object storage next time
      */
     private CompletableFuture<Long> getStartStreamSetObjectId(int nodeId, long startOffset, GetObjectsContext ctx) {
-        if (StreamSetObjectRangeIndex.ENABLED) {
+        if (StreamSetObjectRangeIndex.isEnabled()) {
             return StreamSetObjectRangeIndex.getInstance().searchObjectId(nodeId, ctx.streamId, startOffset);
         }
 
diff --git a/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java b/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java
index ed43a2bc33..d0afe3d68d 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java
@@ -43,7 +43,7 @@
 import java.util.concurrent.locks.Lock;
 
 public class StreamSetObjectRangeIndex {
-    public static final boolean ENABLED = System.getenv().containsKey("AUTOMQ_STREAM_SET_RANGE_INDEX_ENABLED");
+    private static boolean ENABLED = System.getenv().containsKey("AUTOMQ_STREAM_SET_RANGE_INDEX_ENABLED");
     private static final Logger LOGGER = LoggerFactory.getLogger(StreamSetObjectRangeIndex.class);
     private static volatile StreamSetObjectRangeIndex instance = null;
 
@@ -87,6 +87,14 @@ public static StreamSetObjectRangeIndex getInstance() {
         return instance;
     }
 
+    public static boolean isEnabled() {
+        return ENABLED;
+    }
+
+    public static void setEnabled(boolean enabled) {
+        ENABLED = enabled;
+    }
+
     public void updateIndex(Long objectId, Long nodeId, Long streamId,
         List<StreamOffsetRange> streamOffsetRanges) {
         if (!ENABLED) {

From 351c991a85522ab6da06db9cf43b06490d7f92b3 Mon Sep 17 
00:00:00 2001 From: lifepuzzlefun Date: Wed, 30 Jul 2025 17:08:38 +0800 Subject: [PATCH 6/7] perf(s3stream): avoid S3StreamSetObject objectId long primitive unboxing fix lint --- .../metadata/S3StreamsMetadataImageTest.java | 39 ++++++++++++------- .../index/lazy/StreamSetObjectRangeIndex.java | 8 ++-- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java index fe8fb86a1f..ecf3bc492d 100644 --- a/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java +++ b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java @@ -55,8 +55,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -338,7 +336,7 @@ public static S3StreamsMetadataImageTest.GeneratorResult generate( TimelineHashMap nodeMetadataMap = new TimelineHashMap<>(ref.registry(), 10000); - Map> ranges = new HashMap<>(); + TreeMap> ranges = new TreeMap<>(); nodeObjectsMap.entrySet().forEach(entry -> { Integer nodeId = entry.getKey(); List objects = entry.getValue(); @@ -427,6 +425,8 @@ public StreamSetObjectRange(S3StreamSetObject s3StreamSetObject, List FILL_STREAM_END_OFFSET = new ConcurrentHashMap<>(); + public static S3StreamsMetadataImageTest.StreamSetObjectRange fillRandomStreamRangeInfo(long streamId, Random r, int maxStreamPerSso, S3StreamSetObject s3StreamSetObject) { S3StreamSetObject object = new S3StreamSetObject(s3StreamSetObject.objectId(), s3StreamSetObject.nodeId(), Bytes.EMPTY, s3StreamSetObject.orderId(), s3StreamSetObject.dataTimeInMs()); @@ -434,13 +434,25 @@ public static S3StreamsMetadataImageTest.StreamSetObjectRange fillRandomStreamRa generatedStream.add(streamId); List sfrs = new ArrayList<>(s3StreamSetObject.offsetRangeList()); for (int i = 0; i < maxStreamPerSso; i++) { - long startOffset = r.nextLong(0, Long.MAX_VALUE / 2); long gStreamId; do { gStreamId = r.nextInt(0, (int) streamId * 2); } while (generatedStream.contains(gStreamId)); - sfrs.add(new StreamOffsetRange(gStreamId, startOffset, - startOffset + r.nextLong(0, 1024))); + + if (FILL_STREAM_END_OFFSET.containsKey(gStreamId)) { + long lastEnd = FILL_STREAM_END_OFFSET.get(gStreamId); + long newStartOffset = r.nextLong(lastEnd, lastEnd + 10240); + long newEndOffset = newStartOffset + r.nextLong(0, newStartOffset); + sfrs.add(new StreamOffsetRange(gStreamId, newStartOffset, newEndOffset)); + FILL_STREAM_END_OFFSET.put(gStreamId, newEndOffset); + } else { + long startOffset = r.nextLong(0, Integer.MAX_VALUE / 4); + long endOffset = startOffset + r.nextLong(0, 10240); + FILL_STREAM_END_OFFSET.put(gStreamId, endOffset); + sfrs.add(new StreamOffsetRange(gStreamId, startOffset, + endOffset)); + } + } Collections.sort(sfrs); @@ -489,7 +501,7 @@ private static void enableStreamSetObjectRangeIndex() { } @Test - public void testGetObjectsResult() { + public void testGetObjectsResult() throws InterruptedException { Random random = new Random(); ArrayList seedList = new ArrayList<>(); enableStreamSetObjectRangeIndex(); @@ -498,12 +510,13 @@ public void testGetObjectsResult() { StreamMetadataManager.DefaultRangeGetter.STREAM_ID_BLOOM_FILTER.clear(); long streamId = 420000L; - long totalStreamLength = 50000L; + long totalStreamLength = 2000L; int numberOfNodes = 10; - int maxSegmentSize = 
100; - double streamSetObjectProbability = 0.7; - double nodeMigrationProbability = 0.2; - double streamSetObjectNotContainsStreamProbability = 0.5; + int maxSegmentSize = 200; + int maxStreamPerStreamSetObject = 5; + double streamSetObjectProbability = 0.8; + double nodeMigrationProbability = 0.01; + double streamSetObjectNotContainsStreamProbability = 0.8; long now = System.currentTimeMillis(); seedList.add(now); @@ -512,7 +525,7 @@ public void testGetObjectsResult() { S3StreamsMetadataImageTest.GeneratorResult generatorResult = generate(now, random, streamId, totalStreamLength, numberOfNodes, maxSegmentSize, streamSetObjectProbability, nodeMigrationProbability, - 10000, streamSetObjectNotContainsStreamProbability + maxStreamPerStreamSetObject, streamSetObjectNotContainsStreamProbability ); List allObjects = getS3ObjectMetadata(streamId, generatorResult.generatedStreamMetadata); diff --git a/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java b/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java index d0afe3d68d..3376530320 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java +++ b/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java @@ -43,7 +43,7 @@ import java.util.concurrent.locks.Lock; public class StreamSetObjectRangeIndex { - private static boolean ENABLED = System.getenv().containsKey("AUTOMQ_STREAM_SET_RANGE_INDEX_ENABLED"); + private static boolean enabled = System.getenv().containsKey("AUTOMQ_STREAM_SET_RANGE_INDEX_ENABLED"); private static final Logger LOGGER = LoggerFactory.getLogger(StreamSetObjectRangeIndex.class); private static volatile StreamSetObjectRangeIndex instance = null; @@ -88,16 +88,16 @@ public static StreamSetObjectRangeIndex getInstance() { } public static boolean isEnabled() { - return ENABLED; + return enabled; } public static void setEnabled(boolean enabled) { - ENABLED = enabled; + StreamSetObjectRangeIndex.enabled = enabled; } public void updateIndex(Long objectId, Long nodeId, Long streamId, List streamOffsetRanges) { - if (!ENABLED) { + if (!enabled) { return; } From 8e166a360c120c95219d01cfe960345ac9b3c24e Mon Sep 17 00:00:00 2001 From: lifepuzzlefun Date: Thu, 14 Aug 2025 22:32:29 +0800 Subject: [PATCH 7/7] perf(s3stream): avoid S3StreamSetObject objectId long primitive unboxing fix lint --- .../kafka/image/S3StreamsMetadataImage.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index ed6807bafb..807f678238 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -215,6 +215,7 @@ void getObjects0(GetObjectsContext ctx) { List objects = new LinkedList<>(); ctx.readEndOffset = readEndOffset(streamId, endOffset); + ctx.recordRange(stream.getRanges()); // floor value < 0 means that all stream objects' ranges are greater than startOffset int streamObjectIndex = Math.max(0, stream.floorStreamObjectIndex(startOffset)); @@ -645,6 +646,8 @@ private void preLoadMiddleOffsetStreamSetObject(GetObjectsContext ctx, } private void preLoadStreamSetObject(GetObjectsContext ctx, long nodeId, List streamSetObjects, List indexProvider) { + int round = ctx.loadStreamSetObjectInfo; + indexProvider.forEach(index -> { 
S3StreamSetObject streamSetObject = streamSetObjects.get(index); if (streamSetObject.ranges().length != 0) { @@ -659,7 +662,8 @@ private void preLoadStreamSetObject(GetObjectsContext ctx, long nodeId, List { ctx.object2range.put(streamSetObject.objectId(), range); - PreLoadHistory preLoadHistory = new PreLoadHistory(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ctx.startTime), nodeId, index, range.orElse(null)); + PreLoadHistory preLoadHistory = new PreLoadHistory(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ctx.startTime), + round, nodeId, index, range.orElse(null)); ctx.recordPreload(preLoadHistory); }); }); @@ -672,7 +676,9 @@ private void preLoadEndOffsetStreamSetObject(GetObjectsContext ctx, long nodeId, ctx.incLastObjectLoaded(nodeId); List preLoadPos = new ArrayList<>(); - for (int i = streamSetObjectsSize - 5 * lastLoadedCount; i < streamSetObjectsSize - 5 && i >= 0; i++) { + int startPos = streamSetObjectsSize - 5 * lastLoadedCount; + int endPos = streamSetObjectsSize - 5 * (lastLoadedCount - 1); + for (int i = startPos; i < endPos && i >= 0; i++) { preLoadPos.add(i); } @@ -928,13 +934,15 @@ public String toString() { public static class PreLoadHistory { long nodeId; long index; + int round; StreamOffsetRange streamOffsetRange; long id; - public PreLoadHistory(long id, long nodeId, long index, StreamOffsetRange streamOffsetRange) { + public PreLoadHistory(long id, int round, long nodeId, long index, StreamOffsetRange streamOffsetRange) { this.id = id; this.nodeId = nodeId; this.index = index; + this.round = round; this.streamOffsetRange = streamOffsetRange; } @@ -942,6 +950,7 @@ public PreLoadHistory(long id, long nodeId, long index, StreamOffsetRange stream public String toString() { return "PreLoadHistory{" + "id=" + id + + ", round=" + round + ", nodeId=" + nodeId + ", index=" + index + ", streamOffsetRange=" + streamOffsetRange +