From 504b3822d6e7d1b91aa03a1983d3b700ba5a5e5e Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 25 Apr 2025 16:50:17 -0500 Subject: [PATCH 01/14] CNDB-13483: Fix loading PQ file when disabled_reads is true --- .../config/CassandraRelevantProperties.java | 7 + .../cassandra/index/sai/SSTableIndex.java | 35 ++- .../cassandra/index/sai/disk/EmptyIndex.java | 7 + .../index/sai/disk/SearchableIndex.java | 3 + .../disk/V1MetadataOnlySearchableIndex.java | 208 ++++++++++++++++++ .../index/sai/disk/v1/SSTableIndexWriter.java | 20 +- .../index/sai/disk/v1/SegmentMetadata.java | 14 +- .../index/sai/disk/v1/V1SearchableIndex.java | 6 + .../sai/disk/vector/CassandraOnHeapGraph.java | 116 +++++++--- 9 files changed, 373 insertions(+), 43 deletions(-) create mode 100644 src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index add5d3a9b0da..34b4170fef7a 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -527,6 +527,13 @@ public enum CassandraRelevantProperties */ SAI_INDEX_READS_DISABLED("cassandra.sai.disabled_reads", "false"), + /** + * Only takes effect when SAI_INDEX_READS_DISABLED is true. If true, creates a lazy index searcher that only loads + * segment metadata and has the ability to load extra files, like the PQ file for vector indexes. Currently only + * affects vector indexes. Other indexes fall back to the empty index searcher when SAI_INDEX_READS_DISABLED is true. + */ + SAI_INDEX_LOAD_SEGMENT_METADATA_ONLY("cassandra.sai.load_segment_metadata_only", "true"), + /** * Allows custom implementation of {@link SensorsFactory} to optionally create * and configure {@link org.apache.cassandra.sensors.RequestSensors} instances. 
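Note on how the two properties interact (a sketch of the decision implemented in createSearchableIndex in the SSTableIndex.java diff below; the -D forms are just the standard JVM syntax for setting these system properties, and the defaults shown are the ones declared in this patch):

    // -Dcassandra.sai.disabled_reads=false (default)
    //     any index    -> regular on-disk searchable index, as before
    // -Dcassandra.sai.disabled_reads=true -Dcassandra.sai.load_segment_metadata_only=true (default)
    //     vector index -> V1MetadataOnlySearchableIndex (segment metadata only, plus access to the PQ file)
    //     other index  -> EmptyIndex
    // -Dcassandra.sai.disabled_reads=true -Dcassandra.sai.load_segment_metadata_only=false
    //     any index    -> EmptyIndex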
diff --git a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java index c885f672bf2c..2f371a959205 100644 --- a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java @@ -37,12 +37,15 @@ import org.apache.cassandra.db.virtual.SimpleDataSet; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.index.sai.disk.EmptyIndex; +import org.apache.cassandra.index.sai.disk.V1MetadataOnlySearchableIndex; import org.apache.cassandra.index.sai.disk.PrimaryKeyMapIterator; import org.apache.cassandra.index.sai.disk.SearchableIndex; import org.apache.cassandra.index.sai.disk.format.IndexComponents; import org.apache.cassandra.index.sai.disk.format.IndexFeatureSet; import org.apache.cassandra.index.sai.disk.format.Version; import org.apache.cassandra.index.sai.disk.v1.Segment; +import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; +import org.apache.cassandra.index.sai.disk.v1.V1SearchableIndex; import org.apache.cassandra.index.sai.iterators.KeyRangeAntiJoinIterator; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; @@ -53,6 +56,7 @@ import org.apache.cassandra.io.sstable.SSTableIdFactory; import org.apache.cassandra.io.sstable.SSTableWatcher; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.CloseableIterator; @@ -92,8 +96,19 @@ private static SearchableIndex createSearchableIndex(SSTableContext sstableConte { if (CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.getBoolean()) { - logger.info("Creating dummy (empty) index searcher for sstable {} as SAI index reads are disabled", sstableContext.sstable.descriptor); - return new EmptyIndex(); + var context = perIndexComponents.context(); + if (context != null + && context.isVector() + && CassandraRelevantProperties.SAI_INDEX_LOAD_SEGMENT_METADATA_ONLY.getBoolean()) + { + logger.info("Creating a lazy index searcher for sstable {} as SAI index reads are disabled, but this is a vector index", sstableContext.sstable.descriptor.id); + return new V1MetadataOnlySearchableIndex(sstableContext, perIndexComponents); + } + else + { + logger.info("Creating dummy (empty) index searcher for sstable {} as SAI index reads are disabled", sstableContext.sstable.descriptor); + return new EmptyIndex(); + } } return perIndexComponents.onDiskFormat().newSearchableIndex(sstableContext, perIndexComponents); @@ -122,6 +137,22 @@ public List getSegments() return searchableIndex.getSegments(); } + public List getSegmentMetadatas() + { + return searchableIndex.getSegmentMetadatas(); + } + + public boolean areSegmentsLoaded() + { + return searchableIndex instanceof V1SearchableIndex; + } + + public FileHandle pq() + { + assert searchableIndex instanceof V1MetadataOnlySearchableIndex; + return ((V1MetadataOnlySearchableIndex) searchableIndex).pq(); + } + public long indexFileCacheSize() { return searchableIndex.indexFileCacheSize(); diff --git a/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java b/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java index 501d2c9833ed..97216580bc8c 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java @@ -28,6 +28,7 @@ import org.apache.cassandra.dht.AbstractBounds; import 
org.apache.cassandra.index.sai.QueryContext; import org.apache.cassandra.index.sai.disk.v1.Segment; +import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.plan.Orderer; @@ -113,6 +114,12 @@ public List getSegments() return List.of(); } + @Override + public List getSegmentMetadatas() + { + return List.of(); + } + @Override public void populateSystemView(SimpleDataSet dataSet, SSTableReader sstable) { diff --git a/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java index 7aea633376ee..214eff2cc29a 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java @@ -29,6 +29,7 @@ import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.index.sai.QueryContext; import org.apache.cassandra.index.sai.disk.v1.Segment; +import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.plan.Orderer; @@ -84,6 +85,8 @@ public List> orderResultsBy(QueryContex List getSegments(); + List getSegmentMetadatas(); + public void populateSystemView(SimpleDataSet dataSet, SSTableReader sstable); long estimateMatchingRowsCount(Expression predicate, AbstractBounds keyRange); diff --git a/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java new file mode 100644 index 000000000000..515df949f1bf --- /dev/null +++ b/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.cassandra.index.sai.disk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.virtual.SimpleDataSet;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.index.sai.IndexContext;
+import org.apache.cassandra.index.sai.QueryContext;
+import org.apache.cassandra.index.sai.SSTableContext;
+import org.apache.cassandra.index.sai.disk.format.IndexComponents;
+import org.apache.cassandra.index.sai.disk.v1.MetadataSource;
+import org.apache.cassandra.index.sai.disk.v1.PerIndexFiles;
+import org.apache.cassandra.index.sai.disk.v1.Segment;
+import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata;
+import org.apache.cassandra.index.sai.iterators.KeyRangeIterator;
+import org.apache.cassandra.index.sai.plan.Expression;
+import org.apache.cassandra.index.sai.plan.Orderer;
+import org.apache.cassandra.index.sai.utils.PrimaryKey;
+import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey;
+import org.apache.cassandra.index.sai.utils.TypeUtil;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * An index that eagerly loads segment metadata and nothing else. It is currently only used for vector indexes to
+ * read PQ files during compaction.
+ */
+public class V1MetadataOnlySearchableIndex implements SearchableIndex
+{
+    private final List metadatas;
+    private final DecoratedKey minKey;
+    private final DecoratedKey maxKey; // in token order
+    private final ByteBuffer minTerm;
+    private final ByteBuffer maxTerm;
+    private final long minSSTableRowId, maxSSTableRowId;
+    private final long numRows;
+    private PerIndexFiles indexFiles;
+
+    public V1MetadataOnlySearchableIndex(SSTableContext sstableContext, IndexComponents.ForRead perIndexComponents)
+    {
+        var indexContext = perIndexComponents.context();
+        try
+        {
+            this.indexFiles = new PerIndexFiles(perIndexComponents);
+
+            final MetadataSource source = MetadataSource.loadMetadata(perIndexComponents);
+
+            // We skip loading the terms distribution because this class doesn't use them for now.
+            metadatas = SegmentMetadata.load(source, indexContext, false);
+
+            this.minKey = metadatas.get(0).minKey.partitionKey();
+            this.maxKey = metadatas.get(metadatas.size() - 1).maxKey.partitionKey();
+
+            var version = perIndexComponents.version();
+            this.minTerm = metadatas.stream().map(m -> m.minTerm).min(TypeUtil.comparator(indexContext.getValidator(), version)).orElse(null);
+            this.maxTerm = metadatas.stream().map(m -> m.maxTerm).max(TypeUtil.comparator(indexContext.getValidator(), version)).orElse(null);
+
+            this.numRows = metadatas.stream().mapToLong(m -> m.numRows).sum();
+
+            this.minSSTableRowId = metadatas.get(0).minSSTableRowId;
+            this.maxSSTableRowId = metadatas.get(metadatas.size() - 1).maxSSTableRowId;
+        }
+        catch (Throwable t)
+        {
+            FileUtils.closeQuietly(indexFiles);
+            FileUtils.closeQuietly(sstableContext);
+            throw Throwables.unchecked(t);
+        }
+    }
+
+    @Override
+    public long indexFileCacheSize()
+    {
+        // TODO what is the right value here?
+ return 0; + } + + @Override + public long getRowCount() + { + return numRows; + } + + @Override + public long minSSTableRowId() + { + return minSSTableRowId; + } + @Override + public long maxSSTableRowId() + { + return maxSSTableRowId; + } + + @Override + public ByteBuffer minTerm() + { + return minTerm; + } + + @Override + public ByteBuffer maxTerm() + { + return maxTerm; + } + + @Override + public DecoratedKey minKey() + { + return minKey; + } + + @Override + public DecoratedKey maxKey() + { + return maxKey; + } + + @Override + public KeyRangeIterator search(Expression expression, + AbstractBounds keyRange, + QueryContext context, + boolean defer, + int limit) throws IOException + { + // This index is not meant for searching, only for accessing metadata and index files + throw new UnsupportedOperationException(); + } + + @Override + public List> orderBy(Orderer orderer, + Expression slice, + AbstractBounds keyRange, + QueryContext context, + int limit, + long totalRows) throws IOException + { + // This index is not meant for searching, only for accessing metadata and index files + throw new UnsupportedOperationException(); + } + + @Override + public List getSegments() + { + throw new UnsupportedOperationException(); + } + + @Override + public List getSegmentMetadatas() + { + return metadatas; + } + + public FileHandle pq() + { + return indexFiles.pq(); + } + + @Override + public void populateSystemView(SimpleDataSet dataSet, SSTableReader sstable) + { + // TODO what is valid here? + } + + @Override + public long estimateMatchingRowsCount(Expression predicate, AbstractBounds keyRange) + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException + { + FileUtils.closeQuietly(indexFiles); + } + + @Override + public List> orderResultsBy(QueryContext context, List keys, Orderer orderer, int limit, long totalRows) throws IOException + { + throw new UnsupportedOperationException(); + } +} + diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java index a4812fb120ac..bba6a77ec79c 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; +import java.util.function.Function; +import java.util.function.Supplier; import javax.annotation.concurrent.NotThreadSafe; import com.google.common.base.Preconditions; @@ -40,6 +42,7 @@ import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.SSTableIndex; import org.apache.cassandra.index.sai.disk.PerIndexWriter; +import org.apache.cassandra.index.sai.disk.format.IndexComponent; import org.apache.cassandra.index.sai.disk.format.IndexComponentType; import org.apache.cassandra.index.sai.disk.format.IndexComponents; import org.apache.cassandra.index.sai.disk.v2.V2VectorIndexSearcher; @@ -48,11 +51,13 @@ import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; import org.apache.cassandra.index.sai.disk.vector.CassandraDiskAnn; import org.apache.cassandra.index.sai.disk.vector.CassandraOnHeapGraph; +import org.apache.cassandra.index.sai.disk.vector.VectorCompression; import org.apache.cassandra.index.sai.disk.vector.VectorCompression.CompressionType; import org.apache.cassandra.index.sai.utils.NamedMemoryLimiter; import 
org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.index.sai.utils.TypeUtil; import org.apache.cassandra.io.storage.StorageProvider; +import org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Throwables; @@ -350,7 +355,7 @@ private SegmentBuilder newSegmentBuilder(long rowIdOffset) throws IOException // if we have a PQ instance available, we can use it to build a CompactionGraph; // otherwise, build on heap (which will create PQ for next time, if we have enough vectors) - var pqi = CassandraOnHeapGraph.getPqIfPresent(indexContext, vc -> vc.type == CompressionType.PRODUCT_QUANTIZATION); + var pqi = CassandraOnHeapGraph.getPqIfPresent(indexContext); // If no PQ instance available in indexes of completed sstables, check if we just wrote one in the previous segment if (pqi == null && !segments.isEmpty()) pqi = maybeReadPqFromLastSegment(); @@ -387,6 +392,7 @@ else if (indexContext.isLiteral()) private static boolean allRowsHaveVectorsInWrittenSegments(IndexContext indexContext) { + // TODO should we load all of these for this op? for (SSTableIndex index : indexContext.getView().getIndexes()) { for (Segment segment : index.getSegments()) @@ -406,12 +412,16 @@ private CassandraOnHeapGraph.PqInfo maybeReadPqFromLastSegment() throws IOExcept { var pqComponent = perIndexComponents.get(IndexComponentType.PQ); assert pqComponent != null; // we always have a PQ component even if it's not actually PQ compression + try (var fhBuilder = StorageProvider.instance.indexBuildTimeFileHandleBuilderFor(pqComponent)) + { + return maybeReadPqFromSegment(segments.get(segments.size() - 1), fhBuilder.complete()); + } + } - try (var fhBuilder = StorageProvider.instance.indexBuildTimeFileHandleBuilderFor(pqComponent); - var fh = fhBuilder.complete(); - var reader = fh.createReader()) + public static CassandraOnHeapGraph.PqInfo maybeReadPqFromSegment(SegmentMetadata sm, FileHandle fh) throws IOException + { + try (fh; var reader = fh.createReader()) { - var sm = segments.get(segments.size() - 1); long offset = sm.componentMetadatas.get(IndexComponentType.PQ).offset; // close parallel to code in CassandraDiskANN constructor, but different enough // (we only want the PQ codebook) that it's difficult to extract into a common method diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java index b8f4cd769873..2a3ec622a7b1 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentMetadata.java @@ -134,7 +134,7 @@ public class SegmentMetadata implements Comparable private static final Logger logger = LoggerFactory.getLogger(SegmentMetadata.class); @SuppressWarnings("resource") - private SegmentMetadata(IndexInput input, IndexContext context, Version version) throws IOException + private SegmentMetadata(IndexInput input, IndexContext context, Version version, boolean loadTermsDistribution) throws IOException { PrimaryKey.Factory primaryKeyFactory = context.keyFactory(); AbstractType termsType = context.getValidator(); @@ -155,7 +155,8 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version) long fp = input.getFilePointer(); if (len > 0) { - td = TermsDistribution.read(input, termsType); + if (loadTermsDistribution) + td = TermsDistribution.read(input, termsType); input.seek(fp + len); } } @@ -163,8 +164,15 @@ private 
SegmentMetadata(IndexInput input, IndexContext context, Version version) this.componentMetadatas = new SegmentMetadata.ComponentMetadataMap(input); } + @SuppressWarnings("resource") public static List load(MetadataSource source, IndexContext context) throws IOException + { + return load(source, context, true); + } + + @SuppressWarnings("resource") + public static List load(MetadataSource source, IndexContext context, boolean loadTermsDistribution) throws IOException { IndexInput input = source.get(NAME); @@ -175,7 +183,7 @@ public static List load(MetadataSource source, IndexContext con for (int i = 0; i < segmentCount; i++) { - segmentMetadata.add(new SegmentMetadata(input, context, source.getVersion())); + segmentMetadata.add(new SegmentMetadata(input, context, source.getVersion(), loadTermsDistribution)); } return segmentMetadata; diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java index 28f90d590f8c..d00950c9e6a6 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java @@ -252,6 +252,12 @@ public List getSegments() return segments; } + @Override + public List getSegmentMetadatas() + { + return metadatas; + } + @Override public void populateSystemView(SimpleDataSet dataset, SSTableReader sstable) { diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java index 5c2dbc2190c6..bf6f38adedb0 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java @@ -24,15 +24,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Comparator; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; import java.util.function.IntUnaryOperator; import java.util.function.ToIntFunction; import java.util.stream.IntStream; +import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; import org.cliffc.high_scale_lib.NonBlockingHashMap; @@ -64,7 +63,6 @@ import io.github.jbellis.jvector.vector.types.VectorFloat; import io.github.jbellis.jvector.vector.types.VectorTypeSupport; import org.agrona.collections.IntHashSet; -import org.apache.cassandra.db.compaction.CompactionSSTable; import org.apache.cassandra.db.marshal.VectorType; import org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -74,7 +72,7 @@ import org.apache.cassandra.index.sai.disk.format.IndexComponent; import org.apache.cassandra.index.sai.disk.format.IndexComponentType; import org.apache.cassandra.index.sai.disk.format.IndexComponents; -import org.apache.cassandra.index.sai.disk.v1.Segment; +import org.apache.cassandra.index.sai.disk.v1.SSTableIndexWriter; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; import org.apache.cassandra.index.sai.disk.v2.V2VectorIndexSearcher; import org.apache.cassandra.index.sai.disk.v2.V2VectorPostingsWriter; @@ -503,39 +501,30 @@ static SegmentMetadata.ComponentMetadataMap createMetadataMap(long termsOffset, * "Best" means the most recent one that hits the row count target of {@link 
ProductQuantization#MAX_PQ_TRAINING_SET_SIZE}, * or the one with the most rows if none are larger than that. */ - public static PqInfo getPqIfPresent(IndexContext indexContext, Function matcher) + public static PqInfo getPqIfPresent(IndexContext indexContext) throws IOException { - // Retrieve the first compressed vectors for a segment with at least MAX_PQ_TRAINING_SET_SIZE rows - // or the one with the most rows if none reach that size - var indexes = new ArrayList<>(indexContext.getView().getIndexes()); - indexes.sort(Comparator.comparing(SSTableIndex::getSSTable, CompactionSSTable.maxTimestampDescending)); - - PqInfo cvi = null; - long maxRows = 0; - for (SSTableIndex index : indexes) + // TODO when compacting, this view is likely the whole table, is it worth only considering the sstables that + // are being compacted? + // Flatten all segments, sorted by size then timestamp (size is capped to MAX_PQ_TRAINING_SET_SIZE) + var sortedSegments = indexContext.getView().getIndexes().stream() + .flatMap(CustomSegmentSorter::streamSegments) + .filter(customSegment -> customSegment.numRowsOrMaxPQTrainingSetSize > MIN_PQ_ROWS) + .sorted() + .iterator(); + + while (sortedSegments.hasNext()) { - for (Segment segment : index.getSegments()) - { - if (segment.metadata.numRows < maxRows) - continue; - - var searcher = (V2VectorIndexSearcher) segment.getIndexSearcher(); - var cv = searcher.getCompression(); - if (matcher.apply(cv)) - { - // We can exit now because we won't find a better candidate - var candidate = new PqInfo(searcher.getPQ(), searcher.containsUnitVectors(), segment.metadata.numRows); - if (segment.metadata.numRows >= ProductQuantization.MAX_PQ_TRAINING_SET_SIZE) - return candidate; - - cvi = candidate; - maxRows = segment.metadata.numRows; - } - } + var customSegment = sortedSegments.next(); + // Because we sorted based on size then timestamp, this is the best match (assuming it exists) + var pqInfo = customSegment.getPqInfo(); + if (pqInfo != null) + return pqInfo; } - return cvi; + + return null; // nothing matched } + private long writePQ(SequentialWriter writer, V5VectorPostingsWriter.RemappedPostings remapped, IndexContext indexContext) throws IOException { var preferredCompression = sourceModel.compressionProvider.apply(vectorValues.dimension()); @@ -551,7 +540,7 @@ private long writePQ(SequentialWriter writer, V5VectorPostingsWriter.RemappedPos // build encoder (expensive for PQ, cheaper for BQ) if (preferredCompression.type == CompressionType.PRODUCT_QUANTIZATION) { - var pqi = getPqIfPresent(indexContext, preferredCompression::equals); + var pqi = getPqIfPresent(indexContext); compressor = computeOrRefineFrom(pqi, preferredCompression); } else @@ -703,4 +692,65 @@ public RandomAccessVectorValues copy() return new RemappedVectorValues(remapped, maxNewOrdinal, vectorValues.copy()); } } + + private static class CustomSegmentSorter implements Comparable + { + private final SSTableIndex sstableIndex; + private final int numRowsOrMaxPQTrainingSetSize; + private final long timestamp; + private final int segmentPosition; + + private CustomSegmentSorter(SSTableIndex sstableIndex, long numRows, int segmentPosition) + { + this.sstableIndex = sstableIndex; + // TODO give the size cost of larger PQ sets, is it worth trying to get the PQ object closest to this + // value? I'm concerned that we'll grab something pretty large for mem. 
+            this.numRowsOrMaxPQTrainingSetSize = (int) Math.min(numRows, ProductQuantization.MAX_PQ_TRAINING_SET_SIZE);
+            this.timestamp = sstableIndex.getSSTable().getMaxTimestamp();
+            this.segmentPosition = segmentPosition;
+        }
+
+        private PqInfo getPqInfo() throws IOException
+        {
+            if (sstableIndex.areSegmentsLoaded())
+            {
+                var segment = sstableIndex.getSegments().get(segmentPosition);
+                V2VectorIndexSearcher searcher = (V2VectorIndexSearcher) segment.getIndexSearcher();
+                // Skip segments that don't have PQ
+                if (CompressionType.PRODUCT_QUANTIZATION != searcher.getCompression().type)
+                    return null;
+
+                // Because we sorted based on size then timestamp, this is the best match
+                return new PqInfo(searcher.getPQ(), searcher.containsUnitVectors(), segment.metadata.numRows);
+            }
+            else
+            {
+                // We have to load from disk here
+                var segmentMetadata = sstableIndex.getSegmentMetadatas().get(segmentPosition);
+                // Returns null if the segment uses the wrong compression type.
+                return SSTableIndexWriter.maybeReadPqFromSegment(segmentMetadata, sstableIndex.pq());
+            }
+        }
+
+        @Override
+        public int compareTo(CustomSegmentSorter o)
+        {
+            // Sort by size descending, then timestamp descending for sstables
+            int cmp = Long.compare(o.numRowsOrMaxPQTrainingSetSize, numRowsOrMaxPQTrainingSetSize);
+            if (cmp != 0)
+                return cmp;
+            return Long.compare(o.timestamp, timestamp);
+        }
+
+        private static Stream streamSegments(SSTableIndex index)
+        {
+            var results = new ArrayList();
+
+            var metadatas = index.getSegmentMetadatas();
+            for (int i = 0; i < metadatas.size(); i++)
+                results.add(new CustomSegmentSorter(index, metadatas.get(i).numRows, i));
+
+            return results.stream();
+        }
+    }
 }

From 60d19fef30c690ed0cb187aba6bc56a22c86222f Mon Sep 17 00:00:00 2001
From: Michael Marshall
Date: Fri, 25 Apr 2025 16:56:58 -0500
Subject: [PATCH 02/14] Refactor code to pull PQ fetching logic into single
 class

---
 .../index/sai/disk/v1/SSTableIndexWriter.java |  36 +---
 .../sai/disk/vector/CassandraOnHeapGraph.java | 110 +----------
 .../vector/ProductQuantizationFetcher.java    | 177 ++++++++++++++++++
 3 files changed, 184 insertions(+), 139 deletions(-)
 create mode 100644 src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java

diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
index bba6a77ec79c..0c07180797c8 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
@@ -51,6 +51,7 @@
 import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter;
 import org.apache.cassandra.index.sai.disk.vector.CassandraDiskAnn;
 import org.apache.cassandra.index.sai.disk.vector.CassandraOnHeapGraph;
+import org.apache.cassandra.index.sai.disk.vector.ProductQuantizationFetcher;
 import org.apache.cassandra.index.sai.disk.vector.VectorCompression;
 import org.apache.cassandra.index.sai.disk.vector.VectorCompression.CompressionType;
 import org.apache.cassandra.index.sai.utils.NamedMemoryLimiter;
@@ -355,7 +356,7 @@ private SegmentBuilder newSegmentBuilder(long rowIdOffset) throws IOException
 
         // if we have a PQ instance available, we can use it to build a CompactionGraph;
         // otherwise, build on heap (which will create PQ for next time, if we have enough vectors)
-        var pqi = CassandraOnHeapGraph.getPqIfPresent(indexContext);
+        var pqi = ProductQuantizationFetcher.getPqIfPresent(indexContext);
         // If no PQ instance available in
indexes of completed sstables, check if we just wrote one in the previous segment if (pqi == null && !segments.isEmpty()) pqi = maybeReadPqFromLastSegment(); @@ -408,42 +409,13 @@ private static boolean allRowsHaveVectorsInWrittenSegments(IndexContext indexCon return true; } - private CassandraOnHeapGraph.PqInfo maybeReadPqFromLastSegment() throws IOException + private ProductQuantizationFetcher.PqInfo maybeReadPqFromLastSegment() throws IOException { var pqComponent = perIndexComponents.get(IndexComponentType.PQ); assert pqComponent != null; // we always have a PQ component even if it's not actually PQ compression try (var fhBuilder = StorageProvider.instance.indexBuildTimeFileHandleBuilderFor(pqComponent)) { - return maybeReadPqFromSegment(segments.get(segments.size() - 1), fhBuilder.complete()); + return ProductQuantizationFetcher.maybeReadPqFromSegment(segments.get(segments.size() - 1), fhBuilder.complete()); } } - - public static CassandraOnHeapGraph.PqInfo maybeReadPqFromSegment(SegmentMetadata sm, FileHandle fh) throws IOException - { - try (fh; var reader = fh.createReader()) - { - long offset = sm.componentMetadatas.get(IndexComponentType.PQ).offset; - // close parallel to code in CassandraDiskANN constructor, but different enough - // (we only want the PQ codebook) that it's difficult to extract into a common method - reader.seek(offset); - boolean unitVectors; - if (reader.readInt() == CassandraDiskAnn.PQ_MAGIC) - { - reader.readInt(); // skip over version - unitVectors = reader.readBoolean(); - } - else - { - unitVectors = true; - reader.seek(offset); - } - var compressionType = CompressionType.values()[reader.readByte()]; - if (compressionType == CompressionType.PRODUCT_QUANTIZATION) - { - var pq = ProductQuantization.load(reader); - return new CassandraOnHeapGraph.PqInfo(pq, unitVectors, sm.numRows); - } - } - return null; - } } diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java index bf6f38adedb0..dde6b7b4859e 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java @@ -496,35 +496,6 @@ static SegmentMetadata.ComponentMetadataMap createMetadataMap(long termsOffset, return metadataMap; } - /** - * Return the best previous CompressedVectors for this column that matches the `matcher` predicate. - * "Best" means the most recent one that hits the row count target of {@link ProductQuantization#MAX_PQ_TRAINING_SET_SIZE}, - * or the one with the most rows if none are larger than that. - */ - public static PqInfo getPqIfPresent(IndexContext indexContext) throws IOException - { - // TODO when compacting, this view is likely the whole table, is it worth only considering the sstables that - // are being compacted? 
- // Flatten all segments, sorted by size then timestamp (size is capped to MAX_PQ_TRAINING_SET_SIZE) - var sortedSegments = indexContext.getView().getIndexes().stream() - .flatMap(CustomSegmentSorter::streamSegments) - .filter(customSegment -> customSegment.numRowsOrMaxPQTrainingSetSize > MIN_PQ_ROWS) - .sorted() - .iterator(); - - while (sortedSegments.hasNext()) - { - var customSegment = sortedSegments.next(); - // Because we sorted based on size then timestamp, this is the best match (assuming it exists) - var pqInfo = customSegment.getPqInfo(); - if (pqInfo != null) - return pqInfo; - } - - return null; // nothing matched - } - - private long writePQ(SequentialWriter writer, V5VectorPostingsWriter.RemappedPostings remapped, IndexContext indexContext) throws IOException { var preferredCompression = sourceModel.compressionProvider.apply(vectorValues.dimension()); @@ -540,7 +511,7 @@ private long writePQ(SequentialWriter writer, V5VectorPostingsWriter.RemappedPos // build encoder (expensive for PQ, cheaper for BQ) if (preferredCompression.type == CompressionType.PRODUCT_QUANTIZATION) { - var pqi = getPqIfPresent(indexContext); + var pqi = ProductQuantizationFetcher.getPqIfPresent(indexContext); compressor = computeOrRefineFrom(pqi, preferredCompression); } else @@ -584,7 +555,8 @@ static void writePqHeader(DataOutput writer, boolean unitVectors, CompressionTyp writer.writeByte(type.ordinal()); } - ProductQuantization computeOrRefineFrom(PqInfo existingInfo, VectorCompression preferredCompression) + ProductQuantization computeOrRefineFrom(ProductQuantizationFetcher.PqInfo existingInfo, + VectorCompression preferredCompression) { if (existingInfo == null) { @@ -623,21 +595,6 @@ public enum InvalidVectorBehavior FAIL } - public static class PqInfo - { - public final ProductQuantization pq; - /** an empty Optional indicates that the index was written with an older version that did not record this information */ - public final boolean unitVectors; - public final long rowCount; - - public PqInfo(ProductQuantization pq, boolean unitVectors, long rowCount) - { - this.pq = pq; - this.unitVectors = unitVectors; - this.rowCount = rowCount; - } - } - /** ensures that the graph is connected -- normally not necessary but it can help tests reason about the state */ @VisibleForTesting public void cleanup() @@ -692,65 +649,4 @@ public RandomAccessVectorValues copy() return new RemappedVectorValues(remapped, maxNewOrdinal, vectorValues.copy()); } } - - private static class CustomSegmentSorter implements Comparable - { - private final SSTableIndex sstableIndex; - private final int numRowsOrMaxPQTrainingSetSize; - private final long timestamp; - private final int segmentPosition; - - private CustomSegmentSorter(SSTableIndex sstableIndex, long numRows, int segmentPosition) - { - this.sstableIndex = sstableIndex; - // TODO give the size cost of larger PQ sets, is it worth trying to get the PQ object closest to this - // value? I'm concerned that we'll grab something pretty large for mem. 
-            this.numRowsOrMaxPQTrainingSetSize = (int) Math.min(numRows, ProductQuantization.MAX_PQ_TRAINING_SET_SIZE);
-            this.timestamp = sstableIndex.getSSTable().getMaxTimestamp();
-            this.segmentPosition = segmentPosition;
-        }
-
-        private PqInfo getPqInfo() throws IOException
-        {
-            if (sstableIndex.areSegmentsLoaded())
-            {
-                var segment = sstableIndex.getSegments().get(segmentPosition);
-                V2VectorIndexSearcher searcher = (V2VectorIndexSearcher) segment.getIndexSearcher();
-                // Skip segments that don't have PQ
-                if (CompressionType.PRODUCT_QUANTIZATION != searcher.getCompression().type)
-                    return null;
-
-                // Because we sorted based on size then timestamp, this is the best match
-                return new PqInfo(searcher.getPQ(), searcher.containsUnitVectors(), segment.metadata.numRows);
-            }
-            else
-            {
-                // We have to load from disk here
-                var segmentMetadata = sstableIndex.getSegmentMetadatas().get(segmentPosition);
-                // Returns null if the segment uses the wrong compression type.
-                return SSTableIndexWriter.maybeReadPqFromSegment(segmentMetadata, sstableIndex.pq());
-            }
-        }
-
-        @Override
-        public int compareTo(CustomSegmentSorter o)
-        {
-            // Sort by size descending, then timestamp descending for sstables
-            int cmp = Long.compare(o.numRowsOrMaxPQTrainingSetSize, numRowsOrMaxPQTrainingSetSize);
-            if (cmp != 0)
-                return cmp;
-            return Long.compare(o.timestamp, timestamp);
-        }
-
-        private static Stream streamSegments(SSTableIndex index)
-        {
-            var results = new ArrayList();
-
-            var metadatas = index.getSegmentMetadatas();
-            for (int i = 0; i < metadatas.size(); i++)
-                results.add(new CustomSegmentSorter(index, metadatas.get(i).numRows, i));
-
-            return results.stream();
-        }
-    }
 }
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java
new file mode 100644
index 000000000000..7c3fffcf9329
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sai.disk.vector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+import io.github.jbellis.jvector.quantization.ProductQuantization;
+import org.apache.cassandra.index.sai.IndexContext;
+import org.apache.cassandra.index.sai.SSTableIndex;
+import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
+import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata;
+import org.apache.cassandra.index.sai.disk.v2.V2VectorIndexSearcher;
+import org.apache.cassandra.io.util.FileHandle;
+
+public class ProductQuantizationFetcher
+{
+
+    /**
+     * Return the best previous CompressedVectors for this column.
+     * "Best" means the most recent one that hits the row count target of {@link ProductQuantization#MAX_PQ_TRAINING_SET_SIZE},
+     * or the one with the most rows if none are larger than that.
+     */
+    public static PqInfo getPqIfPresent(IndexContext indexContext) throws IOException
+    {
+        // TODO when compacting, this view is likely the whole table, is it worth only considering the sstables that
+        // are being compacted?
+        // Flatten all segments, sorted by size then timestamp (size is capped to MAX_PQ_TRAINING_SET_SIZE)
+        var sortedSegments = indexContext.getView().getIndexes().stream()
+                                         .flatMap(CustomSegmentSorter::streamSegments)
+                                         .filter(customSegment -> customSegment.numRowsOrMaxPQTrainingSetSize > CassandraOnHeapGraph.MIN_PQ_ROWS)
+                                         .sorted()
+                                         .iterator();
+
+        while (sortedSegments.hasNext())
+        {
+            var customSegment = sortedSegments.next();
+            // Because we sorted based on size then timestamp, this is the best match (assuming it exists)
+            var pqInfo = customSegment.getPqInfo();
+            if (pqInfo != null)
+                return pqInfo;
+        }
+
+        return null; // nothing matched
+    }
+
+    public static class PqInfo
+    {
+        public final ProductQuantization pq;
+        /** defaults to true when the index was written with an older version that did not record this information */
+        public final boolean unitVectors;
+        public final long rowCount;
+
+        public PqInfo(ProductQuantization pq, boolean unitVectors, long rowCount)
+        {
+            this.pq = pq;
+            this.unitVectors = unitVectors;
+            this.rowCount = rowCount;
+        }
+    }
+
+    private static class CustomSegmentSorter implements Comparable
+    {
+        private final SSTableIndex sstableIndex;
+        private final int numRowsOrMaxPQTrainingSetSize;
+        private final long timestamp;
+        private final int segmentPosition;
+
+        private CustomSegmentSorter(SSTableIndex sstableIndex, long numRows, int segmentPosition)
+        {
+            this.sstableIndex = sstableIndex;
+            // TODO given the size cost of larger PQ sets, is it worth trying to get the PQ object closest to this
+            // value? I'm concerned that we'll grab something pretty large for mem.
+            this.numRowsOrMaxPQTrainingSetSize = (int) Math.min(numRows, ProductQuantization.MAX_PQ_TRAINING_SET_SIZE);
+            this.timestamp = sstableIndex.getSSTable().getMaxTimestamp();
+            this.segmentPosition = segmentPosition;
+        }
+
+        private PqInfo getPqInfo() throws IOException
+        {
+            if (sstableIndex.areSegmentsLoaded())
+            {
+                var segment = sstableIndex.getSegments().get(segmentPosition);
+                V2VectorIndexSearcher searcher = (V2VectorIndexSearcher) segment.getIndexSearcher();
+                // Skip segments that don't have PQ
+                if (VectorCompression.CompressionType.PRODUCT_QUANTIZATION != searcher.getCompression().type)
+                    return null;
+
+                // Because we sorted based on size then timestamp, this is the best match
+                return new PqInfo(searcher.getPQ(), searcher.containsUnitVectors(), segment.metadata.numRows);
+            }
+            else
+            {
+                // We have to load from disk here
+                var segmentMetadata = sstableIndex.getSegmentMetadatas().get(segmentPosition);
+                // Returns null if the segment uses the wrong compression type.
+                return maybeReadPqFromSegment(segmentMetadata, sstableIndex.pq());
+            }
+        }
+
+        @Override
+        public int compareTo(CustomSegmentSorter o)
+        {
+            // Sort by size descending, then timestamp descending for sstables
+            int cmp = Long.compare(o.numRowsOrMaxPQTrainingSetSize, numRowsOrMaxPQTrainingSetSize);
+            if (cmp != 0)
+                return cmp;
+            return Long.compare(o.timestamp, timestamp);
+        }
+
+        private static Stream streamSegments(SSTableIndex index)
+        {
+            var results = new ArrayList();
+
+            var metadatas = index.getSegmentMetadatas();
+            for (int i = 0; i < metadatas.size(); i++)
+                results.add(new CustomSegmentSorter(index, metadatas.get(i).numRows, i));
+
+            return results.stream();
+        }
+    }
+
+    /**
+     * Takes a segment metadata and file handle and returns the PQ info if the segment has PQ that uses
+ * + * @param sm segment metadata + * @param fh PQ file handle, closed by this method + * @return PqInfo if the segment has PQ, null otherwise + * @throws IOException if an I/O error occurs + */ + public static PqInfo maybeReadPqFromSegment(SegmentMetadata sm, FileHandle fh) throws IOException + { + try (fh; var reader = fh.createReader()) + { + long offset = sm.componentMetadatas.get(IndexComponentType.PQ).offset; + // close parallel to code in CassandraDiskANN constructor, but different enough + // (we only want the PQ codebook) that it's difficult to extract into a common method + reader.seek(offset); + boolean unitVectors; + if (reader.readInt() == CassandraDiskAnn.PQ_MAGIC) + { + reader.readInt(); // skip over version + unitVectors = reader.readBoolean(); + } + else + { + unitVectors = true; + reader.seek(offset); + } + var compressionType = VectorCompression.CompressionType.values()[reader.readByte()]; + if (compressionType == VectorCompression.CompressionType.PRODUCT_QUANTIZATION) + { + var pq = ProductQuantization.load(reader); + return new PqInfo(pq, unitVectors, sm.numRows); + } + } + return null; + } +} From 7ffd115a7d521f7ebfc99d46d7df2b5b26fdbf0d Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 25 Apr 2025 17:24:34 -0500 Subject: [PATCH 03/14] Add comment asking about reducing memory utilization when loading PQ --- .../apache/cassandra/index/sai/SSTableIndex.java | 6 +++--- .../sai/disk/V1MetadataOnlySearchableIndex.java | 4 ++-- .../index/sai/disk/v1/SSTableIndexWriter.java | 5 +++-- .../disk/vector/ProductQuantizationFetcher.java | 15 +++++++++++---- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java index 2f371a959205..baab37666368 100644 --- a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java @@ -43,6 +43,7 @@ import org.apache.cassandra.index.sai.disk.format.IndexComponents; import org.apache.cassandra.index.sai.disk.format.IndexFeatureSet; import org.apache.cassandra.index.sai.disk.format.Version; +import org.apache.cassandra.index.sai.disk.v1.PerIndexFiles; import org.apache.cassandra.index.sai.disk.v1.Segment; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; import org.apache.cassandra.index.sai.disk.v1.V1SearchableIndex; @@ -56,7 +57,6 @@ import org.apache.cassandra.io.sstable.SSTableIdFactory; import org.apache.cassandra.io.sstable.SSTableWatcher; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.CloseableIterator; @@ -147,10 +147,10 @@ public boolean areSegmentsLoaded() return searchableIndex instanceof V1SearchableIndex; } - public FileHandle pq() + public PerIndexFiles indexFiles() { assert searchableIndex instanceof V1MetadataOnlySearchableIndex; - return ((V1MetadataOnlySearchableIndex) searchableIndex).pq(); + return ((V1MetadataOnlySearchableIndex) searchableIndex).indexFiles(); } public long indexFileCacheSize() diff --git a/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java index 515df949f1bf..2868b2d33f85 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java @@ 
-176,9 +176,9 @@ public List getSegmentMetadatas()
         return metadatas;
     }
 
-    public FileHandle pq()
+    public PerIndexFiles indexFiles()
     {
-        return indexFiles.pq();
+        return indexFiles;
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
index 0c07180797c8..6b75e31b0fd7 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java
@@ -413,9 +413,10 @@ private ProductQuantizationFetcher.PqInfo maybeReadPqFromLastSegment() throws IO
     {
         var pqComponent = perIndexComponents.get(IndexComponentType.PQ);
         assert pqComponent != null; // we always have a PQ component even if it's not actually PQ compression
-        try (var fhBuilder = StorageProvider.instance.indexBuildTimeFileHandleBuilderFor(pqComponent))
+        try (var fhBuilder = StorageProvider.instance.indexBuildTimeFileHandleBuilderFor(pqComponent);
+             var fh = fhBuilder.complete())
         {
-            return ProductQuantizationFetcher.maybeReadPqFromSegment(segments.get(segments.size() - 1), fhBuilder.complete());
+            return ProductQuantizationFetcher.maybeReadPqFromSegment(segments.get(segments.size() - 1), fh);
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java
index 7c3fffcf9329..3723b700371f 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java
@@ -93,6 +93,7 @@ private CustomSegmentSorter(SSTableIndex sstableIndex, long numRows, int segment
             this.segmentPosition = segmentPosition;
         }
 
+        @SuppressWarnings("resource")
         private PqInfo getPqInfo() throws IOException
         {
             if (sstableIndex.areSegmentsLoaded())
@@ -110,8 +111,11 @@ private PqInfo getPqInfo() throws IOException
             {
                 // We have to load from disk here
                 var segmentMetadata = sstableIndex.getSegmentMetadatas().get(segmentPosition);
-                // Returns null if the segment uses the wrong compression type.
-                return maybeReadPqFromSegment(segmentMetadata, sstableIndex.pq());
+                try (var pq = sstableIndex.indexFiles().pq())
+                {
+                    // Returns null if the segment uses the wrong compression type.
+                    return maybeReadPqFromSegment(segmentMetadata, pq);
+                }
             }
         }
 
@@ -142,13 +146,13 @@ private static Stream streamSegments(SSTableIndex index)
      * compression type {@link VectorCompression.CompressionType#PRODUCT_QUANTIZATION}.
      *
      * @param sm segment metadata
-     * @param fh PQ file handle, closed by this method
+     * @param fh PQ file handle
     * @return PqInfo if the segment has PQ, null otherwise
     * @throws IOException if an I/O error occurs
     */
     public static PqInfo maybeReadPqFromSegment(SegmentMetadata sm, FileHandle fh) throws IOException
     {
-        try (fh; var reader = fh.createReader())
+        try (var reader = fh.createReader())
         {
             long offset = sm.componentMetadatas.get(IndexComponentType.PQ).offset;
             // close parallel to code in CassandraDiskANN constructor, but different enough
             // (we only want the PQ codebook) that it's difficult to extract into a common method
@@ -168,6 +172,9 @@ public static PqInfo maybeReadPqFromSegment(SegmentMetadata sm, FileHandle fh) t
             var compressionType = VectorCompression.CompressionType.values()[reader.readByte()];
             if (compressionType == VectorCompression.CompressionType.PRODUCT_QUANTIZATION)
             {
+                // TODO if this is really big, is there any way to either limit what we read or to just leave this
+                // as a disk backed PQ to reduce memory utilization?
This is only used to help seed the initial + // training, so it seems excessive to load the whole thing into memory for large segments. var pq = ProductQuantization.load(reader); return new PqInfo(pq, unitVectors, sm.numRows); } From a98889782a0721fb554a6f2d6896a31bf39ae850 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 25 Apr 2025 17:39:32 -0500 Subject: [PATCH 04/14] Fix the broken allRowsHaveVectorsInWrittenSegments method This fixes the method so that it actually reads the postings list structure. However, there is a larger question of whether the implementation was actually correct. It all comes down to what is in the indexContext's view on a compactor. We really only want to get the info from the source components that are being compacted. Further investigation needed to determine if this is the right behavior. --- .../index/sai/disk/format/OnDiskFormat.java | 12 +++++++ .../index/sai/disk/v1/SSTableIndexWriter.java | 33 ++++++++++++++----- .../index/sai/disk/v1/V1OnDiskFormat.java | 8 +++++ .../index/sai/disk/v2/V2OnDiskFormat.java | 8 +++++ .../index/sai/disk/v3/V3OnDiskFormat.java | 1 - .../index/sai/disk/v5/V5OnDiskFormat.java | 9 +++++ 6 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java index d19dac7268cd..abcf92db83a4 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java @@ -36,6 +36,8 @@ import org.apache.cassandra.index.sai.disk.v1.IndexSearcher; import org.apache.cassandra.index.sai.disk.v1.PerIndexFiles; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; +import org.apache.cassandra.index.sai.disk.v5.V5OnDiskOrdinalsMap; +import org.apache.cassandra.index.sai.disk.vector.OnDiskOrdinalsMap; import org.apache.cassandra.index.sai.memory.RowMapping; import org.apache.cassandra.index.sai.memory.TrieMemtableIndex; import org.apache.cassandra.index.sai.utils.PrimaryKey; @@ -94,6 +96,16 @@ public interface OnDiskFormat */ public PrimaryKeyMap.Factory newPrimaryKeyMapFactory(IndexComponents.ForRead perSSTableComponents, PrimaryKey.Factory primaryKeyFactory, SSTableReader sstable) throws IOException; + /** + * Create a new {@link OnDiskOrdinalsMap} for the provided {@link PerIndexFiles} and {@link SegmentMetadata}. + * Only used by vector indexes currently. + * + * @param indexFiles + * @param segmentMetadata + * @return + */ + public OnDiskOrdinalsMap newOnDiskOrdinalsMap(PerIndexFiles indexFiles, SegmentMetadata segmentMetadata); + /** * Create a new {@link SearchableIndex} for an on-disk index. This is held by the {@SSTableIndex} * and shared between queries. diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java index 6b75e31b0fd7..96c64e6ff8e7 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java @@ -393,17 +393,34 @@ else if (indexContext.isLiteral()) private static boolean allRowsHaveVectorsInWrittenSegments(IndexContext indexContext) { - // TODO should we load all of these for this op? + // TODO should we load all of these for this op? It seems very unlikely that we want to consider the whole + // table when checking if all rows have vectors. 
A single empty vector will be enough to make this false, + // but that should really only impact that table. for (SSTableIndex index : indexContext.getView().getIndexes()) { - for (Segment segment : index.getSegments()) + if (index.areSegmentsLoaded()) { - if (segment.getIndexSearcher() instanceof V2VectorIndexSearcher) - return true; // V2 doesn't know, so we err on the side of being optimistic. See comments in CompactionGraph - var searcher = (V5VectorIndexSearcher) segment.getIndexSearcher(); - var structure = searcher.getPostingsStructure(); - if (structure == V5VectorPostingsWriter.Structure.ZERO_OR_ONE_TO_MANY) - return false; + for (Segment segment : index.getSegments()) + { + if (segment.getIndexSearcher() instanceof V2VectorIndexSearcher) + return true; // V2 doesn't know, so we err on the side of being optimistic. See comments in CompactionGraph + var searcher = (V5VectorIndexSearcher) segment.getIndexSearcher(); + var structure = searcher.getPostingsStructure(); + if (structure == V5VectorPostingsWriter.Structure.ZERO_OR_ONE_TO_MANY) + return false; + } + } + else + { + for (SegmentMetadata sm : index.getSegmentMetadatas()) + { + // May result in downloading file, but this metadata is valuable + try (var odm = index.getVersion().onDiskFormat().newOnDiskOrdinalsMap(index.indexFiles(), sm)) + { + if (odm.getStructure() == V5VectorPostingsWriter.Structure.ZERO_OR_ONE_TO_MANY) + return false; + } + } } } return true; diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java index 8afe5d62b4c1..3063fe802cf1 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java @@ -49,6 +49,8 @@ import org.apache.cassandra.index.sai.disk.format.IndexFeatureSet; import org.apache.cassandra.index.sai.disk.format.OnDiskFormat; import org.apache.cassandra.index.sai.disk.format.Version; +import org.apache.cassandra.index.sai.disk.v5.V5OnDiskOrdinalsMap; +import org.apache.cassandra.index.sai.disk.vector.OnDiskOrdinalsMap; import org.apache.cassandra.index.sai.memory.RowMapping; import org.apache.cassandra.index.sai.metrics.AbstractMetrics; import org.apache.cassandra.index.sai.utils.NamedMemoryLimiter; @@ -165,6 +167,12 @@ public PrimaryKeyMap.Factory newPrimaryKeyMapFactory(IndexComponents.ForRead per return new PartitionAwarePrimaryKeyMap.PartitionAwarePrimaryKeyMapFactory(perSSTableComponents, sstable, primaryKeyFactory); } + @Override + public OnDiskOrdinalsMap newOnDiskOrdinalsMap(PerIndexFiles indexFiles, SegmentMetadata segmentMetadata) + { + throw new UnsupportedOperationException(); + } + @Override public SearchableIndex newSearchableIndex(SSTableContext sstableContext, IndexComponents.ForRead perIndexComponents) { diff --git a/src/java/org/apache/cassandra/index/sai/disk/v2/V2OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v2/V2OnDiskFormat.java index 8ad67fcfd9e1..423b1c302a1e 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v2/V2OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v2/V2OnDiskFormat.java @@ -41,6 +41,7 @@ import org.apache.cassandra.index.sai.disk.v1.PerIndexFiles; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; import org.apache.cassandra.index.sai.disk.v1.V1OnDiskFormat; +import org.apache.cassandra.index.sai.disk.vector.OnDiskOrdinalsMap; import org.apache.cassandra.index.sai.utils.PrimaryKey; import 
org.apache.cassandra.io.sstable.format.SSTableReader; @@ -108,6 +109,13 @@ public PrimaryKeyMap.Factory newPrimaryKeyMapFactory(IndexComponents.ForRead per return new RowAwarePrimaryKeyMap.RowAwarePrimaryKeyMapFactory(perSSTableComponents, primaryKeyFactory, sstable); } + @Override + public OnDiskOrdinalsMap newOnDiskOrdinalsMap(PerIndexFiles indexFiles, SegmentMetadata segmentMetadata) + { + var postingListsMetadata = segmentMetadata.componentMetadatas.get(IndexComponentType.POSTING_LISTS); + return new V2OnDiskOrdinalsMap(indexFiles.postingLists(), postingListsMetadata.offset, postingListsMetadata.length); + } + @Override public IndexSearcher newIndexSearcher(SSTableContext sstableContext, IndexContext indexContext, diff --git a/src/java/org/apache/cassandra/index/sai/disk/v3/V3OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v3/V3OnDiskFormat.java index 3b3a76588ca9..905e2a750c37 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v3/V3OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v3/V3OnDiskFormat.java @@ -23,7 +23,6 @@ import java.util.EnumSet; import java.util.Set; -import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/java/org/apache/cassandra/index/sai/disk/v5/V5OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v5/V5OnDiskFormat.java index 8b16ff5f55f9..acf185a4efe3 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v5/V5OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v5/V5OnDiskFormat.java @@ -22,11 +22,13 @@ import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.SSTableContext; +import org.apache.cassandra.index.sai.disk.format.IndexComponentType; import org.apache.cassandra.index.sai.disk.format.Version; import org.apache.cassandra.index.sai.disk.v1.IndexSearcher; import org.apache.cassandra.index.sai.disk.v1.PerIndexFiles; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; import org.apache.cassandra.index.sai.disk.v4.V4OnDiskFormat; +import org.apache.cassandra.index.sai.disk.vector.OnDiskOrdinalsMap; public class V5OnDiskFormat extends V4OnDiskFormat { @@ -37,6 +39,13 @@ public static boolean writeV5VectorPostings() return Version.latest().onOrAfter(Version.DC); } + @Override + public OnDiskOrdinalsMap newOnDiskOrdinalsMap(PerIndexFiles indexFiles, SegmentMetadata segmentMetadata) + { + var postingListsMetadata = segmentMetadata.componentMetadatas.get(IndexComponentType.POSTING_LISTS); + return new V5OnDiskOrdinalsMap(indexFiles.postingLists(), postingListsMetadata.offset, postingListsMetadata.length); + } + @Override public IndexSearcher newIndexSearcher(SSTableContext sstableContext, IndexContext indexContext, From d3b2b22b059fc016876a8eae2351e14fb375d1e5 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Fri, 25 Apr 2025 20:35:48 -0500 Subject: [PATCH 05/14] Log IOException and continue processing to prevent unnecessary flakiness --- .../vector/ProductQuantizationFetcher.java | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java index 3723b700371f..f0c1f8ac7913 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java @@ -20,8 +20,11 @@ 
import java.io.IOException; import java.util.ArrayList; +import java.util.Objects; import java.util.stream.Stream; +import org.slf4j.Logger; + import io.github.jbellis.jvector.quantization.ProductQuantization; import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.SSTableIndex; @@ -32,6 +35,7 @@ public class ProductQuantizationFetcher { + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ProductQuantizationFetcher.class); /** * Return the best previous CompressedVectors for this column that matches the `matcher` predicate. @@ -42,23 +46,15 @@ public static PqInfo getPqIfPresent(IndexContext indexContext) throws IOExceptio { // TODO when compacting, this view is likely the whole table, is it worth only considering the sstables that // are being compacted? - // Flatten all segments, sorted by size then timestamp (size is capped to MAX_PQ_TRAINING_SET_SIZE) - var sortedSegments = indexContext.getView().getIndexes().stream() - .flatMap(CustomSegmentSorter::streamSegments) - .filter(customSegment -> customSegment.numRowsOrMaxPQTrainingSetSize > CassandraOnHeapGraph.MIN_PQ_ROWS) - .sorted() - .iterator(); - - while (sortedSegments.hasNext()) - { - var customSegment = sortedSegments.next(); - // Because we sorted based on size then timestamp, this is the best match (assuming it exists) - var pqInfo = customSegment.getPqInfo(); - if (pqInfo != null) - return pqInfo; - } - - return null; // nothing matched + // Flatten all segments, sorted by size (capped to MAX_PQ_TRAINING_SET_SIZE) then timestamp + return indexContext.getView().getIndexes().stream() + .flatMap(CustomSegmentSorter::streamSegments) + .filter(customSegment -> customSegment.numRowsOrMaxPQTrainingSetSize > CassandraOnHeapGraph.MIN_PQ_ROWS) + .sorted() + .map(CustomSegmentSorter::getPqInfo) + .filter(Objects::nonNull) + .findFirst() + .orElse(null); } public static class PqInfo @@ -94,7 +90,7 @@ private CustomSegmentSorter(SSTableIndex sstableIndex, long numRows, int segment } @SuppressWarnings("resource") - private PqInfo getPqInfo() throws IOException + private PqInfo getPqInfo() { if (sstableIndex.areSegmentsLoaded()) { @@ -150,7 +146,7 @@ private static Stream streamSegments(SSTableIndex index) * @return PqInfo if the segment has PQ, null otherwise * @throws IOException if an I/O error occurs */ - public static PqInfo maybeReadPqFromSegment(SegmentMetadata sm, FileHandle fh) throws IOException + public static PqInfo maybeReadPqFromSegment(SegmentMetadata sm, FileHandle fh) { try (var reader = fh.createReader()) { @@ -179,6 +175,10 @@ public static PqInfo maybeReadPqFromSegment(SegmentMetadata sm, FileHandle fh) t return new PqInfo(pq, unitVectors, sm.numRows); } } + catch (IOException e) + { + logger.warn("Failed to read PQ from segment {}. 
Skipping", sm, e); + } return null; } } From 6e4b72d0946aadf608fad574e7cf6891fdd060f2 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 6 May 2025 13:21:31 -0500 Subject: [PATCH 06/14] Cleanup unused throws in method signature --- .../index/sai/disk/vector/ProductQuantizationFetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java index eec7e5f3f640..e0de0f0b38e4 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java @@ -43,7 +43,7 @@ public class ProductQuantizationFetcher * "Best" means the most recent one that hits the row count target of {@link ProductQuantization#MAX_PQ_TRAINING_SET_SIZE}, * or the one with the most rows if none are larger than that. */ - public static PqInfo getPqIfPresent(IndexContext indexContext) throws IOException + public static PqInfo getPqIfPresent(IndexContext indexContext) { // We get a referenced view becuase we might actually read a segment from disk, which requires that we // hold a lock on the index to prevent it from getting deleted concurrently. The PqInfo object is all in From bbfec37e45029d92901cac44de3ac49ca992e949 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 6 May 2025 21:30:52 -0500 Subject: [PATCH 07/14] Add test; minor fixes exposed by tests --- .../cassandra/index/sai/SSTableIndex.java | 3 +- .../vector/ProductQuantizationFetcher.java | 6 +- .../index/sai/cql/VectorCompactionTest.java | 89 ++++++++++++++++++- 3 files changed, 92 insertions(+), 6 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java index baab37666368..cd837eb3ee77 100644 --- a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java @@ -97,7 +97,8 @@ private static SearchableIndex createSearchableIndex(SSTableContext sstableConte if (CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.getBoolean()) { var context = perIndexComponents.context(); - if (context != null + if (!perIndexComponents.isEmpty() + && context != null && context.isVector() && CassandraRelevantProperties.SAI_INDEX_LOAD_SEGMENT_METADATA_ONLY.getBoolean()) { diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java index e0de0f0b38e4..ab6718be0c01 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java @@ -55,8 +55,6 @@ public static PqInfo getPqIfPresent(IndexContext indexContext) return null; } - // TODO when compacting, this view is likely the whole table, is it worth only considering the sstables that - // are being compacted? 
try { // Retrieve the first compressed vectors for a segment with at least MAX_PQ_TRAINING_SET_SIZE rows // Flatten all segments, sorted by size (capped to MAX_PQ_TRAINING_SET_SIZE) then timestamp return view.getIndexes().stream() .flatMap(CustomSegmentSorter::streamSegments) - .filter(customSegment -> customSegment.numRowsOrMaxPQTrainingSetSize > CassandraOnHeapGraph.MIN_PQ_ROWS) + .filter(customSegment -> customSegment.numRowsOrMaxPQTrainingSetSize >= CassandraOnHeapGraph.MIN_PQ_ROWS) .sorted() .map(CustomSegmentSorter::getPqInfo) .filter(Objects::nonNull) .findFirst() .orElse(null); @@ -104,6 +102,8 @@ private CustomSegmentSorter(SSTableIndex sstableIndex, long numRows, int segment this.sstableIndex = sstableIndex; // TODO give the size cost of larger PQ sets, is it worth trying to get the PQ object closest to this // value? I'm concerned that we'll grab something pretty large for mem. + // Note to reviewers, it looks like we use the PQ in the BuildScoreProvider, so we might actually use more + // of the PQ vector space than I initially thought. this.numRowsOrMaxPQTrainingSetSize = (int) Math.min(numRows, ProductQuantization.MAX_PQ_TRAINING_SET_SIZE); this.timestamp = sstableIndex.getSSTable().getMaxTimestamp(); this.segmentPosition = segmentPosition; diff --git a/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java b/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java index 022665cb5778..af865dac08c7 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java @@ -21,13 +21,16 @@ import java.util.ArrayList; import java.util.stream.Collectors; +import org.junit.Before; import org.junit.Test; import io.github.jbellis.jvector.vector.VectorSimilarityFunction; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.db.marshal.FloatType; -import org.apache.cassandra.index.sai.disk.v3.V3OnDiskFormat; +import org.apache.cassandra.index.sai.StorageAttachedIndex; +import org.apache.cassandra.index.sai.disk.v2.V2VectorIndexSearcher; import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; -import org.apache.cassandra.index.sai.disk.vector.CompactionGraph; +import org.apache.cassandra.index.sai.disk.vector.VectorCompression; import static org.apache.cassandra.index.sai.disk.vector.CassandraOnHeapGraph.MIN_PQ_ROWS; import static org.junit.Assert.assertEquals; @@ -35,6 +38,12 @@ public class VectorCompactionTest extends VectorTester.Versioned { + @Before + public void resetDisabledReads() + { + CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.setBoolean(false); + } + @Test public void testCompactionWithEnoughRowsForPQAndDeleteARow() { @@ -142,6 +151,82 @@ public void testZeroOrOneToManyCompaction() } } + @Test + public void testSaiIndexReadsDisabledWithEmptyVectorIndex() + { + createTable(); + + CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.setBoolean(true); + + // Insert then delete a row, trigger compaction to produce the empty index + execute("INSERT INTO %s (pk, v) VALUES (?, ?)", 0, vector(0, 1)); + flush(); + execute("DELETE FROM %s WHERE pk = 0"); + flush(); + compact(); + + // Now make index queryable + CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.setBoolean(false); + reloadSSTableIndex(); + + // Confirm we can query the data (in an early implementation of the feature in this PR, this assertion failed) + assertRowCount(execute("SELECT *
FROM %s ORDER BY v ANN OF [1,2] LIMIT 1"), 0); + } + + @SuppressWarnings("resource") + @Test + public void testSaiIndexReadsDisabled() + { + createTable("CREATE TABLE %s (pk int, v vector, PRIMARY KEY(pk))"); + var indexName = createIndex("CREATE CUSTOM INDEX ON %s(v) USING 'StorageAttachedIndex'"); + + // We'll manage compaction manually for better control + disableCompaction(); + + for (int i = 0; i < MIN_PQ_ROWS; i++) + execute("INSERT INTO %s (pk, v) VALUES (?, ?)", i, randomVectorBoxed(2)); + + flush(); + + // Only add a handful of rows to the second sstable so we have to iterate to the older sstable in the + // pq selection logic + for (int i = MIN_PQ_ROWS; i < MIN_PQ_ROWS + 10; i++) + execute("INSERT INTO %s (pk, v) VALUES (?, ?)", i, randomVectorBoxed(2)); + + flush(); + + // Delete 100 rows so that we could only create a PQ if we use the original data. + for (int i = 0; i < 100; i++) + execute("DELETE FROM %s WHERE pk = ?", i); + + flush(); + + // Mimic disabling reads; the setting only takes effect on sstable index load, so reload + CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.setBoolean(true); + reloadSSTableIndex(); + + // Now compact (this exercises the code, but doesn't actually prove that we used the past segment's PQ) + compact(); + + // Reload and make queryable + CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.setBoolean(false); + reloadSSTableIndex(); + + // Assert that we have a product quantization + var index = (StorageAttachedIndex) getIndexManager(keyspace(), indexName).listIndexes().iterator().next(); + var sstableIndexes = index.getIndexContext().getView().getIndexes(); + assertEquals(1, sstableIndexes.size()); + var searcher = (V2VectorIndexSearcher) sstableIndexes.iterator().next().getSegments().iterator().next().getIndexSearcher(); + // We have a PQ even though we have fewer than MIN_PQ_ROWS because we used the original seed. + // We only test like this to confirm that it worked, not because this is a strictly required behavior. + // Further, it's unlikely we'll have so few rows. + assertEquals(VectorCompression.CompressionType.PRODUCT_QUANTIZATION, searcher.getCompression().type); + + // Confirm we can query the data + var result = execute( "SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 5", randomVectorBoxed(2)); + assertEquals(5, result.size()); + } + public void testZeroOrOneToManyCompactionInternal(int vectorsPerSstable, int sstables) { createTable(); From 0e3700b32d5cf05d0bf5fe6f23992f3a1cd9873f Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 6 May 2025 21:38:20 -0500 Subject: [PATCH 08/14] Tweak postings structure logic (deeper changes to come) --- .../index/sai/disk/v1/SSTableIndexWriter.java | 44 +++++++++++-------- .../index/sai/cql/VectorCompactionTest.java | 1 + 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java index a5833d8c3136..3beb06464fca 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java @@ -403,34 +403,42 @@ private static boolean allRowsHaveVectorsInWrittenSegments(IndexContext indexCon // TODO should we load all of these for this op? It seems very unlikely that we want to consider the whole // table when checking if all rows have vectors. A single empty vector will be enough to make this false, // but that should really only impact that table.
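Reviewer note: the hunk below wraps this scan in a referenced view so the sstable indexes stay pinned while their segments are read. The shape of that acquire/release discipline, reduced to a sketch — RefView and ViewSource are hypothetical stand-ins for the SAI view types, and the null check matches the later revision in this series, not this hunk:

    import java.util.concurrent.TimeUnit;

    public class ReferencedViewSketch
    {
        // Hypothetical stand-in for the view returned by IndexContext.getReferencedView()
        interface RefView
        {
            boolean scanSegments();
            void release();
        }

        interface ViewSource
        {
            RefView tryReference(long timeoutNanos); // null when the reference could not be taken in time
        }

        static boolean scanWithReference(ViewSource source)
        {
            RefView view = source.tryReference(TimeUnit.SECONDS.toNanos(5));
            if (view == null)
                return true; // could not pin the view; fall back to the optimistic answer
            try
            {
                return view.scanSegments();
            }
            finally
            {
                view.release(); // always unpin, even when the scan throws
            }
        }
    }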
- for (SSTableIndex index : indexContext.getView().getIndexes()) + var view = indexContext.getReferencedView(TimeUnit.SECONDS.toNanos(5)); + try { - if (index.areSegmentsLoaded()) + for (SSTableIndex index : indexContext.getView().getIndexes()) { - for (Segment segment : index.getSegments()) + if (index.areSegmentsLoaded()) { - if (segment.getIndexSearcher() instanceof V2VectorIndexSearcher) - return true; // V2 doesn't know, so we err on the side of being optimistic. See comments in CompactionGraph - var searcher = (V5VectorIndexSearcher) segment.getIndexSearcher(); - var structure = searcher.getPostingsStructure(); - if (structure == V5VectorPostingsWriter.Structure.ZERO_OR_ONE_TO_MANY) - return false; + for (Segment segment : index.getSegments()) + { + if (segment.getIndexSearcher() instanceof V2VectorIndexSearcher) + return true; // V2 doesn't know, so we err on the side of being optimistic. See comments in CompactionGraph + var searcher = (V5VectorIndexSearcher) segment.getIndexSearcher(); + var structure = searcher.getPostingsStructure(); + if (structure == V5VectorPostingsWriter.Structure.ZERO_OR_ONE_TO_MANY) + return false; + } } - } - else - { - for (SegmentMetadata sm : index.getSegmentMetadatas()) + else { - // May result in downloading file, but this metadata is valuable - try (var odm = index.getVersion().onDiskFormat().newOnDiskOrdinalsMap(index.indexFiles(), sm)) + for (SegmentMetadata sm : index.getSegmentMetadatas()) { - if (odm.getStructure() == V5VectorPostingsWriter.Structure.ZERO_OR_ONE_TO_MANY) - return false; + // May result in downloading file, but this metadata is valuable + try (var odm = index.getVersion().onDiskFormat().newOnDiskOrdinalsMap(index.indexFiles(), sm)) + { + if (odm.getStructure() == V5VectorPostingsWriter.Structure.ZERO_OR_ONE_TO_MANY) + return false; + } } } } + return true; + } + finally + { + view.release(); } - return true; } private ProductQuantizationFetcher.PqInfo maybeReadPqFromLastSegment() throws IOException diff --git a/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java b/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java index af865dac08c7..d7367d1f20f5 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java @@ -221,6 +221,7 @@ public void testSaiIndexReadsDisabled() // We only test like this to confirm that it worked, not because this is a strictly required behavior. // Further, it's unlikely we'll have so few rows. assertEquals(VectorCompression.CompressionType.PRODUCT_QUANTIZATION, searcher.getCompression().type); + assertEquals(V5VectorPostingsWriter.Structure.ONE_TO_ONE, searcher.getPostingsStructure()); // Confirm we can query the data var result = execute( "SELECT * FROM %s ORDER BY v ANN OF ? 
LIMIT 5", randomVectorBoxed(2)); From cf9a2bd1afc0d0c1aa429c7205393fd93bb11185 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 7 May 2025 16:24:28 -0500 Subject: [PATCH 09/14] Add more tests * cover building the vector index after creation * cover different methods within the V1MetadataOnlySearchableIndex --- .../disk/V1MetadataOnlySearchableIndex.java | 35 ++++++++++-- .../index/sai/cql/VectorCompactionTest.java | 57 +++++++++++++++++++ 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java index 2868b2d33f85..3be5be2caa38 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java @@ -26,6 +26,7 @@ import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.virtual.SimpleDataSet; import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.QueryContext; import org.apache.cassandra.index.sai.SSTableContext; @@ -41,11 +42,21 @@ import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey; import org.apache.cassandra.index.sai.utils.TypeUtil; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.Throwables; +import static org.apache.cassandra.index.sai.virtual.SegmentsSystemView.CELL_COUNT; +import static org.apache.cassandra.index.sai.virtual.SegmentsSystemView.COLUMN_NAME; +import static org.apache.cassandra.index.sai.virtual.SegmentsSystemView.COMPONENT_METADATA; +import static org.apache.cassandra.index.sai.virtual.SegmentsSystemView.END_TOKEN; +import static org.apache.cassandra.index.sai.virtual.SegmentsSystemView.MAX_SSTABLE_ROW_ID; +import static org.apache.cassandra.index.sai.virtual.SegmentsSystemView.MAX_TERM; +import static org.apache.cassandra.index.sai.virtual.SegmentsSystemView.MIN_SSTABLE_ROW_ID; +import static org.apache.cassandra.index.sai.virtual.SegmentsSystemView.MIN_TERM; +import static org.apache.cassandra.index.sai.virtual.SegmentsSystemView.START_TOKEN; +import static org.apache.cassandra.index.sai.virtual.SegmentsSystemView.TABLE_NAME; + /** * An index that eagerly loads segment metadata and nothing else. It is currently only used for vector indexes to * read PQ files during compaction. @@ -59,13 +70,14 @@ public class V1MetadataOnlySearchableIndex implements SearchableIndex private final ByteBuffer maxTerm; private final long minSSTableRowId, maxSSTableRowId; private final long numRows; + private final IndexContext indexContext; private PerIndexFiles indexFiles; public V1MetadataOnlySearchableIndex(SSTableContext sstableContext, IndexComponents.ForRead perIndexComponents) { - var indexContext = perIndexComponents.context(); try { + this.indexContext = perIndexComponents.context(); this.indexFiles = new PerIndexFiles(perIndexComponents); final MetadataSource source = MetadataSource.loadMetadata(perIndexComponents); @@ -96,7 +108,7 @@ public V1MetadataOnlySearchableIndex(SSTableContext sstableContext, IndexCompone @Override public long indexFileCacheSize() { - // TODO what is the right value here? 
+ // In V1IndexSearcher we accumulate the index file cache size from the segments, so this is 0. return 0; } @@ -184,7 +196,22 @@ public PerIndexFiles indexFiles() @Override public void populateSystemView(SimpleDataSet dataSet, SSTableReader sstable) { - // TODO what is valid here? + Token.TokenFactory tokenFactory = sstable.metadata().partitioner.getTokenFactory(); + + for (SegmentMetadata metadata : metadatas) + { + dataSet.row(sstable.metadata().keyspace, indexContext.getIndexName(), sstable.getFilename(), metadata.segmentRowIdOffset) + .column(TABLE_NAME, sstable.descriptor.cfname) + .column(COLUMN_NAME, indexContext.getColumnName()) + .column(CELL_COUNT, metadata.numRows) + .column(MIN_SSTABLE_ROW_ID, metadata.minSSTableRowId) + .column(MAX_SSTABLE_ROW_ID, metadata.maxSSTableRowId) + .column(START_TOKEN, tokenFactory.toString(metadata.minKey.partitionKey().getToken())) + .column(END_TOKEN, tokenFactory.toString(metadata.maxKey.partitionKey().getToken())) + .column(MIN_TERM, "N/A") + .column(MAX_TERM, "N/A") + .column(COMPONENT_METADATA, metadata.componentMetadatas.asMap()); + } } @Override diff --git a/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java b/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java index d7367d1f20f5..f48fcb562eb6 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java @@ -31,9 +31,14 @@ import org.apache.cassandra.index.sai.disk.v2.V2VectorIndexSearcher; import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; import org.apache.cassandra.index.sai.disk.vector.VectorCompression; +import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.index.sai.disk.vector.CassandraOnHeapGraph.MIN_PQ_ROWS; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; public class VectorCompactionTest extends VectorTester.Versioned @@ -66,6 +71,28 @@ public void testCompactionWithEnoughRowsForPQAndDeleteARow() assertRowCount(execute("SELECT * FROM %s ORDER BY v ANN OF [1,2] LIMIT 1"), 1); } + @Test + public void testDelayedIndexCreation() + { + createTable("CREATE TABLE %s (pk int, v vector, PRIMARY KEY(pk))"); + disableCompaction(); + + for (int i = 0; i <= MIN_PQ_ROWS; i++) + execute("INSERT INTO %s (pk, v) VALUES (?, ?)", i, vector(i, i + 1)); + flush(); + + // Disable reads, then create index asynchronously since it won't become queryable until reads are enabled + CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.setBoolean(true); + createIndexAsync("CREATE CUSTOM INDEX ON %s(v) USING 'StorageAttachedIndex'"); + CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.setBoolean(false); + reloadSSTableIndex(); + // Because it was async, we wait here to ensure it worked. + waitForTableIndexesQueryable(); + + // Confirm we can query the data + assertRowCount(execute("SELECT * FROM %s ORDER BY v ANN OF [1,2] LIMIT 1"), 1); + } + @Test public void testPQRefine() { @@ -205,6 +232,36 @@ public void testSaiIndexReadsDisabled() CassandraRelevantProperties.SAI_INDEX_READS_DISABLED.setBoolean(true); reloadSSTableIndex(); + // Confirm that V1MetadataOnlySearchableIndex has valid metadata. 
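Reviewer note: part of what the assertions below exercise is the on-demand read of the postings structure — for each segment, an ordinals map is opened from metadata, a single attribute is read, and the handle is closed before moving on. In sketch form, with OrdinalsMap as a hypothetical stand-in for the on-disk type:

    import java.util.function.Supplier;

    public class OrdinalsMapSketch
    {
        // Hypothetical stand-in for the on-disk ordinals map opened from segment metadata
        interface OrdinalsMap extends AutoCloseable
        {
            String structure();
            @Override
            void close(); // narrows AutoCloseable so callers need no checked exception
        }

        static String structureOf(Supplier<OrdinalsMap> opener)
        {
            try (OrdinalsMap odm = opener.get())
            {
                return odm.structure(); // copy the cheap answer out before the handle closes
            }
        }
    }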
+ var metadataOnlyIndex = (StorageAttachedIndex) getIndexManager(keyspace(), indexName).listIndexes().iterator().next(); + var metadataOnlySSTableIndexes = metadataOnlyIndex.getIndexContext().getView().getIndexes(); + assertEquals(3, metadataOnlySSTableIndexes.size()); + metadataOnlySSTableIndexes.forEach(i -> { + // Skip the empty index + if (i.isEmpty()) + return; + + assertFalse(i.areSegmentsLoaded()); + assertEquals(0, i.indexFileCacheSize()); + assertThrows(UnsupportedOperationException.class, i::getSegments); + // This is vector specific, so if we broaden the functionality of V1MetadataOnlySearchableIndex, we might + // need to update this. + assertArrayEquals(new byte[] { 0, 0, 0, 0 }, ByteBufferUtil.getArray(i.minTerm())); + assertArrayEquals(new byte[] { 0, 0, 0, 0 }, ByteBufferUtil.getArray(i.maxTerm())); + // Confirm count matches sstable row id range (the range is inclusive) + var sstableDiff = i.maxSSTableRowId() - i.minSSTableRowId() + 1; + assertEquals(i.getRowCount(), sstableDiff); + + assertEquals(1, i.getSegmentMetadatas().size()); + for (var segmentMetadata : i.getSegmentMetadatas()) + { + try (var odm = i.getVersion().onDiskFormat().newOnDiskOrdinalsMap(i.indexFiles(), segmentMetadata)) + { + assertEquals(V5VectorPostingsWriter.Structure.ONE_TO_ONE, odm.getStructure()); + } + } + }); + // Now compact (this exercises the code, but doesn't actually prove that we used the past segment's PQ) compact(); From 2c820b75553e8390264afea94b02e834175df963 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Thu, 8 May 2025 14:52:02 -0500 Subject: [PATCH 10/14] Refactor API for getting postings structures --- .../cassandra/index/sai/SSTableIndex.java | 7 ++ .../cassandra/index/sai/disk/EmptyIndex.java | 8 ++ .../index/sai/disk/SearchableIndex.java | 4 + .../disk/V1MetadataOnlySearchableIndex.java | 16 ++++ .../index/sai/disk/v1/SSTableIndexWriter.java | 39 ++------ .../index/sai/disk/v1/V1SearchableIndex.java | 13 +++ .../index/sai/cql/VectorCompactionTest.java | 96 +++++++++++++++++-- 7 files changed, 145 insertions(+), 38 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java index cd837eb3ee77..b0638198fc67 100644 --- a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; @@ -47,6 +48,7 @@ import org.apache.cassandra.index.sai.disk.v1.Segment; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; import org.apache.cassandra.index.sai.disk.v1.V1SearchableIndex; +import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; import org.apache.cassandra.index.sai.iterators.KeyRangeAntiJoinIterator; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; @@ -289,6 +291,11 @@ public Version getVersion() return perIndexComponents.version(); } + public Stream getPostingsStructures() + { + return searchableIndex.getPostingsStructures(); + } + public IndexFeatureSet indexFeatureSet() { return getVersion().onDiskFormat().indexFeatureSet(); diff --git a/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java b/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java index 
97216580bc8c..3e4347b8dfa1 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.stream.Stream; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.PartitionPosition; @@ -29,6 +30,7 @@ import org.apache.cassandra.index.sai.QueryContext; import org.apache.cassandra.index.sai.disk.v1.Segment; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; +import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.plan.Orderer; @@ -120,6 +122,12 @@ public List getSegmentMetadatas() return List.of(); } + @Override + public Stream getPostingsStructures() + { + return Stream.empty(); + } + @Override public void populateSystemView(SimpleDataSet dataSet, SSTableReader sstable) { diff --git a/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java index 214eff2cc29a..80040db8d328 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.stream.Stream; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.PartitionPosition; @@ -30,6 +31,7 @@ import org.apache.cassandra.index.sai.QueryContext; import org.apache.cassandra.index.sai.disk.v1.Segment; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; +import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.plan.Orderer; @@ -87,6 +89,8 @@ public List> orderResultsBy(QueryContex List getSegmentMetadatas(); + Stream getPostingsStructures(); + public void populateSystemView(SimpleDataSet dataSet, SSTableReader sstable); long estimateMatchingRowsCount(Expression predicate, AbstractBounds keyRange); diff --git a/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java index 3be5be2caa38..32182537ca06 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.stream.Stream; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.PartitionPosition; @@ -35,6 +36,7 @@ import org.apache.cassandra.index.sai.disk.v1.PerIndexFiles; import org.apache.cassandra.index.sai.disk.v1.Segment; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; +import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.plan.Orderer; @@ -188,6 +190,20 @@ public List getSegmentMetadatas() return metadatas; } + @Override + public Stream getPostingsStructures() + { + // May result in 
downloading file, but this metadata is valuable. We use a stream to avoid loading all the + // structures at once. + return metadatas.stream() + .map(m -> { + try (var odm = m.version.onDiskFormat().newOnDiskOrdinalsMap(indexFiles, m)) + { + return odm.getStructure(); + } + }); + } + public PerIndexFiles indexFiles() { return indexFiles; diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java index 3beb06464fca..40f1c0b9558d 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java @@ -406,38 +406,19 @@ private static boolean allRowsHaveVectorsInWrittenSegments(IndexContext indexCon var view = indexContext.getReferencedView(TimeUnit.SECONDS.toNanos(5)); try { - for (SSTableIndex index : indexContext.getView().getIndexes()) - { - if (index.areSegmentsLoaded()) - { - for (Segment segment : index.getSegments()) - { - if (segment.getIndexSearcher() instanceof V2VectorIndexSearcher) - return true; // V2 doesn't know, so we err on the side of being optimistic. See comments in CompactionGraph - var searcher = (V5VectorIndexSearcher) segment.getIndexSearcher(); - var structure = searcher.getPostingsStructure(); - if (structure == V5VectorPostingsWriter.Structure.ZERO_OR_ONE_TO_MANY) - return false; - } - } - else - { - for (SegmentMetadata sm : index.getSegmentMetadatas()) - { - // May result in downloading file, but this metadata is valuable - try (var odm = index.getVersion().onDiskFormat().newOnDiskOrdinalsMap(index.indexFiles(), sm)) - { - if (odm.getStructure() == V5VectorPostingsWriter.Structure.ZERO_OR_ONE_TO_MANY) - return false; - } - } - } - } - return true; + // If we couldn't get the view, assume best case scenario. + if (view == null) + return true; + + return view.getIndexes() + .stream() + .flatMap(SSTableIndex::getPostingsStructures) + .noneMatch(structure -> structure == V5VectorPostingsWriter.Structure.ZERO_OR_ONE_TO_MANY); } finally { - view.release(); + if (view != null) + view.release(); } } diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java index d00950c9e6a6..3c8f3f78188d 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.stream.Stream; import com.google.common.collect.ImmutableList; @@ -36,6 +37,8 @@ import org.apache.cassandra.index.sai.SSTableContext; import org.apache.cassandra.index.sai.disk.SearchableIndex; import org.apache.cassandra.index.sai.disk.format.IndexComponents; +import org.apache.cassandra.index.sai.disk.v5.V5VectorIndexSearcher; +import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; import org.apache.cassandra.index.sai.iterators.KeyRangeConcatIterator; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; @@ -258,6 +261,16 @@ public List getSegmentMetadatas() return metadatas; } + @Override + public Stream getPostingsStructures() + { + return segments.stream() + // V2 doesn't know, so we skip it and err on the side of being optimistic. 
See comments in CompactionGraph + .filter(s -> s.getIndexSearcher() instanceof V5VectorIndexSearcher) + .map(s -> (V5VectorIndexSearcher) s.getIndexSearcher()) + .map(V5VectorIndexSearcher::getPostingsStructure); + } + @Override public void populateSystemView(SimpleDataSet dataset, SSTableReader sstable) { diff --git a/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java b/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java index f48fcb562eb6..6d106c63ec89 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java @@ -19,6 +19,7 @@ package org.apache.cassandra.index.sai.cql; import java.util.ArrayList; +import java.util.HashSet; import java.util.stream.Collectors; import org.junit.Before; @@ -28,6 +29,7 @@ import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.db.marshal.FloatType; import org.apache.cassandra.index.sai.StorageAttachedIndex; +import org.apache.cassandra.index.sai.disk.format.Version; import org.apache.cassandra.index.sai.disk.v2.V2VectorIndexSearcher; import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; import org.apache.cassandra.index.sai.disk.vector.VectorCompression; @@ -37,7 +39,6 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -252,14 +253,9 @@ public void testSaiIndexReadsDisabled() var sstableDiff = i.maxSSTableRowId() - i.minSSTableRowId() + 1; assertEquals(i.getRowCount(), sstableDiff); - assertEquals(1, i.getSegmentMetadatas().size()); - for (var segmentMetadata : i.getSegmentMetadatas()) - { - try (var odm = i.getVersion().onDiskFormat().newOnDiskOrdinalsMap(i.indexFiles(), segmentMetadata)) - { - assertEquals(V5VectorPostingsWriter.Structure.ONE_TO_ONE, odm.getStructure()); - } - } + var postingsStructures = i.getPostingsStructures().collect(Collectors.toList()); + assertEquals(1, postingsStructures.size()); + assertEquals(V5VectorPostingsWriter.Structure.ONE_TO_ONE, postingsStructures.get(0)); }); // Now compact (this exercises the code, but doesn't actually prove that we used the past segment's PQ) @@ -285,6 +281,88 @@ public void testSaiIndexReadsDisabled() assertEquals(5, result.size()); } + @SuppressWarnings("resource") + @Test + public void testSaiVectorIndexPostingStructure() + { + createTable("CREATE TABLE %s (pk int, v vector, PRIMARY KEY(pk))"); + var indexName = createIndex("CREATE CUSTOM INDEX ON %s(v) USING 'StorageAttachedIndex'"); + + // We'll manage compaction manually for better control + disableCompaction(); + + // first sstable has one-to-one + for (int i = 0; i < MIN_PQ_ROWS; i++) + execute("INSERT INTO %s (pk, v) VALUES (?, ?)", i, randomVectorBoxed(2)); + + flush(); + + // second sstable has one-to-many + for (int i = MIN_PQ_ROWS; i < MIN_PQ_ROWS * 3; i += 2) + { + var dupedVector = randomVectorBoxed(2); + execute("INSERT INTO %s (pk, v) VALUES (?, ?)", i, dupedVector); + execute("INSERT INTO %s (pk, v) VALUES (?, ?)", i + 1, dupedVector); + } + + flush(); + + // third sstable has zero or one-to-many + for (int i = MIN_PQ_ROWS * 3; i < MIN_PQ_ROWS * 4; i += 2) + execute("INSERT INTO %s (pk, v) VALUES (?, ?)", i, randomVectorBoxed(2)); + // this row doesn't get a vector, to force it to the zero or one to many case + 
execute("INSERT INTO %s (pk) VALUES (?)", MIN_PQ_ROWS * 4); + flush(); + + // Confirm that V1MetadataOnlySearchableIndex has valid metadata. + var metadataOnlyIndex = (StorageAttachedIndex) getIndexManager(keyspace(), indexName).listIndexes().iterator().next(); + var metadataOnlySSTableIndexes = metadataOnlyIndex.getIndexContext().getView().getIndexes(); + assertEquals(3, metadataOnlySSTableIndexes.size()); + + var structures = new HashSet(); + metadataOnlySSTableIndexes.forEach(i -> { + assertTrue(i.areSegmentsLoaded()); + structures.addAll(i.getPostingsStructures().collect(Collectors.toList())); + }); + + if (version.onOrAfter(Version.DC)) + { + assertEquals(3, structures.size()); + assertTrue(structures.contains(V5VectorPostingsWriter.Structure.ONE_TO_ONE)); + assertTrue(structures.contains(V5VectorPostingsWriter.Structure.ONE_TO_MANY)); + assertTrue(structures.contains(V5VectorPostingsWriter.Structure.ZERO_OR_ONE_TO_MANY)); + } + else + { + assertTrue(structures.isEmpty()); + } + + // Run compaction without disabling reads to test that path too + compact(); + + // Confirm what is happening now. + var compactedSSTableIndexes = metadataOnlyIndex.getIndexContext().getView().getIndexes(); + assertEquals(1, compactedSSTableIndexes.size()); + compactedSSTableIndexes.forEach(i -> { + assertTrue(i.areSegmentsLoaded()); + assertEquals(1, i.getSegmentMetadatas().size()); + var postingsStructures = i.getPostingsStructures().collect(Collectors.toList()); + if (version.onOrAfter(Version.DC)) + { + assertEquals(1, postingsStructures.size()); + assertEquals(V5VectorPostingsWriter.Structure.ZERO_OR_ONE_TO_MANY, postingsStructures.get(0)); + } + else + { + assertEquals(0, postingsStructures.size()); + } + }); + + // Confirm we can query the data + var result = execute( "SELECT * FROM %s ORDER BY v ANN OF ? LIMIT 5", randomVectorBoxed(2)); + assertEquals(5, result.size()); + } + public void testZeroOrOneToManyCompactionInternal(int vectorsPerSstable, int sstables) { createTable(); From 3748693397d3e1fca3cbf69e64f6e9fe682df46b Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 28 May 2025 15:53:21 -0500 Subject: [PATCH 11/14] Initial refactors after review --- .../cassandra/config/CassandraRelevantProperties.java | 9 +++++---- .../org/apache/cassandra/index/sai/SSTableIndex.java | 2 +- .../index/sai/disk/v1/SSTableIndexWriter.java | 10 ---------- .../cassandra/index/sai/disk/v1/V1OnDiskFormat.java | 1 - .../index/sai/disk/vector/CassandraOnHeapGraph.java | 6 ------ .../sai/disk/vector/ProductQuantizationFetcher.java | 2 +- 6 files changed, 7 insertions(+), 23 deletions(-) diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index a9d121a94d2e..3f9ba3fc3295 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -540,11 +540,12 @@ public enum CassandraRelevantProperties SAI_INDEX_READS_DISABLED("cassandra.sai.disabled_reads", "false"), /** - * Only takes effect when SAI_INDEX_READS_DISABLED is true. If true, creates a lazy index searcher that only loads - * segment metadata and has the ability to load extra files, like the PQ file for vector indexes. Currently only - * affects vector indexes. Other indexes fall back to the empty index searcher when SAI_INDEX_READS_DISABLED is true. + * Only takes effect when SAI_INDEX_READS_DISABLED is true. 
If true, creates an index searcher that loads + * segment metadata and has the ability to load extra files, like the PQ file for vector indexes, but does not do + * so until it is actually needed. Currently only affects vector indexes. Other indexes fall back to the empty index + * searcher when SAI_INDEX_READS_DISABLED is true. */ - SAI_INDEX_LOAD_SEGMENT_METADATA_ONLY("cassandra.sai.load_segment_metadata_only", "true"), + SAI_INDEX_LOAD_SEGMENT_METADATA("cassandra.sai.load_segment_metadata", "true"), /** * Allows custom implementation of {@link SensorsFactory} to optionally create diff --git a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java index b0638198fc67..263bdcf57594 100644 --- a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java @@ -102,7 +102,7 @@ private static SearchableIndex createSearchableIndex(SSTableContext sstableConte if (!perIndexComponents.isEmpty() && context != null && context.isVector() - && CassandraRelevantProperties.SAI_INDEX_LOAD_SEGMENT_METADATA_ONLY.getBoolean()) + && CassandraRelevantProperties.SAI_INDEX_LOAD_SEGMENT_METADATA.getBoolean()) { logger.info("Creating a lazy index searcher for sstable {} as SAI index reads are disabled, but this is a vector index", sstableContext.sstable.descriptor.id); return new V1MetadataOnlySearchableIndex(sstableContext, perIndexComponents); diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java index f0be4d074308..ef6c302fbc33 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java @@ -24,8 +24,6 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; -import java.util.function.Function; -import java.util.function.Supplier; import javax.annotation.concurrent.NotThreadSafe; import com.google.common.base.Preconditions; @@ -34,7 +32,6 @@ import org.slf4j.LoggerFactory; import io.github.jbellis.jvector.quantization.BinaryQuantization; -import io.github.jbellis.jvector.quantization.ProductQuantization; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.VectorType; @@ -42,23 +39,16 @@ import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.SSTableIndex; import org.apache.cassandra.index.sai.disk.PerIndexWriter; -import org.apache.cassandra.index.sai.disk.format.IndexComponent; import org.apache.cassandra.index.sai.disk.format.IndexComponentType; import org.apache.cassandra.index.sai.disk.format.IndexComponents; -import org.apache.cassandra.index.sai.disk.v2.V2VectorIndexSearcher; import org.apache.cassandra.index.sai.disk.v3.V3OnDiskFormat; -import org.apache.cassandra.index.sai.disk.v5.V5VectorIndexSearcher; import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; -import org.apache.cassandra.index.sai.disk.vector.CassandraDiskAnn; -import org.apache.cassandra.index.sai.disk.vector.CassandraOnHeapGraph; import org.apache.cassandra.index.sai.disk.vector.ProductQuantizationFetcher; -import org.apache.cassandra.index.sai.disk.vector.VectorCompression; import org.apache.cassandra.index.sai.disk.vector.VectorCompression.CompressionType; import 
org.apache.cassandra.index.sai.utils.NamedMemoryLimiter; import org.apache.cassandra.index.sai.utils.PrimaryKey; import org.apache.cassandra.index.sai.utils.TypeUtil; import org.apache.cassandra.io.storage.StorageProvider; -import org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Throwables; diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java index 74aa19c80277..6b0f34e9c231 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java @@ -49,7 +49,6 @@ import org.apache.cassandra.index.sai.disk.format.IndexFeatureSet; import org.apache.cassandra.index.sai.disk.format.OnDiskFormat; import org.apache.cassandra.index.sai.disk.format.Version; -import org.apache.cassandra.index.sai.disk.v5.V5OnDiskOrdinalsMap; import org.apache.cassandra.index.sai.disk.vector.OnDiskOrdinalsMap; import org.apache.cassandra.index.sai.memory.RowMapping; import org.apache.cassandra.index.sai.metrics.AbstractMetrics; diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java index 8b1605c47675..99ef0f72a534 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java @@ -21,18 +21,15 @@ import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntUnaryOperator; import java.util.function.ToIntFunction; import java.util.stream.IntStream; -import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; import org.cliffc.high_scale_lib.NonBlockingHashMap; @@ -69,13 +66,10 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.index.sai.IndexContext; import org.apache.cassandra.index.sai.QueryContext; -import org.apache.cassandra.index.sai.SSTableIndex; import org.apache.cassandra.index.sai.disk.format.IndexComponent; import org.apache.cassandra.index.sai.disk.format.IndexComponentType; import org.apache.cassandra.index.sai.disk.format.IndexComponents; -import org.apache.cassandra.index.sai.disk.v1.SSTableIndexWriter; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; -import org.apache.cassandra.index.sai.disk.v2.V2VectorIndexSearcher; import org.apache.cassandra.index.sai.disk.v2.V2VectorPostingsWriter; import org.apache.cassandra.index.sai.disk.v3.V3OnDiskFormat; import org.apache.cassandra.index.sai.disk.v5.V5OnDiskFormat; diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java index ab6718be0c01..f23c57f7f305 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java @@ -197,7 +197,7 @@ public static PqInfo maybeReadPqFromSegment(SegmentMetadata sm, FileHandle fh) } catch 
(IOException e) { - logger.warn("Failed to read PQ from segment {}. Skipping", sm, e); + logger.warn("Failed to read PQ from segment {} for {}. Skipping", sm, fh, e); } return null; } From b5441cdae3dc85236d0ed3e8146ef802f0aea250 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 28 May 2025 16:05:07 -0500 Subject: [PATCH 12/14] Push getting PqInfo into SearchableIndex interface --- .../cassandra/index/sai/SSTableIndex.java | 8 ++++--- .../cassandra/index/sai/disk/EmptyIndex.java | 7 ++++++ .../index/sai/disk/SearchableIndex.java | 3 +++ .../disk/V1MetadataOnlySearchableIndex.java | 11 ++++++++-- .../index/sai/disk/v1/V1SearchableIndex.java | 17 ++++++++++++++ .../vector/ProductQuantizationFetcher.java | 22 +------------------ 6 files changed, 42 insertions(+), 26 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java index 263bdcf57594..555d3cfd1745 100644 --- a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; @@ -49,6 +50,7 @@ import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; import org.apache.cassandra.index.sai.disk.v1.V1SearchableIndex; import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; +import org.apache.cassandra.index.sai.disk.vector.ProductQuantizationFetcher; import org.apache.cassandra.index.sai.iterators.KeyRangeAntiJoinIterator; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; @@ -145,15 +147,15 @@ public List getSegmentMetadatas() return searchableIndex.getSegmentMetadatas(); } + @VisibleForTesting public boolean areSegmentsLoaded() { return searchableIndex instanceof V1SearchableIndex; } - public PerIndexFiles indexFiles() + public ProductQuantizationFetcher.PqInfo getPqInfo(int segmentPosition) { - assert searchableIndex instanceof V1MetadataOnlySearchableIndex; - return ((V1MetadataOnlySearchableIndex) searchableIndex).indexFiles(); + return searchableIndex.getPqInfo(segmentPosition); } public long indexFileCacheSize() diff --git a/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java b/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java index 3e4347b8dfa1..012367b697b4 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/EmptyIndex.java @@ -31,6 +31,7 @@ import org.apache.cassandra.index.sai.disk.v1.Segment; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; +import org.apache.cassandra.index.sai.disk.vector.ProductQuantizationFetcher; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.plan.Orderer; @@ -128,6 +129,12 @@ public Stream getPostingsStructures() return Stream.empty(); } + @Override + public ProductQuantizationFetcher.PqInfo getPqInfo(int segmentPosition) + { + return null; + } + @Override public void populateSystemView(SimpleDataSet dataSet, SSTableReader sstable) { diff --git a/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java 
b/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java index 80040db8d328..eaf3914636e1 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/SearchableIndex.java @@ -32,6 +32,7 @@ import org.apache.cassandra.index.sai.disk.v1.Segment; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; +import org.apache.cassandra.index.sai.disk.vector.ProductQuantizationFetcher; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.plan.Orderer; @@ -91,6 +92,8 @@ public List> orderResultsBy(QueryContex Stream getPostingsStructures(); + ProductQuantizationFetcher.PqInfo getPqInfo(int segmentPosition); + public void populateSystemView(SimpleDataSet dataSet, SSTableReader sstable); long estimateMatchingRowsCount(Expression predicate, AbstractBounds keyRange); diff --git a/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java index 0cba2cc6033b..9983b9661515 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java @@ -37,6 +37,7 @@ import org.apache.cassandra.index.sai.disk.v1.Segment; import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata; import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; +import org.apache.cassandra.index.sai.disk.vector.ProductQuantizationFetcher; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; import org.apache.cassandra.index.sai.plan.Orderer; @@ -204,9 +205,15 @@ public Stream getPostingsStructures() }); } - public PerIndexFiles indexFiles() + @Override + public ProductQuantizationFetcher.PqInfo getPqInfo(int segmentPosition) { - return indexFiles; + // We have to load from disk here + try (var pq = indexFiles.pq()) + { + // Returns null if wrong uses wrong compression type. 
+ return ProductQuantizationFetcher.maybeReadPqFromSegment(metadatas.get(segmentPosition), pq); + } } @Override diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java index 15353b53cb84..2be55679a69a 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/V1SearchableIndex.java @@ -36,8 +36,11 @@ import org.apache.cassandra.index.sai.SSTableContext; import org.apache.cassandra.index.sai.disk.SearchableIndex; import org.apache.cassandra.index.sai.disk.format.IndexComponents; +import org.apache.cassandra.index.sai.disk.v2.V2VectorIndexSearcher; import org.apache.cassandra.index.sai.disk.v5.V5VectorIndexSearcher; import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter; +import org.apache.cassandra.index.sai.disk.vector.ProductQuantizationFetcher; +import org.apache.cassandra.index.sai.disk.vector.VectorCompression; import org.apache.cassandra.index.sai.iterators.KeyRangeConcatIterator; import org.apache.cassandra.index.sai.iterators.KeyRangeIterator; import org.apache.cassandra.index.sai.plan.Expression; @@ -271,6 +274,20 @@ public Stream getPostingsStructures() .map(V5VectorIndexSearcher::getPostingsStructure); } + @Override + public ProductQuantizationFetcher.PqInfo getPqInfo(int segmentPosition) + { + var segment = segments.get(segmentPosition); + assert segment.getIndexSearcher() instanceof V2VectorIndexSearcher : "Index searcher is not a V2VectorIndexSearcher. Found: " + segment.getIndexSearcher().getClass(); + V2VectorIndexSearcher searcher = (V2VectorIndexSearcher) segment.getIndexSearcher(); + // Skip segments that don't have PQ + if (VectorCompression.CompressionType.PRODUCT_QUANTIZATION != searcher.getCompression().type) + return null; + + // Because we sorted based on size then timestamp, this is the best match + return new ProductQuantizationFetcher.PqInfo(searcher.getPQ(), searcher.containsUnitVectors(), segment.metadata.numRows); + } + @Override public void populateSystemView(SimpleDataSet dataset, SSTableReader sstable) { diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java index f23c57f7f305..248f783e195c 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java @@ -112,27 +112,7 @@ private CustomSegmentSorter(SSTableIndex sstableIndex, long numRows, int segment @SuppressWarnings("resource") private PqInfo getPqInfo() { - if (sstableIndex.areSegmentsLoaded()) - { - var segment = sstableIndex.getSegments().get(segmentPosition); - V2VectorIndexSearcher searcher = (V2VectorIndexSearcher) segment.getIndexSearcher(); - // Skip segments that don't have PQ - if (VectorCompression.CompressionType.PRODUCT_QUANTIZATION != searcher.getCompression().type) - return null; - - // Because we sorted based on size then timestamp, this is the best match - return new PqInfo(searcher.getPQ(), searcher.containsUnitVectors(), segment.metadata.numRows); - } - else - { - // We have to load from disk here - var segmentMetadata = sstableIndex.getSegmentMetadatas().get(segmentPosition); - try (var pq = sstableIndex.indexFiles().pq()) - { - // Returns null if wrong uses wrong compression type. 
- return maybeReadPqFromSegment(segmentMetadata, pq); - } - } + return sstableIndex.getPqInfo(segmentPosition); } @Override From a872344c5b8cad18f56f4db477370244fad91d3f Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 28 May 2025 16:24:23 -0500 Subject: [PATCH 13/14] Fix comments; add version filtering to getPostingsStructures --- .../index/sai/disk/V1MetadataOnlySearchableIndex.java | 9 ++++++--- .../cassandra/index/sai/cql/VectorCompactionTest.java | 11 +++++++++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java b/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java index 9983b9661515..22ac8d1c82bd 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/disk/V1MetadataOnlySearchableIndex.java @@ -32,6 +32,7 @@ import org.apache.cassandra.index.sai.QueryContext; import org.apache.cassandra.index.sai.SSTableContext; import org.apache.cassandra.index.sai.disk.format.IndexComponents; +import org.apache.cassandra.index.sai.disk.format.Version; import org.apache.cassandra.index.sai.disk.v1.MetadataSource; import org.apache.cassandra.index.sai.disk.v1.PerIndexFiles; import org.apache.cassandra.index.sai.disk.v1.Segment; @@ -61,8 +62,8 @@ import static org.apache.cassandra.index.sai.virtual.SegmentsSystemView.TABLE_NAME; /** - * An index that eagerly loads segment metadata and nothing else. It is currently only used for vector indexes to - * read PQ files during compaction. + * An index that eagerly loads segment metadata, can load index files on demand to provide insight into information + * about the index, but does not support searching. */ public class V1MetadataOnlySearchableIndex implements SearchableIndex { @@ -197,6 +198,8 @@ public Stream getPostingsStructures() // May result in downloading file, but this metadata is valuable. We use a stream to avoid loading all the // structures at once. return metadatas.stream() + // V2 doesn't know, so we skip it and err on the side of being optimistic. See comments in CompactionGraph + .filter(m -> m.version.onOrAfter(Version.DC)) .map(m -> { try (var odm = m.version.onDiskFormat().newOnDiskOrdinalsMap(indexFiles, m)) { @@ -211,7 +214,7 @@ public ProductQuantizationFetcher.PqInfo getPqInfo(int segmentPosition) // We have to load from disk here try (var pq = indexFiles.pq()) { - // Returns null if wrong uses wrong compression type. + // Returns null if segment does not use PQ compression. 
return ProductQuantizationFetcher.maybeReadPqFromSegment(metadatas.get(segmentPosition), pq); } } diff --git a/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java b/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java index 6d106c63ec89..619e7a3b7647 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/VectorCompactionTest.java @@ -254,8 +254,15 @@ public void testSaiIndexReadsDisabled() assertEquals(i.getRowCount(), sstableDiff); var postingsStructures = i.getPostingsStructures().collect(Collectors.toList()); - assertEquals(1, postingsStructures.size()); - assertEquals(V5VectorPostingsWriter.Structure.ONE_TO_ONE, postingsStructures.get(0)); + if (Version.current().onOrAfter(Version.DC)) + { + assertEquals(1, postingsStructures.size()); + assertEquals(V5VectorPostingsWriter.Structure.ONE_TO_ONE, postingsStructures.get(0)); + } + else + { + assertEquals(0, postingsStructures.size()); + } }); // Now compact (this exercises the code, but doesn't actually prove that we used the past segment's PQ) From e342824975ee246df5f34bb9babce70bacbf2dda Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 28 May 2025 16:53:41 -0500 Subject: [PATCH 14/14] More cleanup --- src/java/org/apache/cassandra/index/sai/SSTableIndex.java | 2 +- .../index/sai/disk/vector/ProductQuantizationFetcher.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java index 555d3cfd1745..47e95fe6b768 100644 --- a/src/java/org/apache/cassandra/index/sai/SSTableIndex.java +++ b/src/java/org/apache/cassandra/index/sai/SSTableIndex.java @@ -106,7 +106,7 @@ private static SearchableIndex createSearchableIndex(SSTableContext sstableConte && context.isVector() && CassandraRelevantProperties.SAI_INDEX_LOAD_SEGMENT_METADATA.getBoolean()) { - logger.info("Creating a lazy index searcher for sstable {} as SAI index reads are disabled, but this is a vector index", sstableContext.sstable.descriptor.id); + logger.info("Creating a V1MetadataOnlySearchableIndex for sstable {} as SAI index reads are disabled, but this is a vector index", sstableContext.sstable.descriptor.id); return new V1MetadataOnlySearchableIndex(sstableContext, perIndexComponents); } else diff --git a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java index 248f783e195c..aeb031aae776 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java +++ b/src/java/org/apache/cassandra/index/sai/disk/vector/ProductQuantizationFetcher.java @@ -78,7 +78,6 @@ public static PqInfo getPqIfPresent(IndexContext indexContext) public static class PqInfo { public final ProductQuantization pq; - /** an empty Optional indicates that the index was written with an older version that did not record this information */ public final boolean unitVectors; public final long rowCount;
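Reviewer note: taken together, the series leaves a caller building a new graph with a small decision — reuse a previously trained quantization if one was found, train a fresh one if there are enough rows, or skip compression entirely. A hypothetical consumer to show the shape of that decision; none of these names are the actual compaction entry points, and Object stands in for ProductQuantization:

    public class PqReuseSketch
    {
        static final long MIN_PQ_ROWS = 1024; // stand-in threshold

        // Mirrors the PqInfo fields above
        static class PqInfo
        {
            final Object pq;
            final boolean unitVectors;
            final long rowCount;

            PqInfo(Object pq, boolean unitVectors, long rowCount)
            {
                this.pq = pq;
                this.unitVectors = unitVectors;
                this.rowCount = rowCount;
            }
        }

        static Object chooseCompressor(PqInfo previous, long newRowCount)
        {
            if (previous != null)
                return previous.pq;      // refine the existing codebook instead of retraining
            if (newRowCount >= MIN_PQ_ROWS)
                return trainNewPq();     // enough rows to train a fresh quantization
            return null;                 // too few rows: fall back to uncompressed vectors
        }

        static Object trainNewPq()
        {
            return new Object(); // placeholder for actual PQ training
        }
    }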