Conversation

pranavshenoy
Contributor

add SSTableFlushObserver#onSSTableWriterSwitched to flush SAI segment builder for written shards without waiting for entire transaction to complete

This reduces memory usage during sharded compaction.

The PR is cherry-picked from here.

The Cassandra Jira

… SAI segment builder for written shards without waiting for entire transaction to complete (apache#1859)

This reduces memory usage during sharded compaction.

riptano/cndb#14725: OOM during SAI compaction with a large number of shards

Flush the segment builder when the SSTable writer is switched to free memory without waiting for the full compaction to complete.

/** The number of column indexes being built globally. (Starts at one to avoid divide by zero.) */
- private static final AtomicInteger ACTIVE_BUILDER_COUNT = new AtomicInteger(0);
+ private static final AtomicInteger ACTIVE_BUILDER_COUNT = new AtomicInteger(1);

@maedhroz maedhroz Oct 6, 2025

In the DS fork, we actually do set this to 1, but here in OSS, we start at zero and ensure safety by making sure the access and increment/decrement operations happen in the correct order below. I would revert this and remove the misleading field comment. However, that does mean you'll probably get a failure in CompactionTest#testSegmentBuilderFlushWithShardedCompaction(), which assumes we initialize to 1 (and this is correct in the DS fork, of course). That should be easy enough to fix in the test itself...
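The ordering the comment describes — start at zero, but always increment before the count can be read by anything that divides by it — can be sketched roughly as follows. This is an illustrative stand-in, not the actual SegmentBuilder code; the method names here are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: the counter starts at zero (OSS convention), and
// safety comes from ordering — a builder increments the count before anyone
// could observe it as a divisor. Method names are hypothetical.
public class BuilderCountSketch
{
    private static final AtomicInteger ACTIVE_BUILDER_COUNT = new AtomicInteger(0);

    /** Registers a builder; the returned count is always >= 1 for the caller. */
    public static int register()
    {
        return ACTIVE_BUILDER_COUNT.incrementAndGet();
    }

    /** Unregisters a builder once its work is done. */
    public static void unregister()
    {
        ACTIVE_BUILDER_COUNT.decrementAndGet();
    }

    public static int getActiveBuilderCount()
    {
        return ACTIVE_BUILDER_COUNT.get();
    }
}
```

With this ordering, the raw field can legitimately read 0 when idle, which is why the test's expected range shifts down by one if the initializer is reverted.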

while (!future.isDone())
{
// ACTIVE_BUILDER_COUNT starts from 1. There are 2 segments for 2 indexes
assertThat(SegmentBuilder.getActiveBuilderCount()).isGreaterThanOrEqualTo(1).isLessThanOrEqualTo(3);

@maedhroz maedhroz Oct 6, 2025

If we revert the ACTIVE_BUILDER_COUNT thing above, this becomes...

// ACTIVE_BUILDER_COUNT starts from 0. There are 2 segments for 2 indexes
assertThat(SegmentBuilder.getActiveBuilderCount()).isGreaterThanOrEqualTo(0).isLessThanOrEqualTo(2);

(Suggested by Claude, but I think it's correct.)

finally
{
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);

nit: If this times out, we might want to fail the test?
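One way to act on this nit is to turn the ignored `awaitTermination()` result into a hard failure. A minimal sketch, assuming a plain `AssertionError` is acceptable in this test (the helper name is hypothetical):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the suggested fix: fail loudly if the executor does not terminate
// in time, instead of silently discarding the awaitTermination() result.
public class ShutdownSketch
{
    public static void shutdownOrFail(ExecutorService executor, long timeoutSeconds) throws InterruptedException
    {
        executor.shutdown();
        if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS))
            throw new AssertionError("Executor did not terminate within " + timeoutSeconds + " seconds");
    }

    public static void main(String[] args) throws InterruptedException
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {});
        shutdownOrFail(executor, 30); // throws only if tasks are still running after 30s
    }
}
```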

import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;

unused import

import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.junit.Assert;
import static org.junit.Assert.assertFalse;

unused import

return;

boolean emptySegment = currentBuilder == null || currentBuilder.isEmpty();
logger.debug("Flushing index {} with {}buffered data on sstable writer switched...", index.identifier().indexName, emptySegment ? "no " : "");

This should use the standard format...

Suggested change:
- logger.debug("Flushing index {} with {}buffered data on sstable writer switched...", index.identifier().indexName, emptySegment ? "no " : "");
+ logger.debug(index.identifier().logMessage("Flushing index with {}buffered data on SSTable writer switched..."), emptySegment ? "no " : "");

boolean emptySegment = currentBuilder == null || currentBuilder.isEmpty();
logger.debug("Flushing index {} with {}buffered data on sstable writer switched...", index.identifier().indexName, emptySegment ? "no " : "");
if (!emptySegment)
flushSegment();

Note that we don't have to aggressively create a new builder here because addTerm(), complete(), and abort() will all handle the currentBuilder == null case appropriately.
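The null-tolerant lifecycle the comment relies on can be sketched roughly like this — a simplified stand-in, not the real SegmentBuilder API: the builder is created lazily on the first term after a flush, and completion treats a null builder as "nothing buffered".

```java
// Simplified stand-in for the null-tolerant builder lifecycle described above.
// Not the actual SegmentBuilder API; StringBuilder stands in for the segment buffer.
public class LazyBuilderSketch
{
    private StringBuilder currentBuilder;

    public void addTerm(String term)
    {
        if (currentBuilder == null)        // recreated on demand after a flush
            currentBuilder = new StringBuilder();
        currentBuilder.append(term);
    }

    public String complete()
    {
        // a null builder simply means there is nothing to write
        String result = currentBuilder == null ? "" : currentBuilder.toString();
        currentBuilder = null;             // flushing leaves the builder null again
        return result;
    }

    public boolean isEmpty()
    {
        return currentBuilder == null || currentBuilder.length() == 0;
    }
}
```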

void complete(Stopwatch stopwatch) throws IOException;

/**
* Called when current sstable writer is switched during sharded compaction to free any in-memory resources associated

Suggested change:
- * Called when current sstable writer is switched during sharded compaction to free any in-memory resources associated
+ * Called when current SSTable writer is switched during sharded compaction to free any in-memory resources associated


/**
* Called when current sstable writer is switched during sharded compaction to free any in-memory resources associated
* with the sstable for current index without waiting for full transaction to complete

Suggested change:
- * with the sstable for current index without waiting for full transaction to complete
+ * with the SSTable for current index without waiting for full transaction to complete

logger.error(indexDescriptor.logMessage("Failed to flush segment on sstable writer switched"), t);
abort(t, true);
// fail compaction task or index build task if SAI failed
throw Throwables.unchecked(t);

This is unnecessary. When abort() is called with fromIndex == true, as we do here, the exception will be thrown at the end of abort().
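A hypothetical illustration of the control flow being described (not the actual abort() implementation): when the flag is set, the rethrow happens inside abort() itself, so an explicit throw after the call would be unreachable in practice.

```java
// Hypothetical sketch: abort(t, fromIndex) rethrows at the end when fromIndex
// is true, making a follow-up "throw Throwables.unchecked(t)" redundant.
public class AbortSketch
{
    public static void abort(Throwable t, boolean fromIndex)
    {
        // ... release segment builder resources here ...
        if (fromIndex)
            throw t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
    }
}
```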


@maedhroz maedhroz left a comment

Dropped some comments throughout the PR. We should probably also make sure we don't refer to CNDB-14725 in the commit message ;)
