Skip to content

Commit 8474453

Browse files
jasonstack authored and driftx committed
CNDB-14725: add SSTableFlushObserver#onSSTableWriterSwitched to flush SAI segment builder for written shards without waiting for entire transaction to complete (#1859)
This reduces memory usage during sharded compaction. ### What is the issue riptano/cndb#14725: OOM during SAI compaction with a large number of shards. ### What does this PR fix and why was it fixed Flush the segment builder when the sstable writer is switched, to free memory without waiting for the full compaction to complete.
1 parent 9e3e639 commit 8474453

File tree

9 files changed

+134
-6
lines changed

9 files changed

+134
-6
lines changed

src/java/org/apache/cassandra/index/sai/disk/PerIndexWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ public interface PerIndexWriter
4646
*/
4747
void complete(Stopwatch stopwatch) throws IOException;
4848

49+
/**
50+
* Called when the current sstable writer is switched during sharded compaction, to free any in-memory resources associated
51+
* with the sstable for the current index without waiting for the full transaction to complete.
52+
*/
53+
void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException;
54+
4955
/**
5056
* Aborts accumulating data. Allows to clean up resources on error.
5157
*

src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,29 @@ public void staticRow(Row staticRow)
184184
}
185185
}
186186

187+
@Override
188+
public void onSSTableWriterSwitched()
189+
{
190+
if (aborted) return;
191+
192+
try
193+
{
194+
long start = Clock.Global.nanoTime();
195+
for (PerIndexWriter w : perIndexWriters)
196+
{
197+
w.onSSTableWriterSwitched(stopwatch);
198+
}
199+
totalTimeSpent += (Clock.Global.nanoTime() - start);
200+
}
201+
catch (Throwable t)
202+
{
203+
logger.error(indexDescriptor.logMessage("Failed to flush segment on sstable writer switched"), t);
204+
abort(t, true);
205+
// fail compaction task or index build task if SAI failed
206+
throw Throwables.unchecked(t);
207+
}
208+
}
209+
187210
@Override
188211
public void complete(SSTable sstable)
189212
{

src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,13 @@ public void abort(Throwable cause)
101101
perIndexComponents.forceDeleteAllComponents();
102102
}
103103

104+
@Override
105+
public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException
106+
{
107+
// no-op for the memtable index: all terms are already inside the memory index, and we can't get rid of the memory index
108+
// until the full flush is completed
109+
}
110+
104111
@Override
105112
public void complete(Stopwatch stopwatch) throws IOException
106113
{

src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,18 @@ public void addRow(PrimaryKey key, Row row, long sstableRowId) throws IOExceptio
142142

143143
}
144144

145+
@Override
146+
public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException
147+
{
148+
if (maybeAbort())
149+
return;
150+
151+
boolean emptySegment = currentBuilder == null || currentBuilder.isEmpty();
152+
logger.debug("Flushing index {} with {}buffered data on sstable writer switched...", indexContext.getIndexName(), emptySegment ? "no " : "");
153+
if (!emptySegment)
154+
flushSegment();
155+
}
156+
145157
@Override
146158
public void complete(Stopwatch stopwatch) throws IOException
147159
{
@@ -322,8 +334,9 @@ private void flushSegment() throws IOException
322334
if (indexContext.getIndexMetrics() != null)
323335
indexContext.getIndexMetrics().compactionSegmentBytesPerSecond.update((long)(segmentBytes / flushMillis * 1000.0));
324336

325-
logger.debug("Flushed segment with {} cells for a total of {} in {} ms",
326-
(long) rowCount, FBUtilities.prettyPrintMemory((long) segmentBytes), flushMillis);
337+
logger.debug("Flushed segment with {} cells for a total of {} in {} ms for index {} with starting row id {} for sstable {}",
338+
(long) rowCount, FBUtilities.prettyPrintMemory((long) segmentBytes), flushMillis, indexContext.getIndexName(),
339+
segmentMetadata.minSSTableRowId, perIndexComponents.descriptor());
327340
}
328341

329342
// Builder memory is released against the limiter at the conclusion of a successful
@@ -402,9 +415,10 @@ else if (indexContext.isLiteral())
402415
}
403416

404417
long globalBytesUsed = limiter.increment(builder.totalBytesAllocated());
405-
logger.debug("Created new segment builder while flushing SSTable {}. Global segment memory usage now at {}",
418+
logger.debug("Created new segment builder while flushing SSTable {}. Global segment memory usage now at {} with {} active segment builders",
406419
perIndexComponents.descriptor(),
407-
FBUtilities.prettyPrintMemory(globalBytesUsed));
420+
FBUtilities.prettyPrintMemory(globalBytesUsed),
421+
SegmentBuilder.ACTIVE_BUILDER_COUNT.get() - 1);
408422

409423
return builder;
410424
}

src/java/org/apache/cassandra/io/sstable/SSTableFlushObserver.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ public interface SSTableFlushObserver
6767
*/
6868
void complete(SSTable ssTable);
6969

70+
/**
71+
* Called when the current sstable writer is switched during sharded compaction, to free any in-memory resources associated
72+
* with the sstable without waiting for the full transaction to complete.
73+
*/
74+
default void onSSTableWriterSwitched()
75+
{
76+
}
77+
7078
/**
7179
* Clean up resources on error. There should be no side effects if called multiple times.
7280
*/

src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ public void switchWriter(SSTableWriter newWriter)
306306

307307
currentlyOpenedEarlyAt = 0;
308308
bytesWritten += writer.getFilePointer();
309+
writer.onSSTableWriterSwitched();
309310
writer = newWriter;
310311
}
311312

src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,12 @@ public final void abort()
337337
}
338338
}
339339

340+
// notify the sstable flush observers that the sstable writer has been switched
341+
public final void onSSTableWriterSwitched()
342+
{
343+
observers.forEach(SSTableFlushObserver::onSSTableWriterSwitched);
344+
}
345+
340346
protected Map<MetadataType, MetadataComponent> finalizeMetadata()
341347
{
342348
return metadataCollector.finalizeMetadata(getPartitioner().getClass().getCanonicalName(),

test/unit/org/apache/cassandra/index/sai/functional/CompactionTest.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222

2323
import java.util.Collection;
2424
import java.util.Collections;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.Future;
28+
import java.util.concurrent.TimeUnit;
2529

2630
import javax.management.InstanceNotFoundException;
2731

@@ -46,6 +50,7 @@
4650
import org.apache.cassandra.index.IndexNotAvailableException;
4751
import org.apache.cassandra.index.sai.IndexContext;
4852
import org.apache.cassandra.index.sai.disk.v1.SSTableIndexWriter;
53+
import org.apache.cassandra.index.sai.disk.v1.SegmentBuilder;
4954
import org.apache.cassandra.index.sai.metrics.AbstractMetricsTest;
5055
import org.apache.cassandra.inject.ActionBuilder;
5156
import org.apache.cassandra.inject.Expression;
@@ -65,12 +70,13 @@
6570
import org.apache.cassandra.utils.TimeUUID;
6671
import org.apache.cassandra.utils.concurrent.Refs;
6772

68-
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
73+
import static org.assertj.core.api.Assertions.assertThat;
6974
import static org.assertj.core.api.Assertions.assertThatThrownBy;
7075
import static org.junit.Assert.assertNotEquals;
7176
import static org.junit.Assert.assertThrows;
7277
import static org.junit.Assert.assertTrue;
7378
import static org.junit.Assert.fail;
79+
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
7480

7581
public class CompactionTest extends AbstractMetricsTest
7682
{
@@ -403,4 +409,53 @@ protected long getDiskUsage()
403409
{
404410
return (long) getMetricValue(objectNameNoIndex("DiskUsedBytes", KEYSPACE, currentTable(), "IndexGroupMetrics"));
405411
}
412+
413+
@Test
414+
public void testSegmentBuilderFlushWithShardedCompaction() throws Throwable
415+
{
416+
int shards = 64;
417+
String createTable = "CREATE TABLE %s (id1 TEXT PRIMARY KEY, v1 INT, v2 TEXT) WITH compaction = " +
418+
"{'class' : 'UnifiedCompactionStrategy', 'enabled' : false, 'base_shard_count': " + shards + ", 'min_sstable_size': '1KiB' }";
419+
createTable(createTable);
420+
createIndex(String.format(CREATE_INDEX_TEMPLATE, "v1"));
421+
createIndex(String.format(CREATE_INDEX_TEMPLATE, "v2"));
422+
disableCompaction(keyspace(), currentTable());
423+
424+
int rowsPerSSTable = 2000;
425+
int numSSTables = 4;
426+
int key = 0;
427+
for (int s = 0; s < numSSTables; s++)
428+
{
429+
for (int i = 0; i < rowsPerSSTable; i++)
430+
{
431+
execute("INSERT INTO %s (id1, v1, v2) VALUES (?, 0, '01e2wefnewirui32e21e21wre')", Integer.toString(key++));
432+
}
433+
flush();
434+
}
435+
436+
ExecutorService executor = Executors.newSingleThreadExecutor();
437+
try
438+
{
439+
Future<?> future = executor.submit(() -> {
440+
getCurrentColumnFamilyStore().forceMajorCompaction(false, 1);
441+
waitForCompactions();
442+
});
443+
444+
// verify that it's not accumulating segment builders
445+
while (!future.isDone())
446+
{
447+
// ACTIVE_BUILDER_COUNT starts from 1. There are 2 segments for 2 indexes
448+
assertThat(SegmentBuilder.ACTIVE_BUILDER_COUNT.get()).isGreaterThanOrEqualTo(1).isLessThanOrEqualTo(3);
449+
}
450+
future.get(30, TimeUnit.SECONDS);
451+
452+
// verify results are sharded
453+
assertThat(getCurrentColumnFamilyStore().getLiveSSTables()).hasSize(shards);
454+
}
455+
finally
456+
{
457+
executor.shutdown();
458+
executor.awaitTermination(30, TimeUnit.SECONDS);
459+
}
460+
}
406461
}

test/unit/org/apache/cassandra/io/sstable/SSTableFlushObserverTest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,15 @@ public void testFlushObserver() throws Exception
168168
BufferCell.live(getColumn(cfm, "height"), now, LongType.instance.decompose(178L))));
169169

170170
writer.append(new RowIterator(cfm, key, Collections.singletonList(buildRow(expected.get(key)))));
171-
171+
writer.onSSTableWriterSwitched();
172172
reader = writer.finish(true, null);
173173
}
174174
finally
175175
{
176176
FileUtils.closeQuietly(writer);
177177
}
178178

179+
Assert.assertTrue(observer.isWriterSwitched);
179180
Assert.assertTrue(observer.isComplete);
180181
Assert.assertEquals(expected.size(), observer.rows.size());
181182

@@ -269,6 +270,7 @@ private static class FlushObserver implements SSTableFlushObserver
269270
private boolean beginCalled;
270271
private boolean failOnBegin;
271272
private boolean abortCalled;
273+
private boolean isWriterSwitched;
272274

273275
@Override
274276
public void begin()
@@ -303,6 +305,12 @@ public void staticRow(Row staticRow)
303305
staticRow.forEach((c) -> staticRows.put(currentKey, (Cell<?>) c));
304306
}
305307

308+
@Override
309+
public void onSSTableWriterSwitched()
310+
{
311+
isWriterSwitched = true;
312+
}
313+
306314
@Override
307315
public void abort(Throwable accumulate)
308316
{

0 commit comments

Comments
 (0)