Skip to content

Commit 1127854

Browse files
committed
Merge branch 'cassandra-5.0' into trunk
* cassandra-5.0: Flush SAI segment builder when current SSTable writer is switched
2 parents c501c33 + 98e7cd5 commit 1127854

File tree

11 files changed

+126
-3
lines changed

11 files changed

+126
-3
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@
234234
* Add the ability to disable bulk loading of SSTables (CASSANDRA-18781)
235235
* Clean up obsolete functions and simplify cql_version handling in cqlsh (CASSANDRA-18787)
236236
Merged from 5.0:
237+
* Flush SAI segment builder when current SSTable writer is switched (CASSANDRA-20752)
237238
* Throw RTE instead of FSError when RTE is thrown from FileUtils.write in TOCComponent (CASSANDRA-20917)
238239
* Represent complex settings as JSON on system_views.settings table (CASSANDRA-20827)
239240
* Sort SSTable TOC entries for determinism (CASSANDRA-20494)

src/java/org/apache/cassandra/index/sai/disk/PerColumnIndexWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ public interface PerColumnIndexWriter
4141
*/
4242
void complete(Stopwatch stopwatch) throws IOException;
4343

44+
/**
45+
* Called when the current SSTable writer is switched during sharded compaction, to free any in-memory resources
46+
* associated with the SSTable for the current index without waiting for the full transaction to complete
47+
*/
48+
void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException;
49+
4450
/**
4551
* Aborts accumulating data. Allows to clean up resources on error.
4652
* <p>

src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,25 @@ public void staticRow(Row staticRow)
164164
}
165165
}
166166

167+
@Override
168+
public void onSSTableWriterSwitched()
169+
{
170+
if (aborted) return;
171+
172+
try
173+
{
174+
for (PerColumnIndexWriter w : perIndexWriters)
175+
{
176+
w.onSSTableWriterSwitched(stopwatch);
177+
}
178+
}
179+
catch (Throwable t)
180+
{
181+
logger.error(indexDescriptor.logMessage("Failed to flush segment on sstable writer switched"), t);
182+
abort(t, true);
183+
}
184+
}
185+
167186
@Override
168187
public void complete()
169188
{

src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,13 @@ public void complete(Stopwatch stopwatch) throws IOException
166166
}
167167
}
168168

169+
@Override
170+
public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException
171+
{
172+
// no-op for the memtable index: all terms are already held in the in-memory index, which cannot be released
173+
// until the full flush has completed
174+
}
175+
169176
private long flush(MemtableTermsIterator terms) throws IOException
170177
{
171178
SegmentWriter writer = indexTermType.isLiteral() ? new LiteralIndexWriter(indexDescriptor, indexIdentifier)

src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,18 @@ public void addRow(PrimaryKey key, Row row, long sstableRowId) throws IOExceptio
100100
}
101101
}
102102

103+
@Override
104+
public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException
105+
{
106+
if (maybeAbort())
107+
return;
108+
109+
boolean emptySegment = currentBuilder == null || currentBuilder.isEmpty();
110+
logger.debug(index.identifier().logMessage("Flushing index with {}buffered data on SSTable writer switched..."), emptySegment ? "no " : "");
111+
if (!emptySegment)
112+
flushSegment();
113+
}
114+
103115
@Override
104116
public void complete(Stopwatch stopwatch) throws IOException
105117
{

src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public abstract class SegmentBuilder
4646
public static final long LAST_VALID_SEGMENT_ROW_ID = (Integer.MAX_VALUE / 2) - 1L;
4747
private static long testLastValidSegmentRowId = -1;
4848

49-
/** The number of column indexes being built globally. (Starts at one to avoid divide by zero.) */
49+
/** The number of column indexes being built globally. */
5050
private static final AtomicInteger ACTIVE_BUILDER_COUNT = new AtomicInteger(0);
5151

5252
/** Minimum flush size, dynamically updated as segment builds are started and completed/aborted. */

src/java/org/apache/cassandra/io/sstable/SSTableFlushObserver.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ public interface SSTableFlushObserver
6868
*/
6969
void complete();
7070

71+
/**
72+
* Called when the current sstable writer is switched during sharded compaction, to free any in-memory resources
73+
* associated with the sstable without waiting for the full transaction to complete
74+
*/
75+
default void onSSTableWriterSwitched() {}
76+
7177
/**
7278
* Clean up resources on error. There should be no side effects if called multiple times.
7379
*/

src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ public void switchWriter(SSTableWriter newWriter)
282282

283283
currentlyOpenedEarlyAt = 0;
284284
bytesWritten += writer.getFilePointer();
285+
writer.onSSTableWriterSwitched();
285286
writer = newWriter;
286287
}
287288

src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,12 @@ public final void prepareToCommit()
282282
txnProxy.prepareToCommit();
283283
}
284284

285+
// notify all sstable flush observers that the sstable writer has been switched
286+
public final void onSSTableWriterSwitched()
287+
{
288+
observers.forEach(SSTableFlushObserver::onSSTableWriterSwitched);
289+
}
290+
285291
public final Throwable commit(Throwable accumulate)
286292
{
287293
try

test/unit/org/apache/cassandra/index/sai/functional/CompactionTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222

2323
import java.util.Collection;
2424
import java.util.Collections;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.Future;
28+
import java.util.concurrent.TimeUnit;
2529

2630
import com.google.common.collect.Lists;
2731
import org.junit.Assert;
@@ -42,6 +46,7 @@
4246
import org.apache.cassandra.index.sai.SAITester;
4347
import org.apache.cassandra.index.sai.utils.IndexIdentifier;
4448
import org.apache.cassandra.index.sai.disk.v1.SSTableIndexWriter;
49+
import org.apache.cassandra.index.sai.disk.v1.segment.SegmentBuilder;
4550
import org.apache.cassandra.index.sai.utils.IndexTermType;
4651
import org.apache.cassandra.inject.ActionBuilder;
4752
import org.apache.cassandra.inject.Expression;
@@ -61,6 +66,7 @@
6166
import org.apache.cassandra.utils.TimeUUID;
6267
import org.apache.cassandra.utils.concurrent.Refs;
6368

69+
import static org.assertj.core.api.Assertions.assertThat;
6470
import static org.assertj.core.api.Assertions.assertThatThrownBy;
6571
import static org.junit.Assert.assertNotEquals;
6672
import static org.junit.Assert.assertNull;
@@ -352,4 +358,53 @@ public void testConcurrentIndexDropWithCompaction() throws Throwable
352358
.isInstanceOf(InvalidQueryException.class)
353359
.hasMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE);
354360
}
361+
362+
@Test
363+
public void testSegmentBuilderFlushWithShardedCompaction() throws Throwable
364+
{
365+
int shards = 64;
366+
String createTable = "CREATE TABLE %s (id1 TEXT PRIMARY KEY, v1 INT, v2 TEXT) WITH compaction = " +
367+
"{'class' : 'UnifiedCompactionStrategy', 'enabled' : false, 'base_shard_count': " + shards + ", 'min_sstable_size': '1KiB' }";
368+
createTable(createTable);
369+
createIndex(String.format(CREATE_INDEX_TEMPLATE, "v1"));
370+
createIndex(String.format(CREATE_INDEX_TEMPLATE, "v2"));
371+
disableCompaction(keyspace(), currentTable());
372+
373+
int rowsPerSSTable = 2000;
374+
int numSSTables = 4;
375+
int key = 0;
376+
for (int s = 0; s < numSSTables; s++)
377+
{
378+
for (int i = 0; i < rowsPerSSTable; i++)
379+
{
380+
execute("INSERT INTO %s (id1, v1, v2) VALUES (?, 0, '01e2wefnewirui32e21e21wre')", Integer.toString(key++));
381+
}
382+
flush();
383+
}
384+
385+
ExecutorService executor = Executors.newSingleThreadExecutor();
386+
try
387+
{
388+
Future<?> future = executor.submit(() -> {
389+
getCurrentColumnFamilyStore().forceMajorCompaction(false, 1);
390+
waitForCompactions();
391+
});
392+
393+
// verify that it's not accumulating segment builders
394+
while (!future.isDone())
395+
{
396+
// ACTIVE_BUILDER_COUNT starts from 0. There are 2 segments for 2 indexes
397+
assertThat(SegmentBuilder.getActiveBuilderCount()).isGreaterThanOrEqualTo(0).isLessThanOrEqualTo(2);
398+
}
399+
future.get(30, TimeUnit.SECONDS);
400+
401+
// verify results are sharded
402+
assertThat(getCurrentColumnFamilyStore().getLiveSSTables()).hasSize(shards);
403+
}
404+
finally
405+
{
406+
executor.shutdown();
407+
assertThat(executor.awaitTermination(30, TimeUnit.SECONDS)).isTrue();
408+
}
409+
}
355410
}

0 commit comments

Comments
 (0)