Skip to content

Commit 55d7132

Browse files
committed
CNDB-14725: add SSTableFlushObserver#onSSTableWriterSwitched to flush SAI segment builder for written shards without waiting for entire transaction to complete (#1859)
This reduces memory usage during sharded compaction. riptano/cndb#14725: OOM during SAI compaction with a large number of shards. Flush the segment builder when the sstable writer is switched, to free memory without waiting for the full compaction to complete.
1 parent 1200040 commit 55d7132

File tree

10 files changed

+128
-3
lines changed

10 files changed

+128
-3
lines changed

src/java/org/apache/cassandra/index/sai/disk/PerColumnIndexWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ public interface PerColumnIndexWriter
4141
*/
4242
void complete(Stopwatch stopwatch) throws IOException;
4343

44+
/**
45+
* Called when current SSTable writer is switched during sharded compaction to free any in-memory resources associated
46+
* with the SSTable for current index without waiting for full transaction to complete
47+
*/
48+
void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException;
49+
4450
/**
4551
* Aborts accumulating data. Allows to clean up resources on error.
4652
* <p>

src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,25 @@ public void staticRow(Row staticRow)
164164
}
165165
}
166166

167+
@Override
168+
public void onSSTableWriterSwitched()
169+
{
170+
if (aborted) return;
171+
172+
try
173+
{
174+
for (PerColumnIndexWriter w : perIndexWriters)
175+
{
176+
w.onSSTableWriterSwitched(stopwatch);
177+
}
178+
}
179+
catch (Throwable t)
180+
{
181+
logger.error(indexDescriptor.logMessage("Failed to flush segment on sstable writer switched"), t);
182+
abort(t, true);
183+
}
184+
}
185+
167186
@Override
168187
public void complete()
169188
{

src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,13 @@ public void complete(Stopwatch stopwatch) throws IOException
166166
}
167167
}
168168

169+
@Override
170+
public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException
171+
{
172+
// no-op for memtable index where all terms are already inside memory index, we can't get rid of memory index
173+
// until the full flush is completed
174+
}
175+
169176
private long flush(MemtableTermsIterator terms) throws IOException
170177
{
171178
SegmentWriter writer = indexTermType.isLiteral() ? new LiteralIndexWriter(indexDescriptor, indexIdentifier)

src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,18 @@ public void addRow(PrimaryKey key, Row row, long sstableRowId) throws IOExceptio
100100
}
101101
}
102102

103+
@Override
104+
public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException
105+
{
106+
if (maybeAbort())
107+
return;
108+
109+
boolean emptySegment = currentBuilder == null || currentBuilder.isEmpty();
110+
logger.debug(index.identifier().logMessage("Flushing index with {}buffered data on SSTable writer switched..."), emptySegment ? "no " : "");
111+
if (!emptySegment)
112+
flushSegment();
113+
}
114+
103115
@Override
104116
public void complete(Stopwatch stopwatch) throws IOException
105117
{

src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public abstract class SegmentBuilder
4646
public static final long LAST_VALID_SEGMENT_ROW_ID = (Integer.MAX_VALUE / 2) - 1L;
4747
private static long testLastValidSegmentRowId = -1;
4848

49-
/** The number of column indexes being built globally. (Starts at one to avoid divide by zero.) */
49+
/** The number of column indexes being built globally. */
5050
private static final AtomicInteger ACTIVE_BUILDER_COUNT = new AtomicInteger(0);
5151

5252
/** Minimum flush size, dynamically updated as segment builds are started and completed/aborted. */

src/java/org/apache/cassandra/io/sstable/SSTableFlushObserver.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ public interface SSTableFlushObserver
6868
*/
6969
void complete();
7070

71+
/**
72+
* Called when current sstable writer is switched during sharded compaction to free any in-memory resources associated
73+
* with the sstable without waiting for full transaction to complete
74+
*/
75+
default void onSSTableWriterSwitched() {}
76+
7177
/**
7278
* Clean up resources on error. There should be no side effects if called multiple times.
7379
*/

src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ public void switchWriter(SSTableWriter newWriter)
282282

283283
currentlyOpenedEarlyAt = 0;
284284
bytesWritten += writer.getFilePointer();
285+
writer.onSSTableWriterSwitched();
285286
writer = newWriter;
286287
}
287288

src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,12 @@ public final void prepareToCommit()
282282
txnProxy.prepareToCommit();
283283
}
284284

285+
// notify sstable flush observer about sstable writer switched
286+
public final void onSSTableWriterSwitched()
287+
{
288+
observers.forEach(SSTableFlushObserver::onSSTableWriterSwitched);
289+
}
290+
285291
public final Throwable commit(Throwable accumulate)
286292
{
287293
try

test/unit/org/apache/cassandra/index/sai/functional/CompactionTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222

2323
import java.util.Collection;
2424
import java.util.Collections;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.Future;
28+
import java.util.concurrent.TimeUnit;
2529

2630
import com.google.common.collect.Lists;
2731
import org.junit.Assert;
@@ -42,6 +46,7 @@
4246
import org.apache.cassandra.index.sai.SAITester;
4347
import org.apache.cassandra.index.sai.utils.IndexIdentifier;
4448
import org.apache.cassandra.index.sai.disk.v1.SSTableIndexWriter;
49+
import org.apache.cassandra.index.sai.disk.v1.segment.SegmentBuilder;
4550
import org.apache.cassandra.index.sai.utils.IndexTermType;
4651
import org.apache.cassandra.inject.ActionBuilder;
4752
import org.apache.cassandra.inject.Expression;
@@ -57,10 +62,12 @@
5762
import org.apache.cassandra.service.ActiveRepairService;
5863
import org.apache.cassandra.streaming.PreviewKind;
5964
import org.apache.cassandra.utils.ByteBufferUtil;
65+
import org.apache.cassandra.utils.FBUtilities;
6066
import org.apache.cassandra.utils.Throwables;
6167
import org.apache.cassandra.utils.TimeUUID;
6268
import org.apache.cassandra.utils.concurrent.Refs;
6369

70+
import static org.assertj.core.api.Assertions.assertThat;
6471
import static org.assertj.core.api.Assertions.assertThatThrownBy;
6572
import static org.junit.Assert.assertNotEquals;
6673
import static org.junit.Assert.assertNull;
@@ -352,4 +359,54 @@ public void testConcurrentIndexDropWithCompaction() throws Throwable
352359
.isInstanceOf(InvalidQueryException.class)
353360
.hasMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE);
354361
}
362+
363+
@Test
364+
public void testSegmentBuilderFlushWithShardedCompaction() throws Throwable
365+
{
366+
int shards = 64;
367+
String createTable = "CREATE TABLE %s (id1 TEXT PRIMARY KEY, v1 INT, v2 TEXT) WITH compaction = " +
368+
"{'class' : 'UnifiedCompactionStrategy', 'enabled' : false, 'base_shard_count': " + shards + ", 'min_sstable_size': '1KiB' }";
369+
createTable(createTable);
370+
createIndex(String.format(CREATE_INDEX_TEMPLATE, "v1"));
371+
createIndex(String.format(CREATE_INDEX_TEMPLATE, "v2"));
372+
disableCompaction(keyspace(), currentTable());
373+
374+
int rowsPerSSTable = 2000;
375+
int numSSTables = 4;
376+
int key = 0;
377+
for (int s = 0; s < numSSTables; s++)
378+
{
379+
for (int i = 0; i < rowsPerSSTable; i++)
380+
{
381+
execute("INSERT INTO %s (id1, v1, v2) VALUES (?, 0, '01e2wefnewirui32e21e21wre')", Integer.toString(key++));
382+
}
383+
flush();
384+
}
385+
386+
ExecutorService executor = Executors.newSingleThreadExecutor();
387+
try
388+
{
389+
Future<?> future = executor.submit(() -> {
390+
getCurrentColumnFamilyStore().forceMajorCompaction(false, 1);
391+
waitForCompactions();
392+
});
393+
394+
// verify that it's not accumulating segment builders
395+
while (!future.isDone())
396+
{
397+
// ACTIVE_BUILDER_COUNT starts from 0. With 2 indexes, at most 2 segment builders should be active at any time
398+
assertThat(SegmentBuilder.getActiveBuilderCount()).isGreaterThanOrEqualTo(0).isLessThanOrEqualTo(2);
399+
}
400+
future.get(30, TimeUnit.SECONDS);
401+
402+
// verify results are sharded
403+
assertThat(getCurrentColumnFamilyStore().getLiveSSTables()).hasSize(shards);
404+
}
405+
finally
406+
{
407+
executor.shutdown();
408+
executor.awaitTermination(30, TimeUnit.SECONDS);
409+
assertThat(executor.isTerminated()).isTrue();
410+
}
411+
}
355412
}

test/unit/org/apache/cassandra/io/sstable/SSTableFlushObserverTest.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@
3232
import org.apache.commons.lang3.tuple.ImmutableTriple;
3333
import org.apache.commons.lang3.tuple.Triple;
3434
import org.junit.Assert;
35+
import static org.junit.Assert.assertFalse;
3536
import org.junit.Before;
3637
import org.junit.BeforeClass;
3738
import org.junit.Test;
3839

40+
3941
import org.apache.cassandra.config.DatabaseDescriptor;
4042
import org.apache.cassandra.db.Clustering;
4143
import org.apache.cassandra.db.DecoratedKey;
@@ -73,6 +75,7 @@
7375
import static org.mockito.Mockito.mock;
7476
import static org.mockito.Mockito.when;
7577

78+
7679
public class SSTableFlushObserverTest
7780
{
7881

@@ -167,14 +170,15 @@ public void testFlushObserver() throws Exception
167170
BufferCell.live(getColumn(cfm, "height"), now, LongType.instance.decompose(178L))));
168171

169172
writer.append(new RowIterator(cfm, key, Collections.singletonList(buildRow(expected.get(key)))));
170-
173+
writer.onSSTableWriterSwitched();
171174
reader = writer.finish(true);
172175
}
173176
finally
174177
{
175178
FileUtils.closeQuietly(writer);
176179
}
177-
180+
181+
Assert.assertTrue(observer.isWriterSwitched);
178182
Assert.assertTrue(observer.isComplete);
179183
Assert.assertEquals(expected.size(), observer.rows.size());
180184

@@ -265,6 +269,7 @@ private static class FlushObserver implements SSTableFlushObserver
265269
private boolean beginCalled;
266270
private boolean failOnBegin;
267271
private boolean abortCalled;
272+
private boolean isWriterSwitched;
268273

269274
@Override
270275
public void begin()
@@ -274,6 +279,12 @@ public void begin()
274279
throw new RuntimeException("Failed to initialize");
275280
}
276281

282+
@Override
283+
public void onSSTableWriterSwitched()
284+
{
285+
isWriterSwitched = true;
286+
}
287+
277288
@Override
278289
public void startPartition(DecoratedKey key, long dataPosition, long indexPosition)
279290
{

0 commit comments

Comments
 (0)