Skip to content

Commit f358303

Browse files
CNDB-16252: Add IndexComponentsImpl#tmpFileFor to allow vector CompactionGraph to index multiple columns (#2177)
### What is the issue

Fixes riptano/cndb#16252

### What does this PR fix and why was it fixed

Add a new method to the `IndexComponents.ForWrite` interface named `tmpFileFor`. The new method is expected to create temporary files that are namespaced to the index build. This fixes an issue in the `CompactionGraph` where we incorrectly created temp files with the same name in such a way that they collided. Now the files have the column name and the build id, so they will be properly namespaced while maintaining some form of meaningful name.
1 parent a79fd6e commit f358303

File tree

4 files changed

+37
-4
lines changed

4 files changed

+37
-4
lines changed

src/java/org/apache/cassandra/index/sai/disk/format/IndexComponents.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.cassandra.io.sstable.Component;
3636
import org.apache.cassandra.io.sstable.Descriptor;
3737
import org.apache.cassandra.io.sstable.SSTable;
38+
import org.apache.cassandra.io.util.File;
3839
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
3940

4041
/**
@@ -346,5 +347,14 @@ interface ForWrite extends ForRead
346347
* should be added to this writer after this call).
347348
*/
348349
void markComplete() throws IOException;
350+
351+
/**
352+
* Create a temporary {@link File} namespaced within the per index components. Repeated calls with the same
353+
* componentName will produce the same file.
354+
* @param componentName - unique name within the per index components
355+
* @return a temporary file for use during index construction
356+
* @throws IOException
357+
*/
358+
File tmpFileFor(String componentName) throws IOException;
349359
}
350360
}

src/java/org/apache/cassandra/index/sai/disk/format/IndexDescriptor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,14 @@ public IndexComponent.ForWrite getForWrite(IndexComponentType component)
510510
return info;
511511
}
512512

513+
@Override
514+
public File tmpFileFor(String componentName) throws IOException
515+
{
516+
String name = context != null ? String.format("%s_%s_%s", buildId, context.getColumnName(), componentName)
517+
: String.format("%s_%s", buildId, componentName);
518+
return descriptor.tmpFileFor(new Component(Component.Type.CUSTOM, name));
519+
}
520+
513521
@Override
514522
public void forceDeleteAllComponents()
515523
{

src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,7 @@ public CompactionGraph(IndexComponents.ForWrite perIndexComponents, VectorCompre
184184
this.useSyntheticOrdinals = !V5OnDiskFormat.writeV5VectorPostings(context.version()) || !allRowsHaveVectors;
185185

186186
// the extension here is important to signal to CFS.scrubDataDirectories that it should be removed if present at restart
187-
Component tmpComponent = new Component(Component.Type.CUSTOM, "chronicle" + Descriptor.TMP_EXT);
188-
postingsFile = dd.fileFor(tmpComponent);
187+
postingsFile = perIndexComponents.tmpFileFor("postings_chonicle_map");
189188
postingsMap = ChronicleMapBuilder.of((Class<VectorFloat<?>>) (Class) VectorFloat.class, (Class<CompactionVectorPostings>) (Class) CompactionVectorPostings.class)
190189
.averageKeySize(dimension * Float.BYTES)
191190
.averageValueSize(VectorPostings.emptyBytesUsed() + RamUsageEstimator.NUM_BYTES_OBJECT_REF + 2 * Integer.BYTES)
@@ -195,8 +194,7 @@ public CompactionGraph(IndexComponents.ForWrite perIndexComponents, VectorCompre
195194
.createPersistedTo(postingsFile.toJavaIOFile());
196195

197196
// Formatted so that the full resolution vector is written at the ordinal * vector dimension offset
198-
Component vectorsByOrdinalComponent = new Component(Component.Type.CUSTOM, "vectors_by_ordinal");
199-
vectorsByOrdinalTmpFile = dd.tmpFileFor(vectorsByOrdinalComponent);
197+
vectorsByOrdinalTmpFile = perIndexComponents.tmpFileFor("vectors_by_ordinal");
200198
vectorsByOrdinalBufferedWriter = new BufferedRandomAccessWriter(vectorsByOrdinalTmpFile.toPath());
201199

202200
// VSTODO add LVQ

test/unit/org/apache/cassandra/index/sai/cql/VectorTypeTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,6 +1060,23 @@ public void testUpdateRowScoreToWorsePositionButIncludeInBatch()
10601060
assertRows(execute("SELECT c FROM %s ORDER BY r ANN OF [0.1, 0.1] LIMIT 10"), row(2), row(1));
10611061
}
10621062

1063+
@Test
1064+
public void testIndexingMultipleVectorColumns() throws Throwable
1065+
{
1066+
createTable("CREATE TABLE %s (pk int, val1 vector<float, 128>, val2 vector<float, 128>, PRIMARY KEY(pk))");
1067+
createIndex("CREATE CUSTOM INDEX ON %s(val1) USING 'StorageAttachedIndex'");
1068+
createIndex("CREATE CUSTOM INDEX ON %s(val2) USING 'StorageAttachedIndex'");
1069+
1070+
for (int i = 0; i < 2 * CassandraOnHeapGraph.MIN_PQ_ROWS; i++)
1071+
execute("INSERT INTO %s (pk, val1, val2) VALUES (?, ?, ?)", i, randomVectorBoxed(128), randomVectorBoxed(128));
1072+
1073+
runThenFlushThenCompact(() -> {
1074+
// Run a search on each as a sanity check
1075+
assertRowCount(execute("SELECT pk FROM %s ORDER BY val1 ANN OF ? LIMIT 10", randomVectorBoxed(128)), 10);
1076+
assertRowCount(execute("SELECT pk FROM %s ORDER BY val2 ANN OF ? LIMIT 10", randomVectorBoxed(128)), 10);
1077+
});
1078+
}
1079+
10631080
@Test
10641081
public void testRowIdIteratorClosedOnHasNextFailure() throws Throwable
10651082
{

0 commit comments

Comments
 (0)