feat(metadata): extract stream range index by lazy load StreamSetObject #2710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account


Open · wants to merge 6 commits into main
Changes from all commits

@@ -16,7 +16,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log.stream.s3.metadata;

import kafka.server.BrokerServer;
@@ -35,8 +34,10 @@
import org.apache.kafka.metadata.stream.S3StreamSetObject;

import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.cache.LRUCache;
import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory;
import com.automq.stream.s3.index.LocalStreamRangeIndexCache;
import com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex;
import com.automq.stream.s3.metadata.ObjectUtils;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.metadata.S3StreamConstant;
@@ -48,11 +49,14 @@
import com.automq.stream.s3.streams.StreamMetadataListener;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.Threads;
import com.google.common.collect.Sets;

import org.apache.orc.util.BloomFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -67,6 +71,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;

import static com.automq.stream.utils.FutureUtil.exec;
import static kafka.log.stream.s3.metadata.StreamMetadataManager.DefaultRangeGetter.STREAM_ID_BLOOM_FILTER;

public class StreamMetadataManager implements InRangeObjectsFetcher, MetadataPublisher {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamMetadataManager.class);
@@ -78,6 +83,8 @@ public class StreamMetadataManager implements InRangeObjectsFetcher, MetadataPublisher {
private final LocalStreamRangeIndexCache indexCache;
private final Map<Long, StreamMetadataListener> streamMetadataListeners = new ConcurrentHashMap<>();

private Set<Long> streamSetObjectIds = Collections.emptySet();

public StreamMetadataManager(BrokerServer broker, int nodeId, ObjectReaderFactory objectReaderFactory,
LocalStreamRangeIndexCache indexCache) {
this.nodeId = nodeId;
@@ -98,16 +105,23 @@ public String name() {
@Override
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
Set<Long> changedStreams;
Set<Long> streamSetObjectIds = this.streamSetObjectIds;
synchronized (this) {
if (newImage.highestOffsetAndEpoch().equals(this.metadataImage.highestOffsetAndEpoch())) {
return;
}
this.metadataImage = newImage;
changedStreams = delta.getOrCreateStreamsMetadataDelta().changedStreams();
}
this.streamSetObjectIds = Collections.unmodifiableSet(getStreamSetObjectIds());

// update streamBloomFilter
Set<Long> sets = Sets.difference(this.streamSetObjectIds, streamSetObjectIds);
sets.forEach(STREAM_ID_BLOOM_FILTER::removeObject);

Copilot AI reviewed Aug 8, 2025:

[nitpick] The variable name 'sets' is misleading, as it contains the difference between two sets (the newly added objects). It should be called 'removedObjects' or similar to clarify that these are the objects being removed from the bloom filter.

Suggested change:
-        sets.forEach(STREAM_ID_BLOOM_FILTER::removeObject);
+        Set<Long> removedStreamSetObjectIds = Sets.difference(this.streamSetObjectIds, streamSetObjectIds);
+        removedStreamSetObjectIds.forEach(STREAM_ID_BLOOM_FILTER::removeObject);

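For context on the call under discussion: Guava's Sets.difference(a, b) returns a live, unmodifiable view of the elements of a that are not in b. A minimal sketch with made-up ids, not from the PR:

import com.google.common.collect.Sets;

import java.util.Set;

public class SetsDifferenceDemo {
    public static void main(String[] args) {
        Set<Long> current = Set.of(1L, 2L, 3L);  // e.g. this.streamSetObjectIds after the image update
        Set<Long> previous = Set.of(1L, 2L);     // e.g. the local snapshot taken before the update

        // View of ids in 'current' that are absent from 'previous':
        // the stream set objects that appeared in the new metadata image.
        Set<Long> added = Sets.difference(current, previous);
        System.out.println(added); // [3]

        // Swapping the arguments yields the ids that disappeared instead.
        System.out.println(Sets.difference(previous, current)); // []
    }
}
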
// retry all pending tasks
retryPendingTasks();
this.indexCache.asyncPrune(this::getStreamSetObjectIds);
this.indexCache.asyncPrune(() -> streamSetObjectIds);

Copilot AI reviewed Aug 8, 2025:

The lambda captures the local variable 'streamSetObjectIds', which refers to the old set of objects. This should capture 'this.streamSetObjectIds' to use the updated set of stream set object IDs.

Suggested change:
-        this.indexCache.asyncPrune(() -> streamSetObjectIds);
+        this.indexCache.asyncPrune(() -> this.streamSetObjectIds);

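The Java rule behind this comment: a lambda that names a local variable captures the reference that variable held when the lambda was created, while an access through this.field is re-read on every invocation. A standalone sketch, illustration only:

import java.util.Set;
import java.util.function.Supplier;

public class CaptureDemo {
    private Set<Long> ids = Set.of(1L);

    public static void main(String[] args) {
        CaptureDemo demo = new CaptureDemo();

        Set<Long> ids = demo.ids;                   // local variable: a snapshot of the reference
        Supplier<Set<Long>> stale = () -> ids;      // captures the snapshot, frozen at creation
        Supplier<Set<Long>> live = () -> demo.ids;  // re-reads the field on every get()

        demo.ids = Set.of(1L, 2L);                  // the field is reassigned later

        System.out.println(stale.get());            // [1]     (still the old set)
        System.out.println(live.get());             // [1, 2]  (order unspecified)
    }
}
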
notifyMetadataListeners(changedStreams);
}

@@ -339,9 +353,74 @@ public void close() {
}
}

private static class DefaultRangeGetter implements S3StreamsMetadataImage.RangeGetter {
public static class StreamIdBloomFilter {
public static final double DEFAULT_FPP = 0.01;
private final LRUCache<Long/*objectId*/, BloomFilter> cache = new LRUCache<>();
private final long maxBloomFilterSize;
private long cacheSize = 0;

public StreamIdBloomFilter(long maxBloomFilterCacheSize) {
this.maxBloomFilterSize = maxBloomFilterCacheSize;
}

public synchronized void maintainCacheSize() {
while (cacheSize > maxBloomFilterSize) {
Map.Entry<Long, BloomFilter> entry = cache.pop();
if (entry != null) {
cacheSize -= Long.BYTES + entry.getValue().sizeInBytes();
}
}
}

public synchronized boolean mightContain(long objectId, long streamId) {
BloomFilter bloomFilter = cache.get(objectId);
if (bloomFilter == null) {
return true; // no filter cached for this object yet, so treat the stream as possibly present
}

cache.touchIfExist(objectId);
return bloomFilter.testLong(streamId);
}

public synchronized void removeObject(long objectId) {
BloomFilter filter = cache.get(objectId);
if (cache.remove(objectId)) {
cacheSize -= Long.BYTES + filter.sizeInBytes();
}
}

public synchronized void update(long objectId, List<StreamOffsetRange> streamOffsetRanges) {
if (cache.containsKey(objectId)) {
return;
}

BloomFilter bloomFilter = new BloomFilter(streamOffsetRanges.size(), DEFAULT_FPP);

streamOffsetRanges.forEach(range -> bloomFilter.addLong(range.streamId()));
cache.put(objectId, bloomFilter);
cacheSize += Long.BYTES + bloomFilter.sizeInBytes();

maintainCacheSize();
}

public synchronized long sizeInBytes() {
return this.cacheSize;
}

public synchronized int objectNum() {
return this.cache.size();
}

public synchronized void clear() {
cache.clear();
}
}
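
A quick usage sketch of the cache above, illustration only; it assumes StreamOffsetRange exposes a (streamId, startOffset, endOffset) constructor, consistent with how the ranges are consumed elsewhere in this diff:

import com.automq.stream.s3.metadata.StreamOffsetRange;

import java.util.List;

public class BloomFilterCacheDemo {
    public static void main(String[] args) {
        StreamMetadataManager.StreamIdBloomFilter filter =
            new StreamMetadataManager.StreamIdBloomFilter(20 * 1024 * 1024); // same 20 MiB budget as the diff

        // Index object 42: it holds data for streams 1 and 2.
        filter.update(42L, List.of(
            new StreamOffsetRange(1L, 0L, 100L),
            new StreamOffsetRange(2L, 0L, 50L)));

        filter.mightContain(42L, 1L); // true: stream 1 was indexed
        filter.mightContain(42L, 3L); // false with ~99% probability (DEFAULT_FPP = 0.01)
        filter.mightContain(99L, 3L); // true: object 99 is not cached, so the caller falls through to a real read
    }
}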

public static class DefaultRangeGetter implements S3StreamsMetadataImage.RangeGetter {
private final S3ObjectsImage objectsImage;
private final ObjectReaderFactory objectReaderFactory;
public static final StreamIdBloomFilter STREAM_ID_BLOOM_FILTER = new StreamIdBloomFilter(20 * 1024 * 1024);
private S3StreamsMetadataImage.GetObjectsContext getObjectsContext;

public DefaultRangeGetter(S3ObjectsImage objectsImage,
ObjectReaderFactory objectReaderFactory) {
@@ -350,16 +429,46 @@ public DefaultRangeGetter(S3ObjectsImage objectsImage,
}

@Override
public CompletableFuture<Optional<StreamOffsetRange>> find(long objectId, long streamId) {
public void attachGetObjectsContext(S3StreamsMetadataImage.GetObjectsContext ctx) {
this.getObjectsContext = ctx;
}

public static void updateIndex(ObjectReader reader, Long nodeId, Long streamId) {
reader.basicObjectInfo().thenAccept(info -> {
Long objectId = reader.metadata().objectId();
List<StreamOffsetRange> streamOffsetRanges = info.indexBlock().streamOffsetRanges();

STREAM_ID_BLOOM_FILTER.update(objectId, streamOffsetRanges);

StreamSetObjectRangeIndex.getInstance().updateIndex(objectId, nodeId, streamId, streamOffsetRanges);
}).whenComplete((v, e) -> reader.release());
}

@Override
public CompletableFuture<Optional<StreamOffsetRange>> find(long objectId, long streamId, long nodeId, long orderId) {
S3Object s3Object = objectsImage.getObjectMetadata(objectId);
if (s3Object == null) {
return FutureUtil.failedFuture(new IllegalArgumentException("Cannot find object metadata for object: " + objectId));
}

boolean mightContain = STREAM_ID_BLOOM_FILTER.mightContain(objectId, streamId);
if (!mightContain) {
getObjectsContext.bloomFilterSkipSSOCount++;
return CompletableFuture.completedFuture(Optional.empty());
}

getObjectsContext.searchSSOStreamOffsetRangeCount++;
// The reader will be released after the find operation completes
@SuppressWarnings("resource")
ObjectReader reader = objectReaderFactory.get(new S3ObjectMetadata(objectId, s3Object.getObjectSize(), s3Object.getAttributes()));
CompletableFuture<Optional<StreamOffsetRange>> cf = reader.basicObjectInfo().thenApply(info -> info.indexBlock().findStreamOffsetRange(streamId));
cf.whenComplete((rst, ex) -> reader.release());
CompletableFuture<Optional<StreamOffsetRange>> cf = reader.basicObjectInfo()
.thenApply(info -> info.indexBlock().findStreamOffsetRange(streamId));
cf.whenCompleteAsync((rst, ex) -> {
// rst is null when the future completed exceptionally; guard before calling isEmpty()
if (ex == null && rst.isEmpty()) {
getObjectsContext.searchSSORangeEmpty.add(1);
}
updateIndex(reader, nodeId, streamId);
}, StreamSetObjectRangeIndex.UPDATE_INDEX_THREAD_POOL);
return cf;
}

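Note on the 20 MiB budget above, assuming ORC's BloomFilter allocates the optimal number of bits for the requested false-positive rate: m = -n * ln(p) / (ln 2)^2 bits for n entries, which is about 9.6 bits per entry at p = 0.01. A 20 MiB budget is 20 * 2^20 * 8 ≈ 1.68 * 10^8 bits, so the cache can index on the order of 17 million stream entries before maintainCacheSize() starts evicting in LRU order; the Long.BYTES key overhead is negligible by comparison. At p = 0.01, roughly 1% of lookups for streams an object does not contain will still fall through to an object read.
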