CNDB-13952: Handle Chronicle Map entry overflow in vector index compaction #1731
Conversation
Overall looks good, I have left some minor comments.
maxRow = max(maxRow, rowId);
}

assert maxOldOrdinal >= 0;
I think that the previous behavior with "orElseThrow" was to throw if the collection was empty.
This now throws an IllegalStateException if the values were not set.
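To make the difference concrete, the two idioms being compared look roughly like this (an illustrative sketch, not the PR's actual code; the ordinal list is a stand-in for the real postings structure):

import java.util.List;

final class MaxOrdinalIdioms
{
    // Previous behavior: orElseThrow() fails with NoSuchElementException on an empty list.
    static int maxViaStream(List<Integer> ordinals)
    {
        return ordinals.stream().mapToInt(Integer::intValue).max().orElseThrow();
    }

    // New behavior: track the running max and fail with IllegalStateException if it was never set.
    static int maxViaLoop(List<Integer> ordinals)
    {
        int maxOldOrdinal = -1;
        for (int ordinal : ordinals)
            maxOldOrdinal = Math.max(maxOldOrdinal, ordinal);
        if (maxOldOrdinal < 0)
            throw new IllegalStateException("no ordinals were set");
        return maxOldOrdinal;
    }
}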
for (int posting : postings.getPostings())
    writer.writeVInt(posting);
}
catch (Exception e)
(Here and below) catching Exception is usually a code smell, and then rethrowing as an unchecked RuntimeException is also bad. Do we have a better way to rethrow? Should we also handle InterruptedException?
I updated it to handle only IOException. We do not expect to hit that condition though. We have this exception because I used the Lucene DataOutput class to handle the vint serde.
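For readers following along, the adapter in play here looks roughly like the following — a minimal sketch assuming Lucene's DataOutput and Chronicle's Bytes APIs; BytesDataOutput is a hypothetical name, not necessarily the class the PR adds:

import java.io.IOException;

import net.openhft.chronicle.bytes.Bytes;
import org.apache.lucene.store.DataOutput;

class BytesDataOutput extends DataOutput
{
    private final Bytes<?> out;

    BytesDataOutput(Bytes<?> out)
    {
        this.out = out;
    }

    // The throws clause comes from Lucene's abstract DataOutput; an in-memory
    // Bytes write cannot actually fail with an I/O error.
    @Override
    public void writeByte(byte b) throws IOException
    {
        out.writeByte(b);
    }

    @Override
    public void writeBytes(byte[] b, int offset, int length) throws IOException
    {
        out.write(b, offset, length);
    }
}

With something like this in place, writeVInt(posting) is inherited from DataOutput, which is why IOException shows up in the calling code even though it is never expected in practice.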
}

@Override
public void writeByte(byte b) throws IOException
This cannot throw IOException; maybe we can remove the "throws" clause and simplify the code above?
You are right that we can (and should) remove this, but it doesn't fix the above code because the class has an exception in the readVInt() method signature.
@eolivelli - this is ready for another review, please take a look.
LGTM, but I have a couple of minor suggestions.
// means that we might have a smaller vector graph than desired, but at least we will not
// fail to build the index.
value.setShouldCompress(true);
map.put(key, value);
Is it possible it still fails after compression? If so, then what? Maybe we should still provide at least some diagnostic message?
Given my testing and our usage, it's unlikely to fail a second time. We add postings to this map one row at a time on a single thread, so when we cross the threshold for max postings in an array, we do so by 4 bytes. I'll update the debug log line above since that'll likely be sufficient. The other option is to see if there is a better data structure for us. I heard there was a Lucene option that might handle this more gracefully without special encoding.
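The fallback being discussed amounts to something like the following sketch (hypothetical helper and names; in particular, the exact exception type Chronicle Map throws for an oversized entry is an assumption here):

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class OverflowRetryExample
{
    private static final Logger logger = LoggerFactory.getLogger(OverflowRetryExample.class);

    // Minimal stand-in for the real CompactionVectorPostings type.
    interface CompressiblePostings
    {
        void setShouldCompress(boolean shouldCompress);
    }

    static <K, V extends CompressiblePostings> void putWithFallback(Map<K, V> map, K key, V value)
    {
        try
        {
            map.put(key, value);
        }
        catch (IllegalStateException | IllegalArgumentException e)
        {
            // Assumed exception types: Chronicle Map rejects an entry that exceeds its
            // configured maximum size with a runtime exception.
            logger.debug("Postings entry exceeded the Chronicle Map entry size; retrying with varint-compressed postings", e);
            // The retry means we might end up with a smaller vector graph than desired,
            // but at least we will not fail to build the index.
            value.setShouldCompress(true);
            map.put(key, value);
        }
    }
}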
for (int posting : postings.getPostings())
    writer.writeVInt(posting);
Are those postings sorted? Just an idea: if they are sorted, maybe better to use delta-encoding (and if they are not sorted, maybe we could sort them?). Deltas would usually be smaller, hence vints would be smaller as well. Especially if there are a lot of duplicates, you'd get many zeroes, which compress down to 1 byte. Looks like you don't need random access to the middle of a posting list on disk, but you deserialize it all at once.
They are sorted, yes. We could consider delta encoding too. There are not expected to be duplicates though because a row has at most one vector when constructing a graph during compaction. And you're correct that we consume the list entirely.
My main reason for not going further in the compression is that we flush immediately after hitting this code block, and we only need to find 4 bytes of savings to prevent a subsequent failure on put.
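For reference, the delta-plus-vint idea floated above would look roughly like this (a sketch, not code from the PR, assuming Lucene's DataOutput is used for the vint serde):

import java.io.IOException;

import org.apache.lucene.store.DataOutput;

final class DeltaVIntExample
{
    static void writeDeltaEncoded(DataOutput writer, int[] sortedPostings) throws IOException
    {
        int previous = 0;
        for (int posting : sortedPostings)
        {
            // Sorted input guarantees non-negative deltas; small deltas (or zeroes for
            // duplicates) encode to a single byte in the vint scheme.
            writer.writeVInt(posting - previous);
            previous = posting;
        }
    }
}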
@@ -203,8 +213,25 @@ static class Marshaller implements BytesReader<CompactionVectorPostings>, BytesW
public void write(Bytes out, CompactionVectorPostings postings) {
    out.writeInt(postings.ordinal);
    out.writeInt(postings.size());
    for (Integer posting : postings.getPostings()) {
        out.writeInt(posting);
    out.writeBoolean(postings.shouldCompress);
One more thing: doesn't it break backwards compatibility? The format is different now. Shouldn't we bump up the version?
This file is a temporary file that does not survive to the next instantiation of the JVM on the same node. See cassandra/src/java/org/apache/cassandra/index/sai/disk/vector/CompactionGraph.java, lines 178 to 187 in ed33431:
// the extension here is important to signal to CFS.scrubDataDirectories that it should be removed if present at restart
Component tmpComponent = new Component(Component.Type.CUSTOM, "chronicle" + Descriptor.TMP_EXT);
postingsFile = dd.fileFor(tmpComponent);
postingsMap = ChronicleMapBuilder.of((Class<VectorFloat<?>>) (Class) VectorFloat.class, (Class<CompactionVectorPostings>) (Class) CompactionVectorPostings.class)
                                 .averageKeySize(dimension * Float.BYTES)
                                 .averageValueSize(VectorPostings.emptyBytesUsed() + RamUsageEstimator.NUM_BYTES_OBJECT_REF + 2 * Integer.BYTES)
                                 .keyMarshaller(new VectorFloatMarshaller())
                                 .valueMarshaller(new VectorPostings.Marshaller())
                                 .entries(postingsEntriesAllocated)
                                 .createPersistedTo(postingsFile.toJavaIOFile());
We use the Descriptor.TMP_EXT file extension to ensure that the file is removed if present at restart.
✔️ Build ds-cassandra-pr-gate/PR-1731 approved by Butler
What is the issue
Fixes: https://github.com/riptano/cndb/issues/13952
What does this PR fix and why was it fixed
We were hitting the Chronicle Map entry size limit in some cases (where there was an excessive number of duplicate vectors). This code handles that exception by attempting to reduce the size required to store those duplicates, writing their postings as varints instead of plain integers.
Note that most cases have only a handful of duplicated vectors per graph, so we do not optimize for the large-number-of-duplicates case. Further, Chronicle Map allocates a minimum chunk per entry, and we are often under that size, so there is no benefit to writing the ints as varints in the common case.
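Pieced together from the snippets quoted in the review, the serialization shape is roughly the following sketch (not the PR's exact Marshaller; writeVInt here is a hand-rolled helper matching Lucene's variable-length int layout, and the branch structure is an assumption):

import java.util.List;

import net.openhft.chronicle.bytes.Bytes;

final class PostingsWriteSketch
{
    static void write(Bytes<?> out, int ordinal, List<Integer> postings, boolean shouldCompress)
    {
        out.writeInt(ordinal);
        out.writeInt(postings.size());
        out.writeBoolean(shouldCompress);
        for (int posting : postings)
        {
            if (shouldCompress)
                writeVInt(out, posting); // variable length, 1-5 bytes per value
            else
                out.writeInt(posting);   // fixed 4 bytes per value
        }
    }

    // 7 bits per byte, high bit set on all but the last byte (Lucene's vint layout).
    static void writeVInt(Bytes<?> out, int value)
    {
        while ((value & ~0x7F) != 0)
        {
            out.writeByte((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.writeByte((byte) value);
    }
}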