Skip to content

CNDB-14077: Reduce compaction thread pool size to match num of physical cores #1736

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/index/sai/IndexContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ public View getReferencedView(long timeoutNanos)
View view = viewManager.getView();
if (view.reference())
return view;
} while (MonotonicClock.approxTime.isAfter(deadline));
} while (!MonotonicClock.approxTime.isAfter(deadline));

return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ public abstract class SegmentBuilder
{
private static final Logger logger = LoggerFactory.getLogger(SegmentBuilder.class);

/** for parallelism within a single compaction */
public static final ExecutorService compactionExecutor = new DebuggableThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
/** for parallelism within a single compaction
* see comments to JVector PhysicalCoreExecutor -- HT tends to cause contention for the SIMD units
*/
public static final ExecutorService compactionExecutor = new DebuggableThreadPoolExecutor(Runtime.getRuntime().availableProcessors() / 2,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about making this configurable ? in case we need to rollback

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw, I copied this logic from:

// see comments to JVector PhysicalCoreExecutor -- HT tends to cause contention for the SIMD units
private static final ForkJoinPool compactionSimdPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() / 2,
new LowPriorityThreadFactory(),
null,
false);

Are we looking to make all of these configurable?

1,
TimeUnit.MINUTES,
new ArrayBlockingQueue<>(10 * Runtime.getRuntime().availableProcessors()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -42,17 +43,23 @@
import org.apache.cassandra.index.sai.SSTableIndex;
import org.apache.cassandra.index.sai.StorageAttachedIndex;
import org.apache.cassandra.index.sai.disk.format.IndexComponents;
import org.apache.cassandra.inject.Injections;
import org.apache.cassandra.inject.InvokePointBuilder;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MonotonicClock;
import org.awaitility.Awaitility;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -257,6 +264,87 @@ public void testMarkIndexWasDropped()
assertFalse(view.reference());
}

@Test
public void testRetryGetReferencedView() throws Throwable
{
createTable("CREATE TABLE %S (k INT PRIMARY KEY, v INT)");
String indexName = createIndex("CREATE CUSTOM INDEX ON %s(v) USING 'StorageAttachedIndex'");
disableCompaction();

ColumnFamilyStore store = getCurrentColumnFamilyStore();
IndexContext columnContext = columnIndex(store, indexName);

// Insert data and flush to create initial SSTable
execute("INSERT INTO %s(k, v) VALUES (1, 10)");
execute("INSERT INTO %s(k, v) VALUES (2, 20)");
flush();

// Create a barrier that will pause the getReferencedView method
Injections.Barrier viewReferencePause =
Injections.newBarrier("pause_get_referenced_view", 3, false)
.add(InvokePointBuilder.newInvokePoint()
.onClass(View.class)
.onMethod("reference"))
.build();

// Create a counter to track how many times IndexViewManager.getView() is called
Injections.Counter referenceCounter =
Injections.newCounter("get_view_counter")
.add(InvokePointBuilder.newInvokePoint()
.onClass(IndexViewManager.class)
.onMethod("getView"))
.build();

try
{
// Inject the barrier and counter
Injections.inject(viewReferencePause, referenceCounter);

// Start a thread that will try to get a referenced view. The deadline is high because we want to loop.
Future<View> viewFutureExpectSuccess = CompletableFuture.supplyAsync(() -> {
return columnContext.getReferencedView(TimeUnit.SECONDS.toNanos(100));
});

// Start a thread that will try to get a referenced view. The deadline is high because we want to loop.
Future<View> viewFutureExpectNull = CompletableFuture.supplyAsync(() -> {
return columnContext.getReferencedView(0);
});

// Barrier should have 1 remaining await
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> viewReferencePause.getCount() == 1);

// Sleep long enough for the next call to isAfter to return true
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(MonotonicClock.approxTime.error()));

// While getReferencedView is paused, perform a compaction to change the view
execute("INSERT INTO %s(k, v) VALUES (3, 30)");
flush();

// Confirm state before proceeding
assertEquals("getView should have been called once so far", 2, referenceCounter.get());

// Release just in case it hasn't been yet.
viewReferencePause.countDown();

// Get the result and verify it's not null
View result = viewFutureExpectSuccess.get(5, TimeUnit.SECONDS);
assertNotNull("Should have eventually gotten a referenced view", result);

// Verify that the other thread got a null result
assertNull("Should have eventually gotten a null view", viewFutureExpectNull.get());

// Verify that reference() was called 3 times (indicating retry on the first but not the second)
assertEquals("Reference should have been called 3 times", referenceCounter.get(), 3);

// Clean up
result.release();
}
finally
{
Injections.deleteAll();
}
}

private IndexContext columnIndex(ColumnFamilyStore store, String indexName)
{
assert store.indexManager != null;
Expand Down