diff --git a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java index 4de980b6cf..a0be028cee 100644 --- a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java @@ -16,7 +16,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package kafka.log.stream.s3.metadata; import kafka.server.BrokerServer; @@ -35,8 +34,10 @@ import org.apache.kafka.metadata.stream.S3StreamSetObject; import com.automq.stream.s3.ObjectReader; +import com.automq.stream.s3.cache.LRUCache; import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory; import com.automq.stream.s3.index.LocalStreamRangeIndexCache; +import com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex; import com.automq.stream.s3.metadata.ObjectUtils; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3StreamConstant; @@ -48,11 +49,14 @@ import com.automq.stream.s3.streams.StreamMetadataListener; import com.automq.stream.utils.FutureUtil; import com.automq.stream.utils.Threads; +import com.google.common.collect.Sets; +import org.apache.orc.util.BloomFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -67,6 +71,7 @@ import io.netty.util.concurrent.DefaultThreadFactory; import static com.automq.stream.utils.FutureUtil.exec; +import static kafka.log.stream.s3.metadata.StreamMetadataManager.DefaultRangeGetter.STREAM_ID_BLOOM_FILTER; public class StreamMetadataManager implements InRangeObjectsFetcher, MetadataPublisher { private static final Logger LOGGER = LoggerFactory.getLogger(StreamMetadataManager.class); @@ -78,6 +83,8 @@ public class StreamMetadataManager implements InRangeObjectsFetcher, MetadataPub private final LocalStreamRangeIndexCache indexCache; private final Map streamMetadataListeners = new ConcurrentHashMap<>(); + private Set streamSetObjectIds = Collections.emptySet(); + public StreamMetadataManager(BrokerServer broker, int nodeId, ObjectReaderFactory objectReaderFactory, LocalStreamRangeIndexCache indexCache) { this.nodeId = nodeId; @@ -98,6 +105,7 @@ public String name() { @Override public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) { Set changedStreams; + Set streamSetObjectIds = this.streamSetObjectIds; synchronized (this) { if (newImage.highestOffsetAndEpoch().equals(this.metadataImage.highestOffsetAndEpoch())) { return; @@ -105,9 +113,15 @@ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, Loader this.metadataImage = newImage; changedStreams = delta.getOrCreateStreamsMetadataDelta().changedStreams(); } + this.streamSetObjectIds = Collections.unmodifiableSet(getStreamSetObjectIds()); + + // update streamBloomFilter + Set sets = Sets.difference(this.streamSetObjectIds, streamSetObjectIds); + sets.forEach(STREAM_ID_BLOOM_FILTER::removeObject); + // retry all pending tasks retryPendingTasks(); - this.indexCache.asyncPrune(this::getStreamSetObjectIds); + this.indexCache.asyncPrune(() -> streamSetObjectIds); notifyMetadataListeners(changedStreams); } @@ -339,9 +353,74 @@ public void close() { } } - private static class DefaultRangeGetter implements S3StreamsMetadataImage.RangeGetter { + public static class 
StreamIdBloomFilter { + public static final double DEFAULT_FPP = 0.01; + private final LRUCache<Long, BloomFilter> cache = new LRUCache<>(); + private final long maxBloomFilterSize; + private long cacheSize = 0; + + public StreamIdBloomFilter(long maxBloomFilterCacheSize) { + this.maxBloomFilterSize = maxBloomFilterCacheSize; + } + + public synchronized void maintainCacheSize() { + while (cacheSize > maxBloomFilterSize) { + Map.Entry<Long, BloomFilter> entry = cache.pop(); + if (entry != null) { + cacheSize -= Long.BYTES + entry.getValue().sizeInBytes(); + } + } + } + + public synchronized boolean mightContain(long objectId, long streamId) { + BloomFilter bloomFilter = cache.get(objectId); + if (bloomFilter == null) { + return true; // not indexed yet, treat the stream as possibly present + } + + cache.touchIfExist(objectId); + return bloomFilter.testLong(streamId); + } + + public synchronized void removeObject(long objectId) { + BloomFilter filter = cache.get(objectId); + if (filter != null && cache.remove(objectId)) { + cacheSize -= Long.BYTES + filter.sizeInBytes(); + } + } + + public synchronized void update(long objectId, List<StreamOffsetRange> streamOffsetRanges) { + if (cache.containsKey(objectId)) { + return; + } + + BloomFilter bloomFilter = new BloomFilter(streamOffsetRanges.size(), DEFAULT_FPP); + + streamOffsetRanges.forEach(range -> bloomFilter.addLong(range.streamId())); + cache.put(objectId, bloomFilter); + cacheSize += Long.BYTES + bloomFilter.sizeInBytes(); + + maintainCacheSize(); + } + + public synchronized long sizeInBytes() { + return this.cacheSize; + } + + public synchronized int objectNum() { + return this.cache.size(); + } + + public synchronized void clear() { + cache.clear(); + } + } + + public static class DefaultRangeGetter implements S3StreamsMetadataImage.RangeGetter { private final S3ObjectsImage objectsImage; private final ObjectReaderFactory objectReaderFactory; + public static final StreamIdBloomFilter STREAM_ID_BLOOM_FILTER = new StreamIdBloomFilter(20 * 1024 * 1024); + private S3StreamsMetadataImage.GetObjectsContext getObjectsContext; public DefaultRangeGetter(S3ObjectsImage objectsImage, ObjectReaderFactory objectReaderFactory) { @@ -350,16 +429,46 @@ public DefaultRangeGetter(S3ObjectsImage objectsImage, } @Override - public CompletableFuture<Optional<StreamOffsetRange>> find(long objectId, long streamId) { + public void attachGetObjectsContext(S3StreamsMetadataImage.GetObjectsContext ctx) { + this.getObjectsContext = ctx; + } + + public static void updateIndex(ObjectReader reader, Long nodeId, Long streamId) { + reader.basicObjectInfo().thenAccept(info -> { + Long objectId = reader.metadata().objectId(); + List<StreamOffsetRange> streamOffsetRanges = info.indexBlock().streamOffsetRanges(); + + STREAM_ID_BLOOM_FILTER.update(objectId, streamOffsetRanges); + + StreamSetObjectRangeIndex.getInstance().updateIndex(objectId, nodeId, streamId, streamOffsetRanges); + }).whenComplete((v, e) -> reader.release()); + } + + @Override + public CompletableFuture<Optional<StreamOffsetRange>> find(long objectId, long streamId, long nodeId, long orderId) { S3Object s3Object = objectsImage.getObjectMetadata(objectId); if (s3Object == null) { return FutureUtil.failedFuture(new IllegalArgumentException("Cannot find object metadata for object: " + objectId)); } + + boolean mightContain = STREAM_ID_BLOOM_FILTER.mightContain(objectId, streamId); + if (!mightContain) { + getObjectsContext.bloomFilterSkipSSOCount++; + return CompletableFuture.completedFuture(Optional.empty()); + } + + getObjectsContext.searchSSOStreamOffsetRangeCount++; // The reader will be released after the find operation @SuppressWarnings("resource") ObjectReader reader = 
objectReaderFactory.get(new S3ObjectMetadata(objectId, s3Object.getObjectSize(), s3Object.getAttributes())); - CompletableFuture> cf = reader.basicObjectInfo().thenApply(info -> info.indexBlock().findStreamOffsetRange(streamId)); - cf.whenComplete((rst, ex) -> reader.release()); + CompletableFuture> cf = reader.basicObjectInfo() + .thenApply(info -> info.indexBlock().findStreamOffsetRange(streamId)); + cf.whenCompleteAsync((rst, ex) -> { + if (rst.isEmpty()) { + getObjectsContext.searchSSORangeEmpty.add(1); + } + updateIndex(reader, nodeId, streamId); + }, StreamSetObjectRangeIndex.UPDATE_INDEX_THREAD_POOL); return cf; } diff --git a/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java new file mode 100644 index 0000000000..ecf3bc492d --- /dev/null +++ b/core/src/test/java/kafka/log/stream/s3/metadata/S3StreamsMetadataImageTest.java @@ -0,0 +1,572 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.stream.s3.metadata; + +import org.apache.kafka.common.metadata.S3StreamRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.image.DeltaList; +import org.apache.kafka.image.NodeS3StreamSetObjectMetadataImage; +import org.apache.kafka.image.RegistryRef; +import org.apache.kafka.image.S3ObjectsImage; +import org.apache.kafka.image.S3StreamMetadataImage; +import org.apache.kafka.image.S3StreamsMetadataImage; +import org.apache.kafka.image.TopicIdPartition; +import org.apache.kafka.metadata.stream.InRangeObjects; +import org.apache.kafka.metadata.stream.RangeMetadata; +import org.apache.kafka.metadata.stream.S3Object; +import org.apache.kafka.metadata.stream.S3ObjectState; +import org.apache.kafka.metadata.stream.S3StreamObject; +import org.apache.kafka.metadata.stream.S3StreamSetObject; +import org.apache.kafka.timeline.TimelineHashMap; + +import com.automq.stream.s3.ByteBufAlloc; +import com.automq.stream.s3.DataBlockIndex; +import com.automq.stream.s3.ObjectReader; +import com.automq.stream.s3.cache.blockcache.DefaultObjectReaderFactory; +import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory; +import com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex; +import com.automq.stream.s3.metadata.S3ObjectMetadata; +import com.automq.stream.s3.metadata.S3ObjectType; +import com.automq.stream.s3.metadata.S3StreamConstant; +import com.automq.stream.s3.metadata.StreamOffsetRange; +import com.automq.stream.s3.metadata.StreamState; +import com.automq.stream.s3.operator.MemoryObjectStorage; +import com.google.common.base.Preconditions; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import io.netty.buffer.ByteBuf; + + +@Tag("S3Unit") +public class S3StreamsMetadataImageTest { + private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamsMetadataImageTest.class); + + private static class NodeData { + private final List streamSetObjects; + + public NodeData(int nodeId, List streamSetObjects) { + this.streamSetObjects = streamSetObjects.stream() + .map(sso -> + new S3StreamSetObject(sso.objectId(), sso.nodeId(), Bytes.EMPTY, sso.orderId(), sso.dataTimeInMs())) + .collect(Collectors.toList()); + this.streamSetObjects.sort(Comparator.comparingLong(S3StreamSetObject::orderId)); + } + + public List getStreamSetObjects() { + return streamSetObjects; + } + } + + private static class GeneratedStreamMetadata implements S3StreamsMetadataImage.RangeGetter { + private final List streamObjects; + private final Map nodeData; + private final Map> ssoRanges; + private final StreamMetadataManager.DefaultRangeGetter rangeGetter; + private final Random r; + + public GeneratedStreamMetadata(Random r, List streamObjects, Map nodeData, Map> ssoRanges) { + RegistryRef registryRef = new RegistryRef(); + this.r = r; + TimelineHashMap object = new TimelineHashMap<>(registryRef.registry(), 0); + ssoRanges.keySet().forEach(id -> object.put(id, new S3Object(id, 0, 0, 
S3ObjectState.COMMITTED, 0))); + S3ObjectsImage image = new S3ObjectsImage(10000, object, registryRef); + MemoryObjectStorage memoryObjectStorage = new MemoryObjectStorage(); + ObjectReaderFactory factory = new DefaultObjectReaderFactory(memoryObjectStorage) { + @Override + public synchronized ObjectReader get(S3ObjectMetadata metadata) { + return new ObjectReader() { + @Override + public S3ObjectMetadata metadata() { + return metadata; + } + + @Override + public String objectKey() { + return ""; + } + + @Override + public CompletableFuture<BasicObjectInfo> basicObjectInfo() { + return CompletableFuture.completedFuture(new BasicObjectInfo(0, null) { + @Override + public IndexBlock indexBlock() { + return new IndexBlock(metadata, ByteBufAlloc.byteBuffer(10)) { + @Override + public Optional<StreamOffsetRange> findStreamOffsetRange(long streamId) { + List<StreamOffsetRange> streamOffsetRanges = GeneratedStreamMetadata.this.getSsoRanges().get(metadata.objectId()); + return streamOffsetRanges.stream() + .filter(range -> range.streamId() == streamId).findAny(); + } + + @Override + public List<StreamOffsetRange> streamOffsetRanges() { + return GeneratedStreamMetadata.this.getSsoRanges().get(metadata.objectId()); + } + }; + } + }); + } + + @Override + public CompletableFuture read(ReadOptions readOptions, DataBlockIndex block) { + return null; + } + + @Override + public ObjectReader retain() { + return null; + } + + @Override + public ObjectReader release() { + return null; + } + + @Override + public void close() { + + } + + @Override + public CompletableFuture size() { + return null; + } + }; + } + }; + StreamMetadataManager.DefaultRangeGetter rangeGetter = new StreamMetadataManager.DefaultRangeGetter(image, factory); + + this.rangeGetter = rangeGetter; + this.streamObjects = streamObjects; + this.nodeData = nodeData; + this.ssoRanges = ssoRanges; + } + + public List<S3StreamObject> getStreamObjects() { + return streamObjects; + } + + public Map<Integer, NodeData> getNodeData() { + return new TreeMap<>(nodeData); + } + + public Map<Long, List<StreamOffsetRange>> getSsoRanges() { + return ssoRanges; + } + + private static final Object DUMMY_OBJECT = new Object(); + private ConcurrentHashMap<Long, Object> accessed = new ConcurrentHashMap<>(); + + public S3StreamsMetadataImage.GetObjectsContext getObjectsContext; + + @Override + public void attachGetObjectsContext(S3StreamsMetadataImage.GetObjectsContext ctx) { + this.rangeGetter.attachGetObjectsContext(ctx); + this.getObjectsContext = ctx; + } + + @Override + public CompletableFuture<Optional<StreamOffsetRange>> find(long objectId, long streamId, long nodeId, long orderId) { + return this.rangeGetter.find(objectId, streamId, nodeId, orderId) + .thenCompose( + r -> { + if (accessed.containsKey(objectId)) { + return CompletableFuture.completedFuture(r); + } + + accessed.put(objectId, DUMMY_OBJECT); + if (this.r.nextDouble() > 0.75) { + return new CompletableFuture<Optional<StreamOffsetRange>>() + .completeOnTimeout(r, 1, TimeUnit.MILLISECONDS); + } else { + return CompletableFuture.completedFuture(r); + } + } + + ); + } + + @Override + public CompletableFuture<ByteBuf> readNodeRangeIndex(long nodeId) { + return CompletableFuture.failedFuture(new IllegalStateException("Not implemented")); + } + } + + public static class GeneratorResult { + long seed; + S3StreamsMetadataImage image; + GeneratedStreamMetadata generatedStreamMetadata; + + public GeneratorResult(long seed, S3StreamsMetadataImage image, GeneratedStreamMetadata generatedStreamMetadata) { + this.seed = seed; + this.image = image; + this.generatedStreamMetadata = generatedStreamMetadata; + } + } + + @SuppressWarnings("NPathComplexity") + public static S3StreamsMetadataImageTest.GeneratorResult generate( 
long randomSeed, + Random random, + long streamId, long totalLength, int numNodes, int maxSegmentSize, + double streamSetObjectProbability, double nodeMigrationProbability, int maxStreamPerSSO, + double streamSetObjectNotContainsStreamProbability) { + + List streamObjects = new ArrayList<>(); + Map> nodeObjectsMap = new HashMap<>(); + Map> notStreamNodeObjectsMap = new HashMap<>(); + + List rangeMetadatas = new ArrayList<>(); + + long currentOffset = 0L; + int nextObjectId = 0; + int rangeIndex = 0; + long nextEpoch = 0; + + int currentNodeId = numNodes <= 0 ? -1 : random.nextInt(numNodes); + + RangeMetadata rangeMetadata = new RangeMetadata(streamId, nextEpoch, rangeIndex, + currentOffset, currentOffset, currentNodeId); + while (currentOffset < totalLength) { + if (random.nextDouble() < streamSetObjectNotContainsStreamProbability) { + notStreamNodeObjectsMap.computeIfAbsent(currentNodeId, k -> new ArrayList<>()) + .add(new S3StreamSetObject(nextObjectId, random.nextInt(numNodes), new ArrayList<>(), nextObjectId)); + nextObjectId++; + } + + long segmentLen = random.nextInt(maxSegmentSize - 20 + 1) + 20; + long startOffset = currentOffset; + long endOffset = Math.min(startOffset + segmentLen, totalLength); + + if (random.nextDouble() < streamSetObjectProbability) { + + if (numNodes > 1 && random.nextDouble() < nodeMigrationProbability) { + int newNodeId; + do { + newNodeId = random.nextInt(numNodes); + } while (newNodeId == currentNodeId); + currentNodeId = newNodeId; + } + + int nodeId = currentNodeId; + + if (rangeMetadata.nodeId() != nodeId) { + // set endOffset + rangeMetadata = new RangeMetadata(rangeMetadata.streamId(), rangeMetadata.epoch(), rangeIndex, rangeMetadata.startOffset(), startOffset, rangeMetadata.nodeId()); + rangeMetadatas.add(rangeMetadata); + rangeIndex++; + + // create new rangeMetadata + rangeMetadata = new RangeMetadata(rangeMetadata.streamId(), rangeMetadata.epoch() + 1, rangeIndex, + rangeMetadata.endOffset(), endOffset, nodeId); + } + + int orderId = nextObjectId; + S3StreamSetObject sso = new S3StreamSetObject(nextObjectId, nodeId, List.of(new StreamOffsetRange(streamId, startOffset, endOffset)), orderId); + + nodeObjectsMap.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(sso); + } else { + S3StreamObject so = new S3StreamObject(nextObjectId, streamId, startOffset, endOffset); + streamObjects.add(so); + } + + nextObjectId++; + currentOffset = endOffset; + } + + if (rangeMetadata.startOffset() != totalLength) { + rangeMetadata = new RangeMetadata(rangeMetadata.streamId(), rangeMetadata.epoch(), rangeIndex, rangeMetadata.startOffset(), totalLength, rangeMetadata.nodeId()); + rangeMetadatas.add(rangeMetadata); + } + + Map finalNodeData = new HashMap<>(); + for (Map.Entry> entry : nodeObjectsMap.entrySet()) { + finalNodeData.put(entry.getKey(), new NodeData(entry.getKey(), entry.getValue())); + } + + RegistryRef ref = new RegistryRef(); + + DeltaList streamObject = new DeltaList(); + streamObjects.forEach(streamObject::add); + S3StreamMetadataImage streamsMetadataImage = new S3StreamMetadataImage(streamId, 1, StreamState.OPENED, + new S3StreamRecord.TagCollection(), + 0, rangeMetadatas, streamObject); + + TimelineHashMap streamMetadataImageTimelineHashMap = + new TimelineHashMap<>(ref.registry(), 10000); + streamMetadataImageTimelineHashMap.put(streamId, streamsMetadataImage); + + TimelineHashMap nodeMetadataMap = + new TimelineHashMap<>(ref.registry(), 10000); + + TreeMap> ranges = new TreeMap<>(); + nodeObjectsMap.entrySet().forEach(entry -> { + Integer nodeId 
= entry.getKey(); + List objects = entry.getValue(); + + DeltaList s3StreamSetObjectDeltaList = new DeltaList<>(); + for (S3StreamSetObject object : objects) { + S3StreamsMetadataImageTest.StreamSetObjectRange streamSetObjectRange = fillRandomStreamRangeInfo(streamId, random, maxStreamPerSSO, object); + ranges.put(object.objectId(), streamSetObjectRange.sfrs); + s3StreamSetObjectDeltaList.add(streamSetObjectRange.s3StreamSetObject); + } + + List notStreamNodeObjects = notStreamNodeObjectsMap.get(nodeId); + if (notStreamNodeObjects != null && !notStreamNodeObjects.isEmpty()) { + notStreamNodeObjects.forEach(sso -> { + S3StreamsMetadataImageTest.StreamSetObjectRange streamSetObjectRange = fillRandomStreamRangeInfo(streamId, random, maxStreamPerSSO, sso); + ranges.put(sso.objectId(), streamSetObjectRange.sfrs); + s3StreamSetObjectDeltaList.add(streamSetObjectRange.s3StreamSetObject); + }); + } + + NodeS3StreamSetObjectMetadataImage image = + new NodeS3StreamSetObjectMetadataImage(nodeId, 2, s3StreamSetObjectDeltaList); + + nodeMetadataMap.put(nodeId, image); + }); + + + TimelineHashMap> partition2streams = + new TimelineHashMap<>(ref.registry(), 10000); + + TimelineHashMap stream2partition = + new TimelineHashMap<>(ref.registry(), 10000); + + TimelineHashMap streamEndOffsets = + new TimelineHashMap<>(ref.registry(), 10000); + + streamEndOffsets.put(streamId, currentOffset); + + + S3StreamsMetadataImage image = new S3StreamsMetadataImage(streamId + 1, ref, streamMetadataImageTimelineHashMap, + nodeMetadataMap, partition2streams, stream2partition, streamEndOffsets); + + GeneratedStreamMetadata generatedStreamMetadata = new GeneratedStreamMetadata(random, streamObjects, finalNodeData, ranges); + + LOGGER.info("total stream object num: {}", streamObjects.size()); + LOGGER.info("total stream set object num: {}", notStreamNodeObjectsMap.values().stream().mapToInt(List::size).sum() + + nodeObjectsMap.values().stream().mapToInt(List::size).sum()); + LOGGER.info("total range num: {}", rangeMetadatas.size()); + LOGGER.info("node2sso num: "); + + nodeObjectsMap.entrySet().forEach(entry -> { + LOGGER.info("nodeId: {}", entry.getKey()); + LOGGER.info("sso num: {}", entry.getValue().size() + notStreamNodeObjectsMap.getOrDefault(entry.getKey(), List.of()).size()); + }); + return new S3StreamsMetadataImageTest.GeneratorResult(randomSeed, image, generatedStreamMetadata); + } + + public static List getS3ObjectMetadata(long streamId, GeneratedStreamMetadata metadata) { + List allObjects = new ArrayList<>(); + for (S3StreamObject so : metadata.getStreamObjects()) { + List ranges = Collections.singletonList(new StreamOffsetRange(so.streamId(), so.startOffset(), so.endOffset())); + allObjects.add(new S3ObjectMetadata(so.objectId(), S3ObjectType.STREAM, ranges, + S3StreamConstant.INVALID_TS, S3StreamConstant.INVALID_TS, S3StreamConstant.INVALID_OBJECT_SIZE, + S3StreamConstant.INVALID_ORDER_ID)); + } + for (Map.Entry entry : metadata.getNodeData().entrySet()) { + for (S3StreamSetObject sso : entry.getValue().getStreamSetObjects()) { + allObjects.add(new S3ObjectMetadata(sso.objectId(), S3ObjectType.STREAM_SET, find(metadata.getSsoRanges().get(sso.objectId()), streamId), + sso.dataTimeInMs(), sso.dataTimeInMs(), sso.orderId(), sso.nodeId())); + } + } + return allObjects; + } + + public static List find(List list, long streamId) { + return list.stream().filter(r -> r.streamId() == streamId).collect(Collectors.toList()); + } + + public static class StreamSetObjectRange { + S3StreamSetObject s3StreamSetObject; + List 
sfrs; + + public StreamSetObjectRange(S3StreamSetObject s3StreamSetObject, List sfrs) { + this.s3StreamSetObject = s3StreamSetObject; + this.sfrs = sfrs; + } + } + + private static final ConcurrentHashMap FILL_STREAM_END_OFFSET = new ConcurrentHashMap<>(); + + public static S3StreamsMetadataImageTest.StreamSetObjectRange fillRandomStreamRangeInfo(long streamId, Random r, int maxStreamPerSso, S3StreamSetObject s3StreamSetObject) { + S3StreamSetObject object = new S3StreamSetObject(s3StreamSetObject.objectId(), s3StreamSetObject.nodeId(), Bytes.EMPTY, s3StreamSetObject.orderId(), s3StreamSetObject.dataTimeInMs()); + + Set generatedStream = new HashSet<>(); + generatedStream.add(streamId); + List sfrs = new ArrayList<>(s3StreamSetObject.offsetRangeList()); + for (int i = 0; i < maxStreamPerSso; i++) { + long gStreamId; + do { + gStreamId = r.nextInt(0, (int) streamId * 2); + } while (generatedStream.contains(gStreamId)); + + if (FILL_STREAM_END_OFFSET.containsKey(gStreamId)) { + long lastEnd = FILL_STREAM_END_OFFSET.get(gStreamId); + long newStartOffset = r.nextLong(lastEnd, lastEnd + 10240); + long newEndOffset = newStartOffset + r.nextLong(0, newStartOffset); + sfrs.add(new StreamOffsetRange(gStreamId, newStartOffset, newEndOffset)); + FILL_STREAM_END_OFFSET.put(gStreamId, newEndOffset); + } else { + long startOffset = r.nextLong(0, Integer.MAX_VALUE / 4); + long endOffset = startOffset + r.nextLong(0, 10240); + FILL_STREAM_END_OFFSET.put(gStreamId, endOffset); + sfrs.add(new StreamOffsetRange(gStreamId, startOffset, + endOffset)); + } + + } + + Collections.sort(sfrs); + + return new S3StreamsMetadataImageTest.StreamSetObjectRange(object, sfrs); + } + + public static void check(long now, List seedList, int startOffset, int endOffset, List allObjects, InRangeObjects inRangeObjects, long costMs) { + LOGGER.info("=====check=========================="); + LOGGER.info("seed is {}", now); + LOGGER.info("seedList is seedList: {}", seedList); + LOGGER.info("startOffset is {}", startOffset); + LOGGER.info("endOffset is {}", endOffset); + LOGGER.info("cost={}ms", costMs); + try { + Preconditions.checkArgument(allObjects.size() == inRangeObjects.objects().size(), "result size not equal: allObjects: " + allObjects.size() + ", result: " + inRangeObjects.objects().size()); + + for (int i = 0; i < allObjects.size(); i++) { + S3ObjectMetadata metadata = allObjects.get(i); + S3ObjectMetadata metadata1 = inRangeObjects.objects().get(i); + + Assertions.assertEquals(metadata.objectId(), metadata1.objectId(), "objectId not equal at " + i); + Assertions.assertEquals(metadata.getType(), metadata1.getType(), "object Type not equal at " + i); + Assertions.assertEquals(metadata.startOffset(), metadata1.startOffset(), "startOffset not equal at " + i); + Assertions.assertEquals(metadata.endOffset(), metadata1.endOffset(), "endOffset not equal at " + i); + } + } catch (IllegalArgumentException e) { + Assertions.fail(e); + } + + } + + public static List slice(List allObjects, long startOffset, long endOffset, int limit) { + ArrayList ans = new ArrayList<>(); + for (S3ObjectMetadata object : allObjects) { + if (object.endOffset() > startOffset && object.startOffset() < endOffset && ans.size() < limit) { + ans.add(object); + } + } + + return ans; + } + + private static void enableStreamSetObjectRangeIndex() { + StreamSetObjectRangeIndex.setEnabled(true); + } + + @Test + public void testGetObjectsResult() throws InterruptedException { + Random random = new Random(); + ArrayList seedList = new ArrayList<>(); + 
enableStreamSetObjectRangeIndex(); + + StreamSetObjectRangeIndex.getInstance().clear(); + + StreamMetadataManager.DefaultRangeGetter.STREAM_ID_BLOOM_FILTER.clear(); + long streamId = 420000L; + long totalStreamLength = 2000L; + int numberOfNodes = 10; + int maxSegmentSize = 200; + int maxStreamPerStreamSetObject = 5; + double streamSetObjectProbability = 0.8; + double nodeMigrationProbability = 0.01; + double streamSetObjectNotContainsStreamProbability = 0.8; + + long now = System.currentTimeMillis(); + seedList.add(now); + random.setSeed(now); + + S3StreamsMetadataImageTest.GeneratorResult generatorResult = generate(now, random, + streamId, totalStreamLength, numberOfNodes, maxSegmentSize, + streamSetObjectProbability, nodeMigrationProbability, + maxStreamPerStreamSetObject, streamSetObjectNotContainsStreamProbability + ); + + List allObjects = getS3ObjectMetadata(streamId, generatorResult.generatedStreamMetadata); + + allObjects.sort(Comparator.comparingLong(S3ObjectMetadata::startOffset)); + + List originResult = allObjects; + + CompletableFuture objects = null; + objects = generatorResult.image.getObjects(streamId, 0, totalStreamLength, Integer.MAX_VALUE, generatorResult.generatedStreamMetadata); + InRangeObjects inRangeObjects = null; + try { + inRangeObjects = objects.get(); + } catch (Exception e) { + LOGGER.error("seed is {}", now); + LOGGER.error("seedList is seedList: {}", seedList); + Assertions.fail(e); + } + System.out.println(generatorResult.generatedStreamMetadata.getObjectsContext.dumpStatistics()); + + check(now, seedList, 0, (int) totalStreamLength, originResult, inRangeObjects, 0); + + + int limit = 4; + for (int i = 0; i < 1000; i++) { + long startMs = System.nanoTime(); + int startOffset = random.nextInt((int) totalStreamLength); + int endOffset = random.nextInt(startOffset, (int) totalStreamLength); + + objects = generatorResult.image.getObjects(streamId, startOffset, endOffset, limit, generatorResult.generatedStreamMetadata); + + try { + inRangeObjects = objects.get(); + } catch (Exception e) { + LOGGER.error("seed is {}", now); + LOGGER.error("seedList is seedList: {}", seedList); + Assertions.fail(e); + } + + long costMs = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startMs, TimeUnit.NANOSECONDS); + check(now, seedList, startOffset, endOffset, slice(originResult, startOffset, endOffset, limit), inRangeObjects, costMs); + } + } +} diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 928689b177..807f678238 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -34,6 +34,7 @@ import com.automq.stream.s3.exceptions.ObjectNotExistException; import com.automq.stream.s3.index.LocalStreamRangeIndexCache; import com.automq.stream.s3.index.NodeRangeIndexCache; +import com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex; import com.automq.stream.s3.metadata.ObjectUtils; import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3ObjectType; @@ -42,6 +43,7 @@ import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.metrics.stats.MetadataStats; import com.automq.stream.utils.FutureUtil; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +61,9 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; import io.netty.buffer.ByteBuf; @@ -146,9 +150,10 @@ public void write(ImageWriter writer, ImageWriterOptions options) { } } + @VisibleForTesting public CompletableFuture getObjects(long streamId, long startOffset, long endOffset, int limit, RangeGetter rangeGetter) { - return getObjects(streamId, startOffset, endOffset, limit, rangeGetter, null); + return getObjects(streamId, startOffset, endOffset, limit, rangeGetter, LocalStreamRangeIndexCache.create()); } /** @@ -166,6 +171,7 @@ public CompletableFuture getObjects(long streamId, long startOff long startTimeNanos = System.nanoTime(); GetObjectsContext ctx = new GetObjectsContext(streamId, startOffset, endOffset, limit, rangeGetter, indexCache); try { + rangeGetter.attachGetObjectsContext(ctx); getObjects0(ctx); } catch (Throwable e) { ctx.cf.completeExceptionally(e); @@ -181,6 +187,16 @@ public CompletableFuture getObjects(long streamId, long startOff return ctx.cf; } + private boolean readEndOffset(long streamId, long endOffset) { + boolean readEndOffset = endOffset == -1L; + OptionalLong optionalLong = streamEndOffset(streamId); + if (optionalLong.isPresent()) { + readEndOffset = readEndOffset || optionalLong.getAsLong() == endOffset; + } + + return readEndOffset; + } + @SuppressWarnings({"checkstyle:cyclomaticcomplexity", "checkstyle:npathcomplexity"}) void getObjects0(GetObjectsContext ctx) { long streamId = ctx.streamId; @@ -198,6 +214,9 @@ void getObjects0(GetObjectsContext ctx) { } List objects = new LinkedList<>(); + ctx.readEndOffset = readEndOffset(streamId, endOffset); + ctx.recordRange(stream.getRanges()); + // floor value < 0 means that all stream objects' ranges are greater than startOffset int streamObjectIndex = Math.max(0, stream.floorStreamObjectIndex(startOffset)); @@ -206,7 +225,7 @@ void getObjects0(GetObjectsContext ctx) { int lastRangeIndex = -1; int streamSetObjectIndex = 0; fillObjects(ctx, stream, objects, lastRangeIndex, streamObjectIndex, streamObjects, streamSetObjectIndex, - null, null); + null, 0, null); } private void fillObjects( @@ -218,10 +237,11 @@ private void fillObjects( List streamObjects, int streamSetObjectIndex, List streamSetObjects, + int loadedStreamSetObjectIndex, NodeS3StreamSetObjectMetadataImage node ) { exec(() -> fillObjects0(ctx, stream, objects, lastRangeIndex, streamObjectIndex, streamObjects, - streamSetObjectIndex, streamSetObjects, node), ctx.cf, LOGGER, "fillObjects"); + streamSetObjectIndex, streamSetObjects, loadedStreamSetObjectIndex, node), ctx.cf, LOGGER, "fillObjects"); } void fillObjects0( @@ -233,9 +253,10 @@ void fillObjects0( List streamObjects, int streamSetObjectIndex, List streamSetObjects, + int loadedStreamSetObjectIndex, NodeS3StreamSetObjectMetadataImage node ) { - ctx.record(objects.size(), lastRangeIndex, streamObjectIndex, streamSetObjectIndex); + ctx.record(objects.size(), lastRangeIndex, streamObjectIndex, streamSetObjectIndex, loadedStreamSetObjectIndex, node); long nextStartOffset = ctx.nextStartOffset; boolean firstTimeSearchInSSO = true; int finalStartSearchIndex = streamSetObjectIndex; @@ -244,6 +265,7 @@ void fillObjects0( // try to find consistent stream objects for (; streamObjectIndex < streamObjects.size(); streamObjectIndex++) { + ctx.checkSOCount++; S3StreamObject streamObject = streamObjects.get(streamObjectIndex); if 
(streamObject.startOffset() != nextStartOffset) { //noinspection StatementWithEmptyBody @@ -266,6 +288,7 @@ void fillObjects0( } if (streamSetObjects == null) { + ctx.searchNodeCount++; int rangeIndex = stream.getRangeContainsOffset(nextStartOffset); // 1. can not find the range containing nextStartOffset, or // 2. the range is the same as the last one, which means the nextStartOffset does not move on. @@ -289,6 +312,7 @@ void fillObjects0( final int finalStreamObjectIndex = streamObjectIndex; final List finalStreamSetObjects = streamSetObjects; final NodeS3StreamSetObjectMetadataImage finalNode = node; + final long nodeId = range.nodeId(); startSearchIndexCf.whenComplete((index, ex) -> { if (ex != null) { if (!(FutureUtil.cause(ex) instanceof ObjectNotExistException)) { @@ -298,20 +322,27 @@ void fillObjects0( } // load stream set object index int finalIndex = index; - loadStreamSetObjectInfo(ctx, finalStreamSetObjects, index).thenAccept(v -> { - ctx.nextStartOffset = finalNextStartOffset; - fillObjects(ctx, stream, objects, finalLastRangeIndex, finalStreamObjectIndex, streamObjects, - finalIndex, finalStreamSetObjects, finalNode); - }).exceptionally(exception -> { - ctx.cf.completeExceptionally(exception); - return null; - }); + if (finalIndex != 0) { + ctx.ssoIndexHelpCount++; + } else { + ctx.ssoNoIndex++; + } + long startTimeNs = System.nanoTime(); + loadStreamSetObjectInfo(ctx, nodeId, finalStreamSetObjects, index, ctx.loadStreamSetObjectInfo / 5 + 5) + .thenAccept(finalEnsureLoadedIndexExclusive -> { + ctx.nextStartOffset = finalNextStartOffset; + ctx.waitLoadSSOCostMs.add(TimerUtil.timeElapsedSince(startTimeNs, TimeUnit.NANOSECONDS)); + fillObjects(ctx, stream, objects, finalLastRangeIndex, finalStreamObjectIndex, streamObjects, finalIndex, finalStreamSetObjects, finalEnsureLoadedIndexExclusive, finalNode); + }).exceptionally(exception -> { + ctx.cf.completeExceptionally(exception); + return null; + }); }); return; } - final int streamSetObjectsSize = streamSetObjects.size(); - for (; streamSetObjectIndex < streamSetObjectsSize; streamSetObjectIndex++) { + for (; streamSetObjectIndex < loadedStreamSetObjectIndex; streamSetObjectIndex++) { + ctx.checkSSOCount++; S3StreamSetObject streamSetObject = streamSetObjects.get(streamSetObjectIndex); StreamOffsetRange streamOffsetRange = findStreamInStreamSetObject(ctx, streamSetObject).orElse(null); // skip the stream set object not containing the stream or the range is before the nextStartOffset @@ -340,8 +371,13 @@ void fillObjects0( break; } } - // case 1. streamSetObjectIndex >= streamSetObjects.size(), which means we have reached the end of the stream set objects. - // case 2. objects.size() == roundStartObjectSize, which means we have not found any new object in this round. + + if (streamSetObjectIndex == loadedStreamSetObjectIndex && loadedStreamSetObjectIndex < streamSetObjects.size()) { + loadMoreStreamSetObjects(ctx, stream, objects, nextStartOffset, lastRangeIndex, streamSetObjects, + streamObjectIndex, loadedStreamSetObjectIndex, streamObjects, node); + return; + } + if (streamSetObjectIndex >= streamSetObjects.size() || objects.size() == roundStartObjectSize) { // move to the next range // This can ensure that we can break the loop. 
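Each stream set object that the loop above probes ends up in RangeGetter.find(), and the patch short-circuits that call with a per-object bloom filter over stream IDs: a negative answer proves the object holds nothing for the stream and skips the object-storage read entirely, while a positive answer (possibly a false positive) falls through to reading the real index block. A minimal standalone sketch of that check, built on the same org.apache.orc.util.BloomFilter the patch imports (class and method names here are illustrative, not the PR's API):

import org.apache.orc.util.BloomFilter;

import java.util.HashMap;
import java.util.Map;

// Sketch: one bloom filter per stream set object, keyed by object id.
// A negative answer is exact (the object holds no data for the stream);
// a positive answer may be a false positive, so the caller still reads
// the object's index block before trusting it.
public class StreamIdFilterSketch {
    private static final double FPP = 0.01; // mirrors DEFAULT_FPP in the patch
    private final Map<Long, BloomFilter> filters = new HashMap<>();

    // Called once per object with the stream ids from its index block.
    public void index(long objectId, long[] streamIds) {
        BloomFilter filter = new BloomFilter(streamIds.length, FPP);
        for (long streamId : streamIds) {
            filter.addLong(streamId);
        }
        filters.put(objectId, filter);
    }

    // Objects that were never indexed must count as possible matches.
    public boolean mightContain(long objectId, long streamId) {
        BloomFilter filter = filters.get(objectId);
        return filter == null || filter.testLong(streamId);
    }

    public static void main(String[] args) {
        StreamIdFilterSketch sketch = new StreamIdFilterSketch();
        sketch.index(1L, new long[] {10L, 20L});
        System.out.println(sketch.mightContain(1L, 10L)); // true: stream 10 was indexed
        System.out.println(sketch.mightContain(1L, 30L)); // false with ~0.99 probability: skip the read
        System.out.println(sketch.mightContain(2L, 10L)); // true: unknown object cannot be skipped
    }
}

False negatives are impossible and false positives only cost one extra index-block read, so filters can be evicted at any time without hurting correctness; that is what lets StreamIdBloomFilter bound its footprint with an LRU and a byte budget.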
@@ -350,6 +386,47 @@ void fillObjects0( } } + private void loadMoreStreamSetObjects(GetObjectsContext ctx, + S3StreamMetadataImage stream, + List objects, + long nextStartOffset, + int lastRangeIndex, + List streamSetObjects, + int streamObjectIndex, + int loadedStreamSetObjectIndex, + List streamObjects, + NodeS3StreamSetObjectMetadataImage node) { + // check if index can fast skip + CompletableFuture startSearchIndexCf = getStartSearchIndex(node, nextStartOffset, ctx); + + startSearchIndexCf.whenComplete((index, ex) -> { + if (ex != null) { + if (!(FutureUtil.cause(ex) instanceof ObjectNotExistException)) { + LOGGER.error("Failed to get start search index", ex); + } + index = 0; + } + + if (index > loadedStreamSetObjectIndex) { + ctx.fastSkipIndexCount++; + } + + // load stream set object index + int finalIndex = Math.max(index, loadedStreamSetObjectIndex); + long startTimeNs = System.nanoTime(); + loadStreamSetObjectInfo(ctx, node.getNodeId(), streamSetObjects, finalIndex, ctx.loadStreamSetObjectInfo / 5 + 5) + .thenAccept(loadedIndexExclusive -> { + ctx.nextStartOffset = nextStartOffset; + ctx.waitLoadSSOCostMs.add(TimerUtil.timeElapsedSince(startTimeNs, TimeUnit.NANOSECONDS)); + fillObjects(ctx, stream, objects, lastRangeIndex, streamObjectIndex, streamObjects, + finalIndex, streamSetObjects, loadedIndexExclusive, node); + }).exceptionally(exception -> { + ctx.cf.completeExceptionally(exception); + return null; + }); + }); + } + private void completeWithSanityCheck(GetObjectsContext ctx, List objects) { try { sanityCheck(ctx, objects); @@ -420,11 +497,7 @@ private CompletableFuture getStartSearchIndex0(NodeS3StreamSetObjectMet if (node == null) { return CompletableFuture.completedFuture(0); } - // search in compact cache first - int index = node.floorStreamSetObjectIndex(ctx.streamId, startOffset); - if (index > 0) { - return CompletableFuture.completedFuture(index); - } + // search in sparse index ctx.isFromSparseIndex = true; return getStartStreamSetObjectId(node.getNodeId(), startOffset, ctx) @@ -432,7 +505,11 @@ private CompletableFuture getStartSearchIndex0(NodeS3StreamSetObjectMet int startIndex = -1; if (objectId >= 0) { startIndex = findStartSearchIndex(objectId, node.orderList()); - MetadataStats.getInstance().getRangeIndexHitCountStats().add(MetricsLevel.INFO, 1); + if (startIndex < 0) { + StreamSetObjectRangeIndex.getInstance().invalid(node.getNodeId(), ctx.streamId, startOffset, objectId); + } else { + MetadataStats.getInstance().getRangeIndexHitCountStats().add(MetricsLevel.INFO, 1); + } } else { MetadataStats.getInstance().getRangeIndexMissCountStats().add(MetricsLevel.INFO, 1); } @@ -457,6 +534,10 @@ private CompletableFuture getStartSearchIndex0(NodeS3StreamSetObjectMet * should be invalidated, so we can refresh the index from object storage next time */ private CompletableFuture getStartStreamSetObjectId(int nodeId, long startOffset, GetObjectsContext ctx) { + if (StreamSetObjectRangeIndex.isEnabled()) { + return StreamSetObjectRangeIndex.getInstance().searchObjectId(nodeId, ctx.streamId, startOffset); + } + if (ctx.indexCache != null && ctx.indexCache.nodeId() == nodeId) { return ctx.indexCache.searchObjectId(ctx.streamId, startOffset); } @@ -474,14 +555,22 @@ private CompletableFuture getStartStreamSetObjectId(int nodeId, long start /** * Load the stream set object range info is missing * - * @return async load + * @return async ensure loaded stream set object index (exclusive) */ - private CompletableFuture loadStreamSetObjectInfo(GetObjectsContext ctx, - List 
streamSetObjects, - int startSearchIndex) { + private CompletableFuture<Integer> loadStreamSetObjectInfo(GetObjectsContext ctx, + long nodeId, + List<S3StreamSetObject> streamSetObjects, + int startSearchIndex, + int maxWaitInfoNum + ) { return exec(() -> { + ctx.loadStreamSetObjectInfo++; final int streamSetObjectsSize = streamSetObjects.size(); List<CompletableFuture<Void>> loadIndexCfList = new LinkedList<>(); + + int currentWaitNum = maxWaitInfoNum; + int lastLoadedIndex = startSearchIndex; + for (int i = startSearchIndex; i < streamSetObjectsSize; i++) { S3StreamSetObject streamSetObject = streamSetObjects.get(i); if (streamSetObject.ranges().length != 0) { @@ -490,19 +579,113 @@ private CompletableFuture<Void> loadStreamSetObjectInfo(GetObjectsContext ctx, if (ctx.object2range.containsKey(streamSetObject.objectId())) { continue; } - loadIndexCfList.add( - ctx.rangeGetter - .find(streamSetObject.objectId(), ctx.streamId) - .thenAccept(range -> ctx.object2range.put(streamSetObject.objectId(), range)) - ); + + CompletableFuture<Void> cf = ctx.rangeGetter + .find(streamSetObject.objectId(), ctx.streamId, nodeId, streamSetObject.orderId()) + .thenAccept(range -> ctx.object2range.put(streamSetObject.objectId(), range)); + + if (!cf.isDone() && currentWaitNum > 0) { + ctx.waitSSOLoadCount++; + currentWaitNum--; + } + + loadIndexCfList.add(cf); + + if (currentWaitNum > 0) { + lastLoadedIndex = i; + } else { + break; + } } + if (loadIndexCfList.isEmpty()) { - return CompletableFuture.completedFuture(null); + // no need to load any index + return CompletableFuture.completedFuture(streamSetObjectsSize); + } + + if (ctx.readEndOffset) { + preLoadEndOffsetStreamSetObject(ctx, nodeId, streamSetObjects); + } else { + preLoadMiddleOffsetStreamSetObject(ctx, nodeId, streamSetObjects, lastLoadedIndex, maxWaitInfoNum); } - return CompletableFuture.allOf(loadIndexCfList.toArray(new CompletableFuture[0])); + + int finalLastLoadedIndex = lastLoadedIndex; + return CompletableFuture.allOf(loadIndexCfList.toArray(new CompletableFuture[0])) + .thenApply(v -> { + return finalLastLoadedIndex + 1; + }); }, LOGGER, "loadStreamSetObjectInfo"); } + private void preLoadMiddleOffsetStreamSetObject(GetObjectsContext ctx, + long nodeId, + List<S3StreamSetObject> streamSetObjects, + int lastLoadedIndex, + int maxWaitInfoNum) { + int streamSetObjectsSize = streamSetObjects.size(); + int remainingObjects = streamSetObjectsSize - lastLoadedIndex; + if (!ctx.readEndOffset && ctx.maxPreLoadSSORound < 5 && remainingObjects > maxWaitInfoNum) { + double preLoadStep = (double) remainingObjects / (5 - ctx.maxPreLoadSSORound); + + List<Integer> preLoadPos = new ArrayList<>(); + + for (int i = 1; i <= 4; i++) { + int loadIndex = lastLoadedIndex + (int) (preLoadStep * i); + + if (loadIndex <= lastLoadedIndex || loadIndex >= streamSetObjectsSize) { + continue; + } + + preLoadPos.add(loadIndex); + } + + preLoadStreamSetObject(ctx, nodeId, streamSetObjects, preLoadPos); + + ctx.maxPreLoadSSORound++; + } + } + + private void preLoadStreamSetObject(GetObjectsContext ctx, long nodeId, List<S3StreamSetObject> streamSetObjects, List<Integer> indexProvider) { + int round = ctx.loadStreamSetObjectInfo; + + indexProvider.forEach(index -> { + S3StreamSetObject streamSetObject = streamSetObjects.get(index); + if (streamSetObject.ranges().length != 0) { + return; + } + if (ctx.object2range.containsKey(streamSetObject.objectId())) { + return; + } + + ctx.preLoadSSOCount++; + ctx.rangeGetter + .find(streamSetObject.objectId(), ctx.streamId, nodeId, streamSetObject.orderId()) + .thenAccept(range -> { + ctx.object2range.put(streamSetObject.objectId(), range); + 
PreLoadHistory preLoadHistory = new PreLoadHistory(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ctx.startTime), + round, nodeId, index, range.orElse(null)); + ctx.recordPreload(preLoadHistory); + }); + }); + } + + private void preLoadEndOffsetStreamSetObject(GetObjectsContext ctx, long nodeId, List streamSetObjects) { + int lastLoadedCount = ctx.lastObjectLoaded(nodeId); + int streamSetObjectsSize = streamSetObjects.size(); + if (ctx.readEndOffset && lastLoadedCount < 5) { + ctx.incLastObjectLoaded(nodeId); + + List preLoadPos = new ArrayList<>(); + int startPos = streamSetObjectsSize - 5 * lastLoadedCount; + int endPos = streamSetObjectsSize - 5 * (lastLoadedCount - 1); + for (int i = startPos; i < endPos && i >= 0; i++) { + preLoadPos.add(i); + } + + preLoadStreamSetObject(ctx, nodeId, streamSetObjects, preLoadPos); + } + } + private Optional findStreamInStreamSetObject(GetObjectsContext ctx, S3StreamSetObject object) { if (object.ranges().length == 0) { return ctx.object2range.get(object.objectId()); @@ -702,19 +885,133 @@ public ReferenceCounted touch(Object o) { return this; } - static class GetObjectsContext { + public static class GetObjectDebugInfo { + private final long nextStartOffset; + private final int objectSize; + private final int lastRangeIndex; + private final int streamObjectIndex; + private final int streamSetObjectIndex; + private final int loadedStreamSetObjectIndex; + private final int ssoSize; + private final int nodeId; + private final long id; + + public GetObjectDebugInfo(long id, long nextStartOffset, + int objectSize, + int lastRangeIndex, + int streamObjectIndex, + int streamSetObjectIndex, + int loadedStreamSetObjectIndex, + int ssoSize, + int nodeId) { + this.id = id; + this.nextStartOffset = nextStartOffset; + this.objectSize = objectSize; + this.lastRangeIndex = lastRangeIndex; + this.streamObjectIndex = streamObjectIndex; + this.streamSetObjectIndex = streamSetObjectIndex; + this.loadedStreamSetObjectIndex = loadedStreamSetObjectIndex; + this.ssoSize = ssoSize; + this.nodeId = nodeId; + } + + @Override + public String toString() { + return "GetObjectDebugInfo{" + + "id=" + id + + ", nextStartOffset=" + nextStartOffset + + ", objectSize=" + objectSize + + ", lastRangeIndex=" + lastRangeIndex + + ", streamObjectIndex=" + streamObjectIndex + + ", streamSetObjectIndex=" + streamSetObjectIndex + + ", loadedStreamSetObjectIndex=" + loadedStreamSetObjectIndex + + ", ssoSize=" + ssoSize + + ", nodeId=" + nodeId + + '}'; + } + } + + public static class PreLoadHistory { + long nodeId; + long index; + int round; + StreamOffsetRange streamOffsetRange; + long id; + + public PreLoadHistory(long id, int round, long nodeId, long index, StreamOffsetRange streamOffsetRange) { + this.id = id; + this.nodeId = nodeId; + this.index = index; + this.round = round; + this.streamOffsetRange = streamOffsetRange; + } + + @Override + public String toString() { + return "PreLoadHistory{" + + "id=" + id + + ", round=" + round + + ", nodeId=" + nodeId + + ", index=" + index + + ", streamOffsetRange=" + streamOffsetRange + + '}'; + } + } + + public static class GetObjectsContext { final long streamId; + public List range; + boolean readEndOffset; long startOffset; long nextStartOffset; long endOffset; int limit; boolean isFromSparseIndex; + int maxPreLoadSSORound = 1; RangeGetter rangeGetter; LocalStreamRangeIndexCache indexCache; CompletableFuture cf = new CompletableFuture<>(); Map> object2range = new ConcurrentHashMap<>(); - List debugContext = new ArrayList<>(); + List 
<GetObjectDebugInfo> debugContext = new ArrayList<>(); + final ConcurrentLinkedQueue<PreLoadHistory> preloadHistory = new ConcurrentLinkedQueue<>(); + + public int checkSOCount; + public int checkSSOCount; + + public int searchNodeCount; + public int searchSSOStreamOffsetRangeCount; + public LongAdder searchSSORangeEmpty = new LongAdder(); + public int bloomFilterSkipSSOCount; + + public int ssoIndexHelpCount; + public int ssoNoIndex; + + public int waitSSOLoadCount; + public LongAdder waitLoadSSOCostMs = new LongAdder(); + public int preLoadSSOCount; + + public int fastSkipIndexCount; + public int loadStreamSetObjectInfo; + + public String dumpStatistics() { + return String.format("GetObjectsContext{streamId=%d, startOffset=%d, nextStartOffset=%d, endOffset=%d, limit=%d,%n" + + "checkSOCount=%d, checkSSOCount=%d, searchNodeCount=%d, searchSSOStreamOffsetRangeCount=%d, searchSSORangeEmpty=%d,%n" + + "bloomFilterSkipSSOCount=%d, ssoNoIndex=%d, ssoIndexHelpCount=%d, waitSSOLoadCount=%d, waitLoadSSOCostMs=%d,%n" + + "loadStreamSetObjectInfo=%d, preLoadSSOCount=%d, fastSkipIndexCount=%d}", + streamId, startOffset, nextStartOffset, endOffset, limit, + checkSOCount, checkSSOCount, + searchNodeCount, searchSSOStreamOffsetRangeCount, searchSSORangeEmpty.sum(), + bloomFilterSkipSSOCount, ssoNoIndex, + ssoIndexHelpCount, + waitSSOLoadCount, + TimeUnit.MILLISECONDS.convert(waitLoadSSOCostMs.sum(), TimeUnit.NANOSECONDS), + loadStreamSetObjectInfo, + preLoadSSOCount, + fastSkipIndexCount); + } + + private final long startTime = System.nanoTime(); GetObjectsContext(long streamId, long startOffset, long endOffset, int limit, RangeGetter rangeGetter, LocalStreamRangeIndexCache indexCache) { @@ -726,22 +1023,78 @@ static class GetObjectsContext { this.rangeGetter = rangeGetter; this.indexCache = indexCache; this.isFromSparseIndex = false; + cf.whenComplete((v, e) -> { + long cost = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + + if (cost > 500 || fastSkipIndexCount > 3) { + LOGGER.info("get objects streamId={}, startOffset={}, endOffset={}, limit={} cost={}ms readEndOffset={}, debugContext={}\n" + + "ranges={}\n" + + "preload={}\n" + + "Metrics: checkSOCount={}, checkSSOCount={}, searchNodeCount={}, searchSSORangeCount={},\n" + + "searchSSORangeEmpty={}, bloomFilterSkipSSOCount={}, ssoIndexHelpCount={},\n" + + "ssoNoIndex={}, waitSSOLoadCount={}, waitLoadSSOCostMs={}, preLoadSSOCount={},\n" + + "fastSkipIndexCount={}, loadStreamSetObjectInfo={}", + streamId, startOffset, endOffset, limit, cost, readEndOffset, debugContext, + range, + this.preloadHistory, + this.checkSOCount, this.checkSSOCount, this.searchNodeCount, this.searchSSOStreamOffsetRangeCount, + this.searchSSORangeEmpty.sum(), this.bloomFilterSkipSSOCount, this.ssoIndexHelpCount, + this.ssoNoIndex, this.waitSSOLoadCount, TimeUnit.NANOSECONDS.toMillis(this.waitLoadSSOCostMs.sum()), + this.preLoadSSOCount, this.fastSkipIndexCount, this.loadStreamSetObjectInfo); + } + }); } - public void record(int objectSize, int lastRangeIndex, int streamObjectIndex, int streamSetObjectIndex) { - debugContext.add(String.format("nextStartOffset=%d, objectSize=%d, lastRangeIndex=%d, streamObjectIndex=%d, streamSetObjectIndex=%d", nextStartOffset, - objectSize, lastRangeIndex, streamObjectIndex, streamSetObjectIndex)); - if (debugContext.size() > 20) { + public List<GetObjectDebugInfo> getDebugContext() { + return debugContext; + } + + public void recordPreload(PreLoadHistory history) { + this.preloadHistory.add(history); + } + + public void recordRange(List<RangeMetadata> range) { + this.range = range; + } + + public void record(int objectSize, int 
lastRangeIndex, int streamObjectIndex, int streamSetObjectIndex, int loadedStreamSetObjectIndex, NodeS3StreamSetObjectMetadataImage node) { + int nodeId = -1; + int ssoSize = -1; + if (node != null) { + nodeId = node.getNodeId(); + ssoSize = node.orderList().size(); + } + + debugContext.add(new GetObjectDebugInfo(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.startTime), nextStartOffset, + objectSize, + lastRangeIndex, + streamObjectIndex, + streamSetObjectIndex, + loadedStreamSetObjectIndex, + ssoSize, + nodeId)); + + if (debugContext.size() > 500) { LOGGER.error("GetObjects may have an endless loop: streamId={}, startOffset={}, endOffset={}, limit={}, debugContext={}", streamId, startOffset, endOffset, limit, debugContext); Runtime.getRuntime().halt(1); } } + + private final ConcurrentHashMap<Long, Integer> lastObjectLoadedSet = new ConcurrentHashMap<>(); + + public int lastObjectLoaded(long nodeId) { + return lastObjectLoadedSet.getOrDefault(nodeId, 1); + } + + public void incLastObjectLoaded(long nodeId) { + this.lastObjectLoadedSet.put(nodeId, lastObjectLoadedSet.getOrDefault(nodeId, 1) + 1); + } } public interface RangeGetter { - CompletableFuture<Optional<StreamOffsetRange>> find(long objectId, long streamId); - + void attachGetObjectsContext(GetObjectsContext ctx); + CompletableFuture<Optional<StreamOffsetRange>> find(long objectId, long streamId, long nodeId, long orderId); CompletableFuture<ByteBuf> readNodeRangeIndex(long nodeId); } diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index 0085585670..4f6c426cb3 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -99,7 +99,12 @@ public class S3StreamsMetadataImageTest { private static final long GB = 1024 * MB; private final RangeGetter defaultRangeGetter = new RangeGetter() { @Override - public CompletableFuture<Optional<StreamOffsetRange>> find(long objectId, long streamId) { + public void attachGetObjectsContext(S3StreamsMetadataImage.GetObjectsContext ctx) { + + } + + @Override + public CompletableFuture<Optional<StreamOffsetRange>> find(long objectId, long streamId, long nodeId, long orderId) { return FutureUtil.failedFuture(new UnsupportedOperationException()); } @@ -188,7 +193,12 @@ private void testToImageAndBack(S3StreamsMetadataImage image) { private RangeGetter buildMemoryRangeGetter() { return new RangeGetter() { @Override - public CompletableFuture<Optional<StreamOffsetRange>> find(long objectId, long streamId) { + public void attachGetObjectsContext(S3StreamsMetadataImage.GetObjectsContext ctx) { + + } + + @Override + public CompletableFuture<Optional<StreamOffsetRange>> find(long objectId, long streamId, long nodeId, long orderId) { if (objectId == 0) { return CompletableFuture.completedFuture(Optional.of(new StreamOffsetRange(STREAM0, 100L, 120L))); } else if (objectId == 1) { diff --git a/s3stream/src/main/java/com/automq/stream/s3/index/LocalStreamRangeIndexCache.java b/s3stream/src/main/java/com/automq/stream/s3/index/LocalStreamRangeIndexCache.java index 89961be160..9b0de4154f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/index/LocalStreamRangeIndexCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/index/LocalStreamRangeIndexCache.java @@ -81,7 +81,7 @@ public class LocalStreamRangeIndexCache implements S3StreamClient.StreamLifeCycl private long lastUploadTime = 0L; private LocalStreamRangeIndexCache() { - + } public static LocalStreamRangeIndexCache create() { diff --git 
a/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java b/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java new file mode 100644 index 0000000000..3376530320 --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/index/lazy/StreamSetObjectRangeIndex.java @@ -0,0 +1,195 @@ +/* + * Copyright 2025, AutoMQ HK Limited. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.automq.stream.s3.index.lazy; + +import com.automq.stream.s3.metadata.StreamOffsetRange; +import com.automq.stream.utils.ThreadUtils; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Ticker; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.util.concurrent.Striped; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; + +public class StreamSetObjectRangeIndex { + private static boolean enabled = System.getenv().containsKey("AUTOMQ_STREAM_SET_RANGE_INDEX_ENABLED"); + private static final Logger LOGGER = LoggerFactory.getLogger(StreamSetObjectRangeIndex.class); + private static volatile StreamSetObjectRangeIndex instance = null; + + public static final ExecutorService UPDATE_INDEX_THREAD_POOL = Executors.newSingleThreadExecutor( + ThreadUtils.createThreadFactory("StreamSetObjectRangeIndex", true)); + + private static final Object DUMMY_OBJECT = new Object(); + private final ConcurrentHashMap<Long, TreeMap<Long, Long>> streamOffsetIndexMap = + new ConcurrentHashMap<>(10240); + private final Cache<Long, Object> expireCache; + + private final Cache<Long, Long> lastReaderUpdateTime = CacheBuilder.newBuilder() + .maximumSize(20000) + .expireAfterWrite(Duration.ofMinutes(1)) + .build(); + + private final Striped<Lock> lock = Striped.lock(64); + + public StreamSetObjectRangeIndex(int maxSize, long expireTimeMs, Ticker ticker) { + expireCache = CacheBuilder.newBuilder() + .ticker(ticker) + .maximumSize(maxSize) + .expireAfterWrite(Duration.ofMillis(expireTimeMs)) + .removalListener(notification -> { + if (notification.getKey() != null) { + streamOffsetIndexMap.remove(notification.getKey()); + } + }) + .build(); + } + + public static StreamSetObjectRangeIndex getInstance() { + if (instance == null) { + synchronized (StreamSetObjectRangeIndex.class) { + if (instance == null) { + instance = new 
StreamSetObjectRangeIndex(20 * 10000, + TimeUnit.MINUTES.toMillis(10), Ticker.systemTicker()); + } + } + } + return instance; + } + + public static boolean isEnabled() { + return enabled; + } + + public static void setEnabled(boolean enabled) { + StreamSetObjectRangeIndex.enabled = enabled; + } + + public void updateIndex(Long objectId, Long nodeId, Long streamId, + List<StreamOffsetRange> streamOffsetRanges) { + if (!enabled) { + return; + } + + if (lastReaderUpdateTime.getIfPresent(objectId) != null) { + return; + } else { + lastReaderUpdateTime.put(objectId, System.currentTimeMillis()); + } + + try { + touch(streamId); + updateIndex(objectId, streamOffsetRanges); + } catch (Exception e) { + LOGGER.error("Failed to update index for reader: {}, nodeId: {}, streamId: {}", + objectId, nodeId, streamId, e); + } + } + + public void touch(Long streamId) { + try { + expireCache.get(streamId, () -> DUMMY_OBJECT); + } catch (Exception ignored) { + + } + } + + @VisibleForTesting + public void clear() { + streamOffsetIndexMap.clear(); + } + + public void updateIndex(Long objectId, List<StreamOffsetRange> streamOffsetRanges) { + for (StreamOffsetRange streamOffsetRange : streamOffsetRanges) { + Long streamId = streamOffsetRange.streamId(); + Long startOffset = streamOffsetRange.startOffset(); + + withLock(streamId, () -> { + TreeMap<Long, Long> offsetMap = streamOffsetIndexMap.computeIfAbsent(streamId, k -> new TreeMap<>()); + offsetMap.put(startOffset, objectId); + touch(streamId); + }); + } + } + + private <T> T withLockReturn(Long streamId, Callable<T> callable) { + Lock l = lock.get(streamId); + try { + l.lock(); + return callable.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + l.unlock(); + } + } + + private void withLock(Long streamId, Runnable runnable) { + Lock l = lock.get(streamId); + try { + l.lock(); + runnable.run(); + } finally { + l.unlock(); + } + } + + public void invalid(int nodeId, long streamId, long startOffset, Long objectId) { + TreeMap<Long, Long> longLongTreeMap = streamOffsetIndexMap.get(streamId); + if (longLongTreeMap == null) { + return; + } + + withLock(streamId, () -> { + longLongTreeMap.remove(startOffset, objectId); + }); + } + + public CompletableFuture<Long> searchObjectId(int nodeId, long streamId, long startOffset) { + TreeMap<Long, Long> offsetMap = streamOffsetIndexMap.get(streamId); + if (offsetMap == null) { + return CompletableFuture.completedFuture(-1L); + } + + return withLockReturn(streamId, () -> { + Map.Entry<Long, Long> entry = offsetMap.floorEntry(startOffset); + if (entry == null) { + return CompletableFuture.completedFuture(-1L); + } + + touch(streamId); + + return CompletableFuture.completedFuture(entry.getValue()); + }); + } +}
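At its core the lazy index added above is, per stream, a sorted map from a range's start offset to the stream set object that carries it: updateIndex() fills the map as index blocks happen to be read, searchObjectId() answers with a floorEntry() lookup, and invalid() removes exactly the mapping that produced a stale hit. A minimal single-threaded sketch of that lookup (names are illustrative, not the PR's API):

import java.util.Map;
import java.util.TreeMap;

// Sketch: per-stream lazy range index, startOffset -> objectId.
public class RangeIndexSketch {
    private final TreeMap<Long, Long> offsetToObject = new TreeMap<>();

    // Learned from an index block: the object's range for this stream starts here.
    public void put(long startOffset, long objectId) {
        offsetToObject.put(startOffset, objectId);
    }

    // Candidate object whose range starts at or before startOffset, or -1
    // when nothing is indexed yet (caller falls back to scanning from index 0).
    public long search(long startOffset) {
        Map.Entry<Long, Long> entry = offsetToObject.floorEntry(startOffset);
        return entry == null ? -1L : entry.getValue();
    }

    // A stale hit removes exactly the mapping that misled us, mirroring
    // invalid(nodeId, streamId, startOffset, objectId) in the patch.
    public void invalidate(long startOffset, long objectId) {
        offsetToObject.remove(startOffset, objectId);
    }

    public static void main(String[] args) {
        RangeIndexSketch index = new RangeIndexSketch();
        index.put(0L, 100L);   // object 100's range for the stream starts at 0
        index.put(500L, 101L); // object 101's range starts at 500
        System.out.println(index.search(250L)); // 100
        System.out.println(index.search(700L)); // 101
        index.invalidate(500L, 101L);
        System.out.println(index.search(700L)); // 100: degraded, still safe
    }
}

The floor semantics make misses safe: a missing or stale entry only degrades the search to an earlier object (a longer scan), never to skipped data, which is why the ten-minute expiry on expireCache cannot affect correctness.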