CNDB-14077: Reduce compaction thread pool size to match num of physical cores #1736

Open · wants to merge 6 commits into main

@@ -36,6 +36,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.UniformReservoir;
 import io.github.jbellis.jvector.quantization.VectorCompressor;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
@@ -78,8 +80,17 @@ public abstract class SegmentBuilder
 {
     private static final Logger logger = LoggerFactory.getLogger(SegmentBuilder.class);
 
-    /** for parallelism within a single compaction */
-    public static final ExecutorService compactionExecutor = new DebuggableThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
+    private static final int COMPACTION_EXECUTOR_NUM_THREADS = Integer.getInteger("cassandra.sai.compaction.executor.threads",
+                                                                                  Runtime.getRuntime().availableProcessors() / 2);
+
+    static {
+        logger.debug("SAI compaction executor threads: {}", COMPACTION_EXECUTOR_NUM_THREADS);
+    }
+
+    /** for parallelism within a single compaction;
+     * see the comments on JVector's PhysicalCoreExecutor -- hyper-threading tends to cause contention for the SIMD units
+     */
+    public static final ExecutorService compactionExecutor = new DebuggableThreadPoolExecutor(COMPACTION_EXECUTOR_NUM_THREADS,
                                                                                               1,
                                                                                               TimeUnit.MINUTES,
                                                                                               new ArrayBlockingQueue<>(10 * Runtime.getRuntime().availableProcessors()),
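
The default of availableProcessors() / 2 approximates the physical core count on hosts where hyper-threading doubles the reported processors, which is the rationale named in the comment above. A minimal standalone sketch of how the property lookup resolves (class name hypothetical, not part of the patch):

    // Hypothetical demo class, not part of the patch.
    public class PoolSizeDemo
    {
        public static void main(String[] args)
        {
            // Integer.getInteger returns the system property's value when set and
            // parseable, otherwise the supplied default. Run with
            // -Dcassandra.sai.compaction.executor.threads=4 to override.
            int threads = Integer.getInteger("cassandra.sai.compaction.executor.threads",
                                             Runtime.getRuntime().availableProcessors() / 2);
            System.out.println("SAI compaction executor threads: " + threads);
        }
    }

One side note: on a host reporting a single processor the integer division yields 0, so whether the executor tolerates a zero core-pool size may be worth confirming.
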
@@ -246,6 +257,12 @@ public boolean requiresFlush()
     public static class VectorOffHeapSegmentBuilder extends SegmentBuilder
     {
         private final CompactionGraph graphIndex;
+        // Uniform reservoir sized so that no updates are discarded, to observe the behavior without sampling. Not for production use.
+        private final Histogram totalAddTimeHist = new Histogram(new UniformReservoir(10_000_000));
+        private final Histogram addGraphTimeHist = new Histogram(new UniformReservoir(10_000_000));
+        private final LongAdder totalDuration = new LongAdder();
+        private final LongAdder addGraphTimeDuration = new LongAdder();
+        private final LongAdder totalAdds = new LongAdder();
 
         public VectorOffHeapSegmentBuilder(IndexComponents.ForWrite components,
                                            long rowIdOffset,
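
For context on the metrics pattern above: a UniformReservoir keeps a fixed-size, uniformly sampled subset of updates, so sizing it at 10_000_000 means every value is retained until the ten-millionth update and the percentiles are exact rather than sampled. Note, though, that each such reservoir preallocates an array of ten million longs, roughly 80 MB, which is presumably why the comment warns against production use. A self-contained sketch assuming the dropwizard metrics-core dependency (class name and values illustrative):

    import com.codahale.metrics.Histogram;
    import com.codahale.metrics.Snapshot;
    import com.codahale.metrics.UniformReservoir;

    // Hypothetical standalone demo, not part of the patch.
    public class ReservoirDemo
    {
        public static void main(String[] args)
        {
            // Retains every update until the reservoir fills, so these percentiles are exact.
            Histogram hist = new Histogram(new UniformReservoir(10_000_000));
            for (int i = 1; i <= 1000; i++)
                hist.update(i); // the patch records durations already converted to microseconds
            Snapshot snapshot = hist.getSnapshot();
            System.out.printf("median=%.1f us, p99=%.1f us, max=%d us%n",
                              snapshot.getMedian(), snapshot.get99thPercentile(), snapshot.getMax());
        }
    }
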
@@ -284,6 +301,7 @@ protected long addInternal(List<ByteBuffer> terms, int segmentRowId)
         protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
         {
             assert terms.size() == 1;
+            final long start = System.nanoTime();
 
             // CompactionGraph splits adding a node into two parts:
             // (1) maybeAddVector, which must be done serially because it writes to disk incrementally
@@ -302,6 +320,7 @@ protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
 
             updatesInFlight.incrementAndGet();
             compactionExecutor.submit(() -> {
+                long beforeAddGraphNode = System.nanoTime();
                 try
                 {
                     long bytesAdded = result.bytesUsed + graphIndex.addGraphNode(result);
@@ -315,6 +334,14 @@ protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
                 finally
                 {
                     updatesInFlight.decrementAndGet();
+                    long now = System.nanoTime();
+                    totalAdds.increment();
+                    long addInternalAsyncDuration = now - start;
+                    totalDuration.add(addInternalAsyncDuration);
+                    totalAddTimeHist.update(addInternalAsyncDuration / 1000);
+                    long addGraphNodeDuration = now - beforeAddGraphNode;
+                    addGraphTimeDuration.add(addGraphNodeDuration);
+                    addGraphTimeHist.update(addGraphNodeDuration / 1000);
                 }
             });
             // bytes allocated will be approximated immediately as the average of recently added terms,
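
The two timestamps separate concerns: `start`, taken on the calling thread, covers the whole async add, including the serial maybeAddVector step and any time the task spends queued, while `beforeAddGraphNode`, taken on the pool thread, isolates the graphIndex.addGraphNode call itself. A standalone sketch of this pattern (hypothetical names, stand-in workload, not part of the patch):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Hypothetical demo of the two-timestamp measurement above.
    public class TwoTimestampDemo
    {
        public static void main(String[] args) throws InterruptedException
        {
            ExecutorService executor = Executors.newFixedThreadPool(2);
            final long start = System.nanoTime(); // on the submitting thread, like `start` above
            executor.submit(() -> {
                long beforeTask = System.nanoTime(); // on the pool thread, like `beforeAddGraphNode` above
                try
                {
                    Thread.sleep(5); // stand-in for graphIndex.addGraphNode(result)
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                }
                finally
                {
                    long now = System.nanoTime();
                    // (now - start) includes serial work and queueing; (now - beforeTask) is execution only
                    System.out.printf("total=%d us, work=%d us%n",
                                      (now - start) / 1000, (now - beforeTask) / 1000);
                }
            });
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.SECONDS);
        }
    }
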
@@ -333,7 +360,21 @@ protected void flushInternal(SegmentMetadataBuilder metadataBuilder) throws IOException
         {
             if (graphIndex.isEmpty())
                 return;
+            // Print diagnostic info
+            if (totalAdds.sum() > 0)
+            {
+                logger.info("Add internal async: average time: {} us, total time: {} us, total adds: {}", (double) totalDuration.sum() / totalAdds.sum() / 1000, totalDuration.sum() / 1000, totalAdds.sum());
+                var totalAddTimeHistSnapshot = totalAddTimeHist.getSnapshot();
+                logger.info("Add internal async time histogram: min: {} us, max: {} us, mean: {} us, median: {} us, 75th percentile: {} us, 95th percentile: {} us, 99th percentile: {} us, 99.9th percentile: {} us",
+                            totalAddTimeHistSnapshot.getMin(), totalAddTimeHistSnapshot.getMax(), totalAddTimeHistSnapshot.getMean(), totalAddTimeHistSnapshot.getMedian(), totalAddTimeHistSnapshot.get75thPercentile(), totalAddTimeHistSnapshot.get95thPercentile(), totalAddTimeHistSnapshot.get99thPercentile(), totalAddTimeHistSnapshot.get999thPercentile());
+                logger.info("Add graph node: average time: {} us, total time: {} us, total adds: {}", (double) addGraphTimeDuration.sum() / totalAdds.sum() / 1000, addGraphTimeDuration.sum() / 1000, totalAdds.sum());
+                var addGraphTimeHistSnapshot = addGraphTimeHist.getSnapshot();
+                logger.info("Add graph node time histogram: min: {} us, max: {} us, mean: {} us, median: {} us, 75th percentile: {} us, 95th percentile: {} us, 99th percentile: {} us, 99.9th percentile: {} us",
+                            addGraphTimeHistSnapshot.getMin(), addGraphTimeHistSnapshot.getMax(), addGraphTimeHistSnapshot.getMean(), addGraphTimeHistSnapshot.getMedian(), addGraphTimeHistSnapshot.get75thPercentile(), addGraphTimeHistSnapshot.get95thPercentile(), addGraphTimeHistSnapshot.get99thPercentile(), addGraphTimeHistSnapshot.get999thPercentile());
+            }
+            long start = System.nanoTime();
             var componentsMetadata = graphIndex.flush();
+            logger.info("Flushing graph took {} ms", (System.nanoTime() - start) / 1_000_000);
             metadataBuilder.setComponentsMetadata(componentsMetadata);
         }
 
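
The two histogram log statements above differ only in their label, so they could share a helper. A hypothetical sketch (logHistogramUs is not in the patch; it assumes, as the code above does, that values were recorded in microseconds):

    import com.codahale.metrics.Histogram;
    import com.codahale.metrics.Snapshot;
    import org.slf4j.Logger;

    // Hypothetical helper, not part of the patch.
    final class HistogramLogging
    {
        static void logHistogramUs(Logger logger, String name, Histogram hist)
        {
            Snapshot s = hist.getSnapshot();
            logger.info("{} time histogram: min: {} us, max: {} us, mean: {} us, median: {} us, 75th percentile: {} us, 95th percentile: {} us, 99th percentile: {} us, 99.9th percentile: {} us",
                        name, s.getMin(), s.getMax(), s.getMean(), s.getMedian(),
                        s.get75thPercentile(), s.get95thPercentile(), s.get99thPercentile(), s.get999thPercentile());
        }
    }
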
