Skip to content

Commit c5300ae

Browse files
authored
CNDB-14614-re-merge: abort compaction task or index build if index is unloaded after tenant unassignment (#1766)
### What is the issue When schema is unloaded after tenant unassignment, compaction task might finishes without corresponding index files, making index non-queryable. ### What does this PR fix and why was it fixed Replace `isValid` with `isDropped` and `isUnloaded`. If index is dropped, compaction task or index build can proceed without the index, same behavior as before. If index is unloaded, compaction task or index build will be aborted to avoid completing without index files. --- #1754 was reverted because[ CNDB PR](riptano/cndb#14179) compiled failed with wrong hash. Re-merge it again.
1 parent 84fd47b commit c5300ae

File tree

5 files changed

+159
-14
lines changed

5 files changed

+159
-14
lines changed

src/java/org/apache/cassandra/index/sai/StorageAttachedIndex.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,9 @@ public List<SecondaryIndexBuilder> getParallelIndexBuildTasks(ColumnFamilyStore
234234
// Tracks whether or not we've started the index build on initialization.
235235
private volatile boolean canFlushFromMemtableIndex = false;
236236

237-
// Tracks whether the index has been invalidated due to removal, a table drop, etc.
238-
private volatile boolean valid = true;
237+
// Tracks whether the index has been dropped due to removal, a table drop, etc or index schema is unloaded after schema unassignment
238+
private volatile boolean dropped = false;
239+
private volatile boolean unloaded = false;
239240

240241
/**
241242
* Called via reflection from SecondaryIndexManager
@@ -554,7 +555,7 @@ public Callable<?> getInvalidateTask()
554555
return () ->
555556
{
556557
// mark index as invalid, in-progress SSTableIndexWriters will abort
557-
valid = false;
558+
dropped = true;
558559

559560
// in case of dropping table, SSTable indexes should already been removed by SSTableListChangedNotification.
560561
for (SSTableIndex sstableIndex : indexContext.getView().getIndexes())
@@ -574,7 +575,7 @@ public Callable<?> getUnloadTask()
574575
return () ->
575576
{
576577
// mark index as invalid, in-progress SSTableIndexWriters will abort
577-
valid = false;
578+
unloaded = true;
578579

579580
indexContext.invalidate(false);
580581
return null;
@@ -600,9 +601,14 @@ public boolean canFlushFromMemtableIndex()
600601
return canFlushFromMemtableIndex;
601602
}
602603

603-
public BooleanSupplier isIndexValid()
604+
public BooleanSupplier isDropped()
604605
{
605-
return () -> valid;
606+
return () -> dropped;
607+
}
608+
609+
public BooleanSupplier isUnloaded()
610+
{
611+
return () -> unloaded;
606612
}
607613

608614
private Future<?> startPreJoinTask()

src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public class SSTableIndexWriter implements PerIndexWriter
6868
private final IndexContext indexContext;
6969
private final int nowInSec = FBUtilities.nowInSeconds();
7070
private final NamedMemoryLimiter limiter;
71-
private final BooleanSupplier isIndexValid;
71+
private final BooleanSupplier isIndexDropped;
72+
private final BooleanSupplier isIndexUnloaded;
7273
private final long keyCount;
7374

7475
private boolean aborted = false;
@@ -77,13 +78,15 @@ public class SSTableIndexWriter implements PerIndexWriter
7778
private SegmentBuilder currentBuilder;
7879
private final List<SegmentMetadata> segments = new ArrayList<>();
7980

80-
public SSTableIndexWriter(IndexComponents.ForWrite perIndexComponents, NamedMemoryLimiter limiter, BooleanSupplier isIndexValid, long keyCount)
81+
public SSTableIndexWriter(IndexComponents.ForWrite perIndexComponents, NamedMemoryLimiter limiter,
82+
BooleanSupplier isIndexDropped, BooleanSupplier isIndexUnloaded, long keyCount)
8183
{
8284
this.perIndexComponents = perIndexComponents;
8385
this.indexContext = perIndexComponents.context();
8486
Preconditions.checkNotNull(indexContext, "Provided components %s are the per-sstable ones, expected per-index ones", perIndexComponents);
8587
this.limiter = limiter;
86-
this.isIndexValid = isIndexValid;
88+
this.isIndexDropped = isIndexDropped;
89+
this.isIndexUnloaded = isIndexUnloaded;
8790
this.keyCount = keyCount;
8891
}
8992

@@ -222,11 +225,23 @@ private boolean maybeAbort()
222225
if (aborted)
223226
return true;
224227

225-
if (isIndexValid.getAsBoolean())
228+
boolean dropped = isIndexDropped.getAsBoolean();
229+
boolean unloaded = isIndexUnloaded.getAsBoolean();
230+
if (!dropped && !unloaded)
226231
return false;
227232

228-
abort(new RuntimeException(String.format("index %s is dropped", indexContext.getIndexName())));
229-
return true;
233+
String message = String.format("index %s is %s", indexContext.getIndexName(), dropped ? "dropped" : "unloaded");
234+
RuntimeException runtimeException = new RuntimeException(message);
235+
236+
// abort index build for remove on disk index file
237+
abort(runtimeException);
238+
239+
// if index is dropped, we can continue compaction task or index build without current index
240+
if (dropped)
241+
return true;
242+
243+
// if index is unloaded after unassigning tenant, fail the compaction task or index build to avoid incomplete index files
244+
throw runtimeException;
230245
}
231246

232247
private boolean addTerm(ByteBuffer term, PrimaryKey key, long sstableRowId, AbstractType<?> type) throws IOException

src/java/org/apache/cassandra/index/sai/disk/v1/V1OnDiskFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ public PerIndexWriter newPerIndexWriter(StorageAttachedIndex index,
207207
logger.debug(index.getIndexContext().logMessage("Starting a compaction index build. Global segment memory usage: {}"),
208208
prettyPrintMemory(limiter.currentBytesUsed()));
209209

210-
return new SSTableIndexWriter(perIndexComponents, limiter, index.isIndexValid(), keyCount);
210+
return new SSTableIndexWriter(perIndexComponents, limiter, index.isDropped(), index.isUnloaded(), keyCount);
211211
}
212212

213213
return new MemtableIndexWriter(context.getPendingMemtableIndex(tracker),

test/unit/org/apache/cassandra/index/sai/cql/NativeIndexDDLTest.java

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.cassandra.index.sai.StorageAttachedIndex;
6363
import org.apache.cassandra.index.sai.StorageAttachedIndexBuilder;
6464
import org.apache.cassandra.index.sai.StorageAttachedIndexGroup;
65+
import org.apache.cassandra.index.sai.disk.StorageAttachedIndexWriter;
6566
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
6667
import org.apache.cassandra.index.sai.disk.format.Version;
6768
import org.apache.cassandra.index.sai.disk.v1.SegmentBuilder;
@@ -125,6 +126,11 @@ public class NativeIndexDDLTest extends SAITester
125126
.onMethod("<init>"))
126127
.add(ActionBuilder.newActionBuilder().actions().doThrow(RuntimeException.class, Expression.quote("Injected failure!")))
127128
.build();
129+
130+
private static final Injection pauseSAIWriterComplete = Injections.newPause("pause_sai_writer_complete", 10_000)
131+
.add(InvokePointBuilder.newInvokePoint().onClass(StorageAttachedIndexWriter.class).onMethod("complete"))
132+
.build();
133+
128134
@BeforeClass
129135
public static void init()
130136
{
@@ -960,6 +966,124 @@ public void testIndexCompactionAborted() throws Throwable
960966
assertEquals(4, rows.all().size());
961967
}
962968

969+
@Test
970+
public void testIndexCompactionWhenIndexIsDropped() throws Throwable
971+
{
972+
// prepare schema and data
973+
createTable(CREATE_TABLE_TEMPLATE);
974+
String indexName1 = createIndex(String.format(CREATE_INDEX_TEMPLATE, "v1"));
975+
createIndex(String.format(CREATE_INDEX_TEMPLATE, "v2"));
976+
977+
execute("INSERT INTO %s (id1, v1, v2) VALUES ('0', 0, '0');");
978+
execute("INSERT INTO %s (id1, v1, v2) VALUES ('1', 1, '0');");
979+
flush();
980+
981+
execute("INSERT INTO %s (id1, v1, v2) VALUES ('2', 0, '0');");
982+
execute("INSERT INTO %s (id1, v1, v2) VALUES ('3', 1, '0');");
983+
flush();
984+
985+
ResultSet rows = executeNet("SELECT id1 FROM %s WHERE v1>=0");
986+
assertEquals(4, rows.all().size());
987+
988+
// pause index writer completion
989+
Injections.inject(pauseSAIWriterComplete);
990+
pauseSAIWriterComplete.enable();
991+
992+
try
993+
{
994+
995+
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
996+
assertThat(cfs.getLiveSSTables()).hasSize(2);
997+
998+
new TestWithConcurrentVerification(
999+
() -> cfs.forceMajorCompaction(),
1000+
() -> {
1001+
try
1002+
{
1003+
// wait for compaction paused
1004+
FBUtilities.sleepQuietly(5000);
1005+
1006+
dropIndex("DROP INDEX %s." + indexName1);
1007+
}
1008+
catch (Exception e)
1009+
{
1010+
throw new RuntimeException(e);
1011+
}
1012+
}, -1 // run verification task once
1013+
).start();
1014+
1015+
// compaction completed
1016+
assertThat(cfs.getLiveSSTables()).hasSize(1);
1017+
1018+
StorageAttachedIndexGroup saiGroup = StorageAttachedIndexGroup.getIndexGroup(cfs);
1019+
assertThat(saiGroup.sstableContextManager().size()).isEqualTo(1);
1020+
}
1021+
finally
1022+
{
1023+
pauseSAIWriterComplete.disable();
1024+
}
1025+
}
1026+
1027+
@Test
1028+
public void testIndexCompactionWhenIndexIsUnloaded() throws Throwable
1029+
{
1030+
// prepare schema and data
1031+
createTable(CREATE_TABLE_TEMPLATE);
1032+
createIndex(String.format(CREATE_INDEX_TEMPLATE, "v1"));
1033+
createIndex(String.format(CREATE_INDEX_TEMPLATE, "v2"));
1034+
1035+
execute("INSERT INTO %s (id1, v1, v2) VALUES ('0', 0, '0');");
1036+
execute("INSERT INTO %s (id1, v1, v2) VALUES ('1', 1, '0');");
1037+
flush();
1038+
1039+
execute("INSERT INTO %s (id1, v1, v2) VALUES ('2', 0, '0');");
1040+
execute("INSERT INTO %s (id1, v1, v2) VALUES ('3', 1, '0');");
1041+
flush();
1042+
1043+
ResultSet rows = executeNet("SELECT id1 FROM %s WHERE v1>=0");
1044+
assertEquals(4, rows.all().size());
1045+
1046+
// pause index writer completion
1047+
Injections.inject(pauseSAIWriterComplete);
1048+
pauseSAIWriterComplete.enable();
1049+
1050+
try
1051+
{
1052+
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
1053+
assertThat(cfs.getLiveSSTables()).hasSize(2);
1054+
TestWithConcurrentVerification compactionTask = new TestWithConcurrentVerification(
1055+
() -> cfs.forceMajorCompaction(),
1056+
() -> {
1057+
try
1058+
{
1059+
// wait for compaction paused
1060+
FBUtilities.sleepQuietly(5000);
1061+
1062+
SecondaryIndexManager sim = cfs.getIndexManager();
1063+
sim.unloadAllIndexes();
1064+
}
1065+
catch (Exception e)
1066+
{
1067+
throw new RuntimeException(e);
1068+
}
1069+
}, -1 // run verification task once
1070+
);
1071+
1072+
assertThatThrownBy(compactionTask::start).hasMessageContaining("is unloaded");
1073+
1074+
// compaction is aborted
1075+
assertThat(cfs.getLiveSSTables()).hasSize(2);
1076+
1077+
// indexes are unloaded
1078+
StorageAttachedIndexGroup saiGroup = StorageAttachedIndexGroup.getIndexGroup(cfs);
1079+
assertThat(saiGroup.sstableContextManager().size()).isEqualTo(0);
1080+
}
1081+
finally
1082+
{
1083+
pauseSAIWriterComplete.disable();
1084+
}
1085+
}
1086+
9631087
@Test
9641088
public void verifyRebuildCorruptedFiles() throws Throwable
9651089
{

test/unit/org/apache/cassandra/index/sai/disk/v1/SegmentFlushTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ private void testFlushBetweenRowIds(long sstableRowId1, long sstableRowId2, int
155155
MockSchema.newCFS("ks"));
156156

157157
IndexComponents.ForWrite components = indexDescriptor.newPerIndexComponentsForWrite(indexContext);
158-
SSTableIndexWriter writer = new SSTableIndexWriter(components, V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER, () -> true, 2);
158+
SSTableIndexWriter writer = new SSTableIndexWriter(components, V1OnDiskFormat.SEGMENT_BUILD_MEMORY_LIMITER, () -> false, () -> false, 2);
159159

160160
List<DecoratedKey> keys = Arrays.asList(dk("1"), dk("2"));
161161
Collections.sort(keys);

0 commit comments

Comments
 (0)