diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
index 3ea1bb4642df..b354307678e9 100644
--- a/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
+++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SegmentBuilder.java
@@ -36,6 +36,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.UniformReservoir;
 import io.github.jbellis.jvector.quantization.VectorCompressor;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
@@ -78,8 +80,17 @@ public abstract class SegmentBuilder
 {
     private static final Logger logger = LoggerFactory.getLogger(SegmentBuilder.class);
 
-    /** for parallelism within a single compaction */
-    public static final ExecutorService compactionExecutor = new DebuggableThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
+    private static final int COMPACTION_EXECUTOR_NUM_THREADS = Integer.getInteger("cassandra.sai.compaction.executor.threads",
+                                                                                  Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+
+    static {
+        logger.debug("SAI compaction executor threads: {}", COMPACTION_EXECUTOR_NUM_THREADS);
+    }
+
+    /** for parallelism within a single compaction;
+     * see the comments on JVector's PhysicalCoreExecutor -- hyperthreading tends to cause contention for the SIMD units
+     */
+    public static final ExecutorService compactionExecutor = new DebuggableThreadPoolExecutor(COMPACTION_EXECUTOR_NUM_THREADS,
                                                                                               1,
                                                                                               TimeUnit.MINUTES,
                                                                                               new ArrayBlockingQueue<>(10 * Runtime.getRuntime().availableProcessors()),
@@ -246,6 +257,12 @@ public boolean requiresFlush()
     public static class VectorOffHeapSegmentBuilder extends SegmentBuilder
     {
         private final CompactionGraph graphIndex;
+        // The reservoir is oversized so it effectively never samples out an update, letting us observe the full distribution. Diagnostic only, not for production use.
+        private final Histogram totalAddTimeHist = new Histogram(new UniformReservoir(10_000_000));
+        private final Histogram addGraphTimeHist = new Histogram(new UniformReservoir(10_000_000));
+        private final LongAdder totalDuration = new LongAdder();
+        private final LongAdder addGraphTimeDuration = new LongAdder();
+        private final LongAdder totalAdds = new LongAdder();
 
         public VectorOffHeapSegmentBuilder(IndexComponents.ForWrite components,
                                            long rowIdOffset,
@@ -284,6 +301,7 @@ protected long addInternal(List<ByteBuffer> terms, int segmentRowId)
         protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
         {
             assert terms.size() == 1;
+            final long start = System.nanoTime();
 
             // CompactionGraph splits adding a node into two parts:
             // (1) maybeAddVector, which must be done serially because it writes to disk incrementally
@@ -302,6 +320,7 @@ protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
 
             updatesInFlight.incrementAndGet();
             compactionExecutor.submit(() -> {
+                long beforeAddGraphNode = System.nanoTime();
                 try
                 {
                     long bytesAdded = result.bytesUsed + graphIndex.addGraphNode(result);
@@ -315,6 +334,14 @@ protected long addInternalAsync(List<ByteBuffer> terms, int segmentRowId)
                 finally
                 {
                     updatesInFlight.decrementAndGet();
+                    long now = System.nanoTime();
+                    totalAdds.increment();
+                    long addInternalAsyncDuration = now - start;
+                    totalDuration.add(addInternalAsyncDuration);
+                    totalAddTimeHist.update(addInternalAsyncDuration / 1000);
+                    long addGraphNodeDuration = now - beforeAddGraphNode;
+                    addGraphTimeDuration.add(addGraphNodeDuration);
+                    addGraphTimeHist.update(addGraphNodeDuration / 1000);
                 }
             });
             // bytes allocated will be approximated immediately as the average of recently added terms,
@@ -333,7 +360,21 @@ protected void flushInternal(SegmentMetadataBuilder metadataBuilder) throws IOEx
         {
             if (graphIndex.isEmpty())
                 return;
 
+            // Log the diagnostic timing info gathered during the async adds
+            if (totalAdds.sum() > 0)
+            {
+                logger.info("Add internal async: average time: {} us, total time: {} us, total adds: {}", (double) totalDuration.sum() / totalAdds.sum() / 1000, totalDuration.sum() / 1000, totalAdds.sum());
+                var totalAddTimeHistSnapshot = totalAddTimeHist.getSnapshot();
+                logger.info("Add internal async time histogram: min: {} us, max: {} us, mean: {} us, median: {} us, 75th percentile: {} us, 95th percentile: {} us, 99th percentile: {} us, 99.9th percentile: {} us",
+                            totalAddTimeHistSnapshot.getMin(), totalAddTimeHistSnapshot.getMax(), totalAddTimeHistSnapshot.getMean(), totalAddTimeHistSnapshot.getMedian(), totalAddTimeHistSnapshot.get75thPercentile(), totalAddTimeHistSnapshot.get95thPercentile(), totalAddTimeHistSnapshot.get99thPercentile(), totalAddTimeHistSnapshot.get999thPercentile());
+                logger.info("Add graph node: average time: {} us, total time: {} us, total adds: {}", (double) addGraphTimeDuration.sum() / totalAdds.sum() / 1000, addGraphTimeDuration.sum() / 1000, totalAdds.sum());
+                var addGraphTimeHistSnapshot = addGraphTimeHist.getSnapshot();
+                logger.info("Add graph node time histogram: min: {} us, max: {} us, mean: {} us, median: {} us, 75th percentile: {} us, 95th percentile: {} us, 99th percentile: {} us, 99.9th percentile: {} us",
+                            addGraphTimeHistSnapshot.getMin(), addGraphTimeHistSnapshot.getMax(), addGraphTimeHistSnapshot.getMean(), addGraphTimeHistSnapshot.getMedian(), addGraphTimeHistSnapshot.get75thPercentile(), addGraphTimeHistSnapshot.get95thPercentile(), addGraphTimeHistSnapshot.get99thPercentile(), addGraphTimeHistSnapshot.get999thPercentile());
+            }
+            long start = System.nanoTime();
             var componentsMetadata = graphIndex.flush();
logger.info("Flushing graph took {}ms", (System.nanoTime() - start) / 1_000_000); metadataBuilder.setComponentsMetadata(componentsMetadata); }