Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public interface PerColumnIndexWriter
*/
void complete(Stopwatch stopwatch) throws IOException;

/**
* Called when current sstable writer is switched during sharded compaction to free any in-memory resources associated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Called when current sstable writer is switched during sharded compaction to free any in-memory resources associated
* Called when current SSTable writer is switched during sharded compaction to free any in-memory resources associated

* with the sstable for current index without waiting for full transaction to complete
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* with the sstable for current index without waiting for full transaction to complete
* with the SSTable for current index without waiting for full transaction to complete

*/
void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException;

/**
* Aborts accumulating data. Allows to clean up resources on error.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,27 @@ public void staticRow(Row staticRow)
}
}

@Override
public void onSSTableWriterSwitched()
{
if (aborted) return;

try
{
for (PerColumnIndexWriter w : perIndexWriters)
{
w.onSSTableWriterSwitched(stopwatch);
}
}
catch (Throwable t)
{
logger.error(indexDescriptor.logMessage("Failed to flush segment on sstable writer switched"), t);
abort(t, true);
// fail compaction task or index build task if SAI failed
throw Throwables.unchecked(t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unnecessary. When abort() is called with fromIndex == true, as we do here, the exception will be thrown at the end of abort().

}
}

@Override
public void complete()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ public void complete(Stopwatch stopwatch) throws IOException
}
}

@Override
public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException
{
// Intentionally a no-op for the memtable index: all terms already live inside the in-memory index, and that
// index cannot be released until the full flush has completed.
}

private long flush(MemtableTermsIterator terms) throws IOException
{
SegmentWriter writer = indexTermType.isLiteral() ? new LiteralIndexWriter(indexDescriptor, indexIdentifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ public void addRow(PrimaryKey key, Row row, long sstableRowId) throws IOExceptio
}
}

@Override
public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException
{
if (maybeAbort())
return;

boolean emptySegment = currentBuilder == null || currentBuilder.isEmpty();
logger.debug("Flushing index {} with {}buffered data on sstable writer switched...", index.identifier().indexName, emptySegment ? "no " : "");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use the standard format...

Suggested change
logger.debug("Flushing index {} with {}buffered data on sstable writer switched...", index.identifier().indexName, emptySegment ? "no " : "");
logger.debug(index.identifier().logMessage("Flushing index with {}buffered data on SSTable writer switched..."), emptySegment ? "no " : "");

if (!emptySegment)
flushSegment();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we don't have to aggressively create a new builder here because addTerm(), complete(), and abort() will all handle the currentBuilder == null case appropriately.

}

@Override
public void complete(Stopwatch stopwatch) throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public abstract class SegmentBuilder
private static long testLastValidSegmentRowId = -1;

/** The number of column indexes being built globally. (Starts at one to avoid divide by zero.) */
private static final AtomicInteger ACTIVE_BUILDER_COUNT = new AtomicInteger(0);
private static final AtomicInteger ACTIVE_BUILDER_COUNT = new AtomicInteger(1);
Copy link
Contributor

@maedhroz maedhroz Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the DS fork, we actually do set this to 1, but here in OSS, we start at zero and ensure safety by making sure the access and increment/decrement operations happen in the correct order below. I would revert this and remove the misleading field comment. However, that does mean you'll probably get a failure in CompactionTest#testSegmentBuilderFlushWithShardedCompaction(), which assumes we initialize to 1 (and this is correct in the DS fork, of course). That should be easy enough to fix in the test itself...


/** Minimum flush size, dynamically updated as segment builds are started and completed/aborted. */
private static volatile long minimumFlushBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ public interface SSTableFlushObserver
*/
void complete();

/**
* Called when the current SSTable writer is switched during sharded compaction, so that any in-memory resources
* associated with the SSTable can be freed without waiting for the full transaction to complete.
* <p>
* The default implementation does nothing.
*/
default void onSSTableWriterSwitched() {}

/**
* Clean up resources on error. There should be no side effects if called multiple times.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ public void switchWriter(SSTableWriter newWriter)

currentlyOpenedEarlyAt = 0;
bytesWritten += writer.getFilePointer();
writer.onSSTableWriterSwitched();
writer = newWriter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ public final void prepareToCommit()
txnProxy.prepareToCommit();
}

/**
 * Notifies every registered {@link SSTableFlushObserver} that the SSTable writer has been switched.
 */
public final void onSSTableWriterSwitched()
{
    for (SSTableFlushObserver observer : observers)
        observer.onSSTableWriterSwitched();
}

public final Throwable commit(Throwable accumulate)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Lists;
import org.junit.Assert;
Expand All @@ -42,6 +46,7 @@
import org.apache.cassandra.index.sai.SAITester;
import org.apache.cassandra.index.sai.utils.IndexIdentifier;
import org.apache.cassandra.index.sai.disk.v1.SSTableIndexWriter;
import org.apache.cassandra.index.sai.disk.v1.segment.SegmentBuilder;
import org.apache.cassandra.index.sai.utils.IndexTermType;
import org.apache.cassandra.inject.ActionBuilder;
import org.apache.cassandra.inject.Expression;
Expand All @@ -57,10 +62,12 @@
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused import

import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.Refs;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -352,4 +359,53 @@ public void testConcurrentIndexDropWithCompaction() throws Throwable
.isInstanceOf(InvalidQueryException.class)
.hasMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE);
}

@Test
public void testSegmentBuilderFlushWithShardedCompaction() throws Throwable
{
int shards = 64;
String createTable = "CREATE TABLE %s (id1 TEXT PRIMARY KEY, v1 INT, v2 TEXT) WITH compaction = " +
"{'class' : 'UnifiedCompactionStrategy', 'enabled' : false, 'base_shard_count': " + shards + ", 'min_sstable_size': '1KiB' }";
createTable(createTable);
createIndex(String.format(CREATE_INDEX_TEMPLATE, "v1"));
createIndex(String.format(CREATE_INDEX_TEMPLATE, "v2"));
disableCompaction(keyspace(), currentTable());

int rowsPerSSTable = 2000;
int numSSTables = 4;
int key = 0;
for (int s = 0; s < numSSTables; s++)
{
for (int i = 0; i < rowsPerSSTable; i++)
{
execute("INSERT INTO %s (id1, v1, v2) VALUES (?, 0, '01e2wefnewirui32e21e21wre')", Integer.toString(key++));
}
flush();
}

ExecutorService executor = Executors.newSingleThreadExecutor();
try
{
Future<?> future = executor.submit(() -> {
getCurrentColumnFamilyStore().forceMajorCompaction(false, 1);
waitForCompactions();
});

// verify that it's not accumulating segment builders
while (!future.isDone())
{
// ACTIVE_BUILDER_COUNT starts from 1. There are 2 segments for 2 indexes
assertThat(SegmentBuilder.getActiveBuilderCount()).isGreaterThanOrEqualTo(1).isLessThanOrEqualTo(3);
Copy link
Contributor

@maedhroz maedhroz Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we revert the ACTIVE_BUILDER_COUNT thing above, this becomes...

// ACTIVE_BUILDER_COUNT starts from 0. There are 2 segments for 2 indexes
assertThat(SegmentBuilder.getActiveBuilderCount()).isGreaterThanOrEqualTo(0).isLessThanOrEqualTo(2);

(Suggested by Claude, but I think it's correct.)

}
future.get(30, TimeUnit.SECONDS);

// verify results are sharded
assertThat(getCurrentColumnFamilyStore().getLiveSSTables()).hasSize(shards);
}
finally
{
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If this times out, we might want to fail the test?

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.junit.Assert;
import static org.junit.Assert.assertFalse;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused import

import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;


import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
Expand Down Expand Up @@ -73,6 +75,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


public class SSTableFlushObserverTest
{

Expand Down Expand Up @@ -167,14 +170,15 @@ public void testFlushObserver() throws Exception
BufferCell.live(getColumn(cfm, "height"), now, LongType.instance.decompose(178L))));

writer.append(new RowIterator(cfm, key, Collections.singletonList(buildRow(expected.get(key)))));

writer.onSSTableWriterSwitched();
reader = writer.finish(true);
}
finally
{
FileUtils.closeQuietly(writer);
}


Assert.assertTrue(observer.isWriterSwitched);
Assert.assertTrue(observer.isComplete);
Assert.assertEquals(expected.size(), observer.rows.size());

Expand Down Expand Up @@ -265,6 +269,7 @@ private static class FlushObserver implements SSTableFlushObserver
private boolean beginCalled;
private boolean failOnBegin;
private boolean abortCalled;
private boolean isWriterSwitched;

@Override
public void begin()
Expand All @@ -274,6 +279,12 @@ public void begin()
throw new RuntimeException("Failed to initialize");
}

@Override
public void onSSTableWriterSwitched()
{
// Record that the writer-switched callback was invoked so the test can assert on it afterwards.
isWriterSwitched = true;
}

@Override
public void startPartition(DecoratedKey key, long dataPosition, long indexPosition)
{
Expand Down