Skip to content

Commit ef692a2

Browse files
michaeljmarshalldriftx
authored andcommitted
CNDB-14301: couple jvector file format and SAI version (#1786)
### What is the issue Fixes: riptano/cndb#14301 ### What does this PR fix and why was it fixed Make version `EC` and later use jvector file format 4. Also allow for some configurability in earlier versions just in case we have some clusters relying on that behavior via the `cassandra.sai.jvector_version` system property (defaults to 2 and is ignored for `EC` and later).
1 parent 1f3e8d3 commit ef692a2

File tree

10 files changed

+148
-70
lines changed

10 files changed

+148
-70
lines changed

src/java/org/apache/cassandra/index/sai/disk/format/OnDiskFormat.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,9 @@ default public Set<IndexComponentType> perIndexComponentTypes(IndexContext index
214214
*/
215215
ByteBuffer decodeFromTrie(ByteComparable value, AbstractType<?> type);
216216

217-
217+
/**
218+
* @return the JVector file format version that this on-disk format uses.
219+
*/
220+
int jvectorFileFormatVersion();
218221

219222
}

src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,4 +327,10 @@ private boolean isVectorDataComponent(IndexContext context, IndexComponentType i
327327
indexComponentType == IndexComponentType.TERMS_DATA ||
328328
indexComponentType == IndexComponentType.POSTING_LISTS;
329329
}
330+
331+
@Override
332+
public int jvectorFileFormatVersion()
333+
{
334+
throw new UnsupportedOperationException("JVector is not supported in V2OnDiskFormat");
335+
}
330336
}

src/java/org/apache/cassandra/index/sai/disk/v3/V3OnDiskFormat.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@ public class V3OnDiskFormat extends V2OnDiskFormat
5353
// JVector doesn't give us a way to access its default, so we set it here, but allow it to be overridden.
5454
public static boolean JVECTOR_USE_PRUNING_DEFAULT = SAI_VECTOR_USE_PRUNING_DEFAULT.getBoolean();
5555

56-
// These are built to be backwards and forwards compatible. Not final only for testing.
57-
public static int JVECTOR_VERSION = SAI_JVECTOR_VERSION.getInt();
56+
// We allow the version to be configured via a system property because of some legacy use cases, but it is
57+
// generally not recommended to change this directly. Instead, use the cassandra.sai.latest.version system property
58+
// to control the on-disk format version.
59+
private final static int JVECTOR_VERSION = SAI_JVECTOR_VERSION.getInt();
5860
static
5961
{
6062
// JVector 3 is not compatible with the latest jvector changes, so we fail fast if the config is enabled.
@@ -120,4 +122,10 @@ public Set<IndexComponentType> perIndexComponentTypes(AbstractType<?> validator)
120122
return VECTOR_COMPONENTS_V3;
121123
return super.perIndexComponentTypes(validator);
122124
}
125+
126+
@Override
127+
public int jvectorFileFormatVersion()
128+
{
129+
return JVECTOR_VERSION;
130+
}
123131
}

src/java/org/apache/cassandra/index/sai/disk/v7/V7OnDiskFormat.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,13 @@ public Set<IndexComponentType> perIndexComponentTypes(AbstractType<?> validator)
4343
return LITERAL_COMPONENTS;
4444
return super.perIndexComponentTypes(validator);
4545
}
46+
47+
@Override
48+
public int jvectorFileFormatVersion()
49+
{
50+
// Before version EC, we write JVector format 2. Version EB introduced the ability for jvector to read format 4,
51+
// so we can safely start writing it for versions EC (V7) and later while maintaining proper backward
52+
// compatibility.
53+
return 4;
54+
}
4655
}

src/java/org/apache/cassandra/index/sai/disk/vector/CassandraOnHeapGraph.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,11 @@
7575
import org.apache.cassandra.index.sai.disk.format.IndexComponent;
7676
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
7777
import org.apache.cassandra.index.sai.disk.format.IndexComponents;
78+
import org.apache.cassandra.index.sai.disk.format.Version;
7879
import org.apache.cassandra.index.sai.disk.v1.Segment;
7980
import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata;
8081
import org.apache.cassandra.index.sai.disk.v2.V2VectorIndexSearcher;
8182
import org.apache.cassandra.index.sai.disk.v2.V2VectorPostingsWriter;
82-
import org.apache.cassandra.index.sai.disk.v3.V3OnDiskFormat;
8383
import org.apache.cassandra.index.sai.disk.v5.V5OnDiskFormat;
8484
import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter;
8585
import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter.Structure;
@@ -95,7 +95,6 @@
9595
import org.apache.lucene.util.StringHelper;
9696

9797
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
98-
import static org.apache.cassandra.index.sai.disk.v3.V3OnDiskFormat.JVECTOR_VERSION;
9998

10099
public class CassandraOnHeapGraph<T> implements Accountable
101100
{
@@ -158,18 +157,19 @@ public CassandraOnHeapGraph(IndexContext context, boolean forSearching, Memtable
158157
vectorsByKey = forSearching ? new NonBlockingHashMap<>() : null;
159158
invalidVectorBehavior = forSearching ? InvalidVectorBehavior.FAIL : InvalidVectorBehavior.IGNORE;
160159

160+
int jvectorVersion = Version.current().onDiskFormat().jvectorFileFormatVersion();
161161
// This is only a warning since it's not a fatal error to write without hierarchy
162-
if (indexConfig.isHierarchyEnabled() && V3OnDiskFormat.JVECTOR_VERSION < 4)
162+
if (indexConfig.isHierarchyEnabled() && jvectorVersion < 4)
163163
logger.warn("Hierarchical graphs configured but node configured with V3OnDiskFormat.JVECTOR_VERSION {}. " +
164-
"Skipping setting for {}", V3OnDiskFormat.JVECTOR_VERSION, indexConfig.getIndexName());
164+
"Skipping setting for {}", jvectorVersion, indexConfig.getIndexName());
165165

166166
builder = new GraphIndexBuilder(vectorValues,
167167
similarityFunction,
168168
indexConfig.getAnnMaxDegree(),
169169
indexConfig.getConstructionBeamWidth(),
170170
indexConfig.getNeighborhoodOverflow(1.0f), // no overflow means add will be a bit slower but flush will be faster
171171
indexConfig.getAlpha(dimension > 3 ? 1.2f : 2.0f),
172-
indexConfig.isHierarchyEnabled() && V3OnDiskFormat.JVECTOR_VERSION >= 4);
172+
indexConfig.isHierarchyEnabled() && jvectorVersion >= 4);
173173
searchers = ThreadLocal.withInitial(() -> new GraphSearcherAccessManager(new GraphSearcher(builder.getGraph())));
174174
}
175175

@@ -440,7 +440,7 @@ public SegmentMetadata.ComponentMetadataMap flush(IndexComponents.ForWrite perIn
440440
try (var pqOutput = perIndexComponents.addOrGet(IndexComponentType.PQ).openOutput(true);
441441
var postingsOutput = perIndexComponents.addOrGet(IndexComponentType.POSTING_LISTS).openOutput(true);
442442
var indexWriter = new OnDiskGraphIndexWriter.Builder(builder.getGraph(), indexFile.toPath())
443-
.withVersion(JVECTOR_VERSION)
443+
.withVersion(Version.current().onDiskFormat().jvectorFileFormatVersion())
444444
.withMapper(ordinalMapper)
445445
.with(new InlineVectors(vectorValues.dimension()))
446446
.withStartOffset(termsOffset)
@@ -593,14 +593,14 @@ private long writePQ(SequentialWriter writer, V5VectorPostingsWriter.RemappedPos
593593
return writer.position();
594594

595595
// save (outside the synchronized block, this is io-bound not CPU)
596-
cv.write(writer, JVECTOR_VERSION);
596+
cv.write(writer, Version.current().onDiskFormat().jvectorFileFormatVersion());
597597
return writer.position();
598598
}
599599

600600
static void writePqHeader(DataOutput writer, boolean unitVectors, CompressionType type)
601601
throws IOException
602602
{
603-
if (V3OnDiskFormat.JVECTOR_VERSION >= 3)
603+
if (Version.current().onDiskFormat().jvectorFileFormatVersion() >= 3)
604604
{
605605
// version and optional fields
606606
writer.writeInt(CassandraDiskAnn.PQ_MAGIC);

src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@
7272
import org.apache.cassandra.index.sai.IndexContext;
7373
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
7474
import org.apache.cassandra.index.sai.disk.format.IndexComponents;
75+
import org.apache.cassandra.index.sai.disk.format.Version;
7576
import org.apache.cassandra.index.sai.disk.v1.SegmentMetadata;
7677
import org.apache.cassandra.index.sai.disk.v2.V2VectorPostingsWriter;
77-
import org.apache.cassandra.index.sai.disk.v3.V3OnDiskFormat;
7878
import org.apache.cassandra.index.sai.disk.v5.V5OnDiskFormat;
7979
import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter;
8080
import org.apache.cassandra.index.sai.disk.v5.V5VectorPostingsWriter.Structure;
@@ -91,7 +91,6 @@
9191
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
9292
import static java.lang.Math.max;
9393
import static java.lang.Math.min;
94-
import static org.apache.cassandra.index.sai.disk.v3.V3OnDiskFormat.JVECTOR_VERSION;
9594

9695
public class CompactionGraph implements Closeable, Accountable
9796
{
@@ -200,17 +199,18 @@ else if (compressor instanceof BinaryQuantization)
200199
{
201200
throw new IllegalArgumentException("Unsupported compressor: " + compressor);
202201
}
203-
if (indexConfig.isHierarchyEnabled() && V3OnDiskFormat.JVECTOR_VERSION < 4)
202+
int jvectorVersion = Version.current().onDiskFormat().jvectorFileFormatVersion();
203+
if (indexConfig.isHierarchyEnabled() && jvectorVersion < 4)
204204
logger.warn("Hierarchical graphs configured but node configured with V3OnDiskFormat.JVECTOR_VERSION {}. " +
205-
"Skipping setting for {}", V3OnDiskFormat.JVECTOR_VERSION, indexConfig.getIndexName());
205+
"Skipping setting for {}", jvectorVersion, indexConfig.getIndexName());
206206

207207
builder = new GraphIndexBuilder(bsp,
208208
dimension,
209209
indexConfig.getAnnMaxDegree(),
210210
indexConfig.getConstructionBeamWidth(),
211211
indexConfig.getNeighborhoodOverflow(1.2f),
212212
indexConfig.getAlpha(dimension > 3 ? 1.2f : 1.4f),
213-
indexConfig.isHierarchyEnabled() && V3OnDiskFormat.JVECTOR_VERSION >= 4,
213+
indexConfig.isHierarchyEnabled() && jvectorVersion >= 4,
214214
compactionSimdPool, compactionFjp);
215215

216216
termsFile = perIndexComponents.addOrGet(IndexComponentType.TERMS_DATA).file();
@@ -227,7 +227,7 @@ private OnDiskGraphIndexWriter.Builder createTermsWriterBuilder() throws IOExcep
227227
return new OnDiskGraphIndexWriter.Builder(builder.getGraph(), termsFile.toPath())
228228
.withStartOffset(termsOffset)
229229
.with(new InlineVectors(dimension))
230-
.withVersion(JVECTOR_VERSION);
230+
.withVersion(Version.current().onDiskFormat().jvectorFileFormatVersion());
231231
}
232232

233233
@Override
@@ -396,7 +396,7 @@ public SegmentMetadata.ComponentMetadataMap flush() throws IOException
396396
// write PQ (time to do this is negligible, don't bother doing it async)
397397
long pqOffset = pqOutput.getFilePointer();
398398
CassandraOnHeapGraph.writePqHeader(pqOutput.asSequentialWriter(), unitVectors, VectorCompression.CompressionType.PRODUCT_QUANTIZATION);
399-
compressedVectors.write(pqOutput.asSequentialWriter(), JVECTOR_VERSION); // VSTODO old version until we add APQ
399+
compressedVectors.write(pqOutput.asSequentialWriter(), Version.current().onDiskFormat().jvectorFileFormatVersion());
400400
long pqLength = pqOutput.getFilePointer() - pqOffset;
401401

402402
// write postings asynchronously while we run cleanup()

test/unit/org/apache/cassandra/index/sai/cql/VectorDotProductWithLengthTest.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,40 @@
1919
package org.apache.cassandra.index.sai.cql;
2020

2121
import java.util.ArrayList;
22+
import java.util.Collection;
2223
import java.util.stream.Collectors;
2324

25+
import org.junit.Before;
2426
import org.junit.Test;
27+
import org.junit.runner.RunWith;
28+
import org.junit.runners.Parameterized;
2529

2630
import io.github.jbellis.jvector.vector.VectorSimilarityFunction;
2731
import org.apache.cassandra.db.marshal.FloatType;
28-
import org.apache.cassandra.index.sai.disk.v3.V3OnDiskFormat;
32+
import org.apache.cassandra.index.sai.SAIUtil;
33+
import org.apache.cassandra.index.sai.disk.format.Version;
2934

35+
@RunWith(Parameterized.class)
3036
public class VectorDotProductWithLengthTest extends VectorTester
3137
{
32-
@Override
33-
public void setup() throws Throwable
38+
@Parameterized.Parameter
39+
public Version version;
40+
41+
@Parameterized.Parameters(name = "{0}")
42+
public static Collection<Object[]> data()
43+
{
44+
// we are testing unit vector detection which was introduced in jvector format 4
45+
return Version.ALL.stream()
46+
.filter(v -> v.onOrAfter(Version.JVECTOR_EARLIEST))
47+
.filter(v -> v.onDiskFormat().jvectorFileFormatVersion() >= 4)
48+
.map(v -> new Object[]{ v})
49+
.collect(Collectors.toList());
50+
}
51+
52+
@Before
53+
public void setVersion()
3454
{
35-
super.setup();
36-
// we are testing unit vector detection which is part of the v3 changes, but continues in all subsequent versions
37-
if (V3OnDiskFormat.JVECTOR_VERSION < 4)
38-
V3OnDiskFormat.JVECTOR_VERSION = 4;
55+
SAIUtil.setCurrentVersion(version);
3956
}
4057

4158
// This tests our detection of unit-length vectors used with dot product and PQ.
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.index.sai.cql;
20+
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.stream.Collectors;
24+
25+
import org.junit.Test;
26+
27+
import org.apache.cassandra.index.sai.SAIUtil;
28+
import org.apache.cassandra.index.sai.disk.format.Version;
29+
import org.apache.cassandra.index.sai.disk.vector.CassandraOnHeapGraph;
30+
31+
// We do not parameterize this test because it is not intended to run multiple versions at once.
32+
public class VectorIndexMixedVersionTest extends VectorTester
33+
{
34+
// Versions in random order
35+
final static List<Version> VERSIONS = getVersions();
36+
37+
private static List<Version> getVersions()
38+
{
39+
var versions = Version.ALL.stream()
40+
.filter(v -> v.onOrAfter(Version.JVECTOR_EARLIEST))
41+
.collect(Collectors.toList());
42+
Collections.shuffle(versions, getRandom().getRandom());
43+
logger.info("Running mixed version test with versions: {}", versions);
44+
return versions;
45+
}
46+
47+
@Test
48+
public void testMultiVersionJVectorCompatibility() throws Throwable
49+
{
50+
createTable("CREATE TABLE %s (pk int, vec vector<float, 4>, PRIMARY KEY(pk))");
51+
createIndex("CREATE CUSTOM INDEX ON %s(vec) USING 'StorageAttachedIndex'");
52+
53+
// Note that we do not test the multi-version path where compaction produces different sstables, which is
54+
// the norm in CNDB. If we had a way to compact individual sstables, we could.
55+
disableCompaction();
56+
57+
for (var version : VERSIONS)
58+
{
59+
SAIUtil.setCurrentVersion(version);
60+
// Insert 2x the minimum number of rows to ensure we have enough for PQ training, even if there are
61+
// duplicate vectors.
62+
for (int i = 0; i < CassandraOnHeapGraph.MIN_PQ_ROWS * 2; i++)
63+
execute("INSERT INTO %s (pk, vec) VALUES (?, ?)", i, randomVectorBoxed(4));
64+
flush();
65+
}
66+
67+
// Run basic query to confirm we can, no need to validate results
68+
execute("SELECT pk FROM %s ORDER BY vec ANN OF [2.0, 2.0, 3.0, 4.0] LIMIT 2");
69+
70+
// Confirm we can compact them all and run a query too
71+
compact();
72+
execute("SELECT pk FROM %s ORDER BY vec ANN OF [2.0, 2.0, 3.0, 4.0] LIMIT 2");
73+
}
74+
}

test/unit/org/apache/cassandra/index/sai/cql/VectorTester.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public static double computeRecall(List<float[]> vectors, float[] query, List<fl
174174
}
175175

176176
/**
177-
* {@link VectorTester} parameterized for {@link Version#CA} and {@link Version#DC}.
177+
* {@link VectorTester} parameterized for {@link Version#CA}, {@link Version#DC}, and {@link Version#EB}.
178178
*/
179179
@Ignore
180180
@RunWith(Parameterized.class)
@@ -186,14 +186,13 @@ abstract static class Versioned extends VectorTester
186186
@Parameterized.Parameters(name = "{0}")
187187
public static Collection<Object[]> data()
188188
{
189-
return Stream.of(Version.CA, Version.DC).map(v -> new Object[]{ v }).collect(Collectors.toList());
189+
// See Version file for explanation of changes associated with each version
190+
return Stream.of(Version.CA, Version.DC, Version.EC).map(v -> new Object[]{ v }).collect(Collectors.toList());
190191
}
191192

192193
@Before
193-
@Override
194-
public void setup() throws Throwable
194+
public void setCurrentVersion() throws Throwable
195195
{
196-
super.setup();
197196
SAIUtil.setCurrentVersion(version);
198197
}
199198
}

0 commit comments

Comments
 (0)