Skip to content

Commit 2915032

Browse files
CNDB-13462: Add support for updating terms in SAI's TrieMemoryIndex (#1200)
Add support for updating a row's terms in the `TrieMemoryIndex`. There are two general cases for row to term mappings: 1:one and 1:many. In the 1 to 1 case, update is trivial. We check before insertion that we have a new value, then we insert the new value and remove the old value. In the 1 to many case, the solution is somewhat bespoke, but is well documnted and encapsulated. The main challenge comes in the form of when to add/remove terms without increasing the overall time complexity of the update operation. In this design, a majority of the complexity is in the PrimaryKeys object. There, we monitor seen keys and use whether we've seen the key to decide whether to reset the frequency counter and then subsequently to decide whether to leave the key or remove it (the term could be in both the old and the new values). This whole design hinges on the fact that we synchronize additions to the trie. The one drawback in terms of performance is that collections are added and removed within the sync block. This change is necessary to deal with encoding nuances unless we want to largely refactor the class. The motivation for needing updates is to ensure that we have unique primary keys in the postings list for scalar values and for a given map key. Supporting updates means that the order of terms within an index is internally consistent.
1 parent 0703ce3 commit 2915032

14 files changed

+594
-146
lines changed

src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,12 @@ public void addMemoryUsageTo(MemoryUsage stats)
183183

184184
public void markExtraOnHeapUsed(long additionalSpace, OpOrder.Group opGroup)
185185
{
186-
getAllocator().onHeap().allocate(additionalSpace, opGroup);
186+
getAllocator().onHeap().adjust(additionalSpace, opGroup);
187187
}
188188

189189
public void markExtraOffHeapUsed(long additionalSpace, OpOrder.Group opGroup)
190190
{
191-
getAllocator().offHeap().allocate(additionalSpace, opGroup);
191+
getAllocator().offHeap().adjust(additionalSpace, opGroup);
192192
}
193193

194194
@Override

src/java/org/apache/cassandra/index/sai/IndexContext.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -402,22 +402,26 @@ private boolean validateCumulativeAnalyzedTermLimit(DecoratedKey key, AbstractAn
402402

403403
public void update(DecoratedKey key, Row oldRow, Row newRow, Memtable memtable, OpOrder.Group opGroup)
404404
{
405-
if (!isVector())
406-
{
407-
index(key, newRow, memtable, opGroup);
408-
return;
409-
}
410-
411405
MemtableIndex target = liveMemtables.get(memtable);
412406
if (target == null)
413407
return;
414-
// Use 0 for nowInSecs to get the value from the oldRow regardless of its liveness status. To get to this point,
408+
409+
// Use 0 for nowInSecs to get the value(s) from the oldRow regardless of its liveness status. To get to this point,
415410
// C* has already determined this is the current represntation of the oldRow in the memtable, and that means
416411
// we need to add the newValue to the index and remove the oldValue from it, even if it has already expired via
417412
// TTL.
418-
ByteBuffer oldValue = getValueOf(key, oldRow, 0);
419-
ByteBuffer newValue = getValueOf(key, newRow, FBUtilities.nowInSeconds());
420-
target.update(key, oldRow.clustering(), oldValue, newValue, memtable, opGroup);
413+
if (isNonFrozenCollection())
414+
{
415+
Iterator<ByteBuffer> oldValues = getValuesOf(oldRow, 0);
416+
Iterator<ByteBuffer> newValues = getValuesOf(newRow, FBUtilities.nowInSeconds());
417+
target.update(key, oldRow.clustering(), oldValues, newValues, memtable, opGroup);
418+
}
419+
else
420+
{
421+
ByteBuffer oldValue = getValueOf(key, oldRow, 0);
422+
ByteBuffer newValue = getValueOf(key, newRow, FBUtilities.nowInSeconds());
423+
target.update(key, oldRow.clustering(), oldValue, newValue, memtable, opGroup);
424+
}
421425
}
422426

423427
public void renewMemtable(Memtable renewed)
@@ -720,7 +724,10 @@ public boolean supports(Operator op)
720724
if (op.isLike() || op == Operator.LIKE) return false;
721725
// Analyzed columns store the indexed result, so we are unable to compute raw equality.
722726
// The only supported operators are ANALYZER_MATCHES and BM25.
723-
if (op == Operator.ANALYZER_MATCHES || op == Operator.BM25) return isAnalyzed;
727+
if (op == Operator.ANALYZER_MATCHES) return isAnalyzed;
728+
// BM25 frequency calculations only work on non-collection columns because it assumes a 1:1 mapping from PrK
729+
// to frequency, but collections have mulitple documents.
730+
if (op == Operator.BM25) return isAnalyzed && !isCollection();
724731

725732
// If the column is analyzed and the operator is EQ, we need to check if the analyzer supports it.
726733
if (op == Operator.EQ && isAnalyzed && !analyzerFactory.supportsEquals())

src/java/org/apache/cassandra/index/sai/disk/vector/VectorMemtableIndex.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,12 @@ public void update(DecoratedKey key, Clustering clustering, ByteBuffer oldValue,
174174
}
175175
}
176176

177+
@Override
178+
public void update(DecoratedKey key, Clustering clustering, Iterator<ByteBuffer> oldValues, Iterator<ByteBuffer> newValues, Memtable memtable, OpOrder.Group opGroup)
179+
{
180+
throw new UnsupportedOperationException("Vector index does not support multi-value updates");
181+
}
182+
177183
private void updateKeyBounds(PrimaryKey primaryKey) {
178184
if (minimumKey == null)
179185
minimumKey = primaryKey;

src/java/org/apache/cassandra/index/sai/memory/MemoryIndex.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,28 @@ public abstract void add(DecoratedKey key,
5252
LongConsumer onHeapAllocationsTracker,
5353
LongConsumer offHeapAllocationsTracker);
5454

55+
/**
56+
* Update the index value for the given key and clustering by removing the old value and adding the new value.
57+
* This is meant to be used when the indexed column is any type other than a non-frozen collection.
58+
*/
59+
public abstract void update(DecoratedKey key,
60+
Clustering clustering,
61+
ByteBuffer oldValue,
62+
ByteBuffer newValue,
63+
LongConsumer onHeapAllocationsTracker,
64+
LongConsumer offHeapAllocationsTracker);
65+
66+
/**
67+
* Update the index value for the given key and clustering by removing the old values and adding the new values.
68+
* This is meant to be used when the indexed column is a non-frozen collection.
69+
*/
70+
public abstract void update(DecoratedKey key,
71+
Clustering clustering,
72+
Iterator<ByteBuffer> oldValues,
73+
Iterator<ByteBuffer> newValues,
74+
LongConsumer onHeapAllocationsTracker,
75+
LongConsumer offHeapAllocationsTracker);
76+
5577
public abstract CloseableIterator<PrimaryKeyWithSortKey> orderBy(Orderer orderer, Expression slice);
5678

5779
public abstract KeyRangeIterator search(Expression expression, AbstractBounds<PartitionPosition> keyRange);

src/java/org/apache/cassandra/index/sai/memory/MemtableIndex.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,8 @@ public interface MemtableIndex extends MemtableOrdering
7070

7171
void index(DecoratedKey key, Clustering clustering, ByteBuffer value, Memtable memtable, OpOrder.Group opGroup);
7272

73-
default void update(DecoratedKey key, Clustering clustering, ByteBuffer oldValue, ByteBuffer newValue, Memtable memtable, OpOrder.Group opGroup)
74-
{
75-
throw new UnsupportedOperationException();
76-
}
73+
void update(DecoratedKey key, Clustering clustering, ByteBuffer oldValue, ByteBuffer newValue, Memtable memtable, OpOrder.Group opGroup);
74+
void update(DecoratedKey key, Clustering clustering, Iterator<ByteBuffer> oldValues, Iterator<ByteBuffer> newValues, Memtable memtable, OpOrder.Group opGroup);
7775

7876
KeyRangeIterator search(QueryContext queryContext, Expression expression, AbstractBounds<PartitionPosition> keyRange, int limit);
7977

0 commit comments

Comments
 (0)