diff --git a/src/java/org/apache/cassandra/index/sai/disk/PerColumnIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/PerColumnIndexWriter.java index 8f1d657515fb..1eaa406c8280 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/PerColumnIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/PerColumnIndexWriter.java @@ -41,6 +41,12 @@ public interface PerColumnIndexWriter */ void complete(Stopwatch stopwatch) throws IOException; + /** + * Called when the current SSTable writer is switched during sharded compaction, so that any in-memory resources associated + * with that SSTable for the current index can be freed without waiting for the full transaction to complete. + */ + void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException; + /** * Aborts accumulating data. Allows to clean up resources on error. *

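For readers following the new call path, the standalone Java sketch below models the per-column writer lifecycle this hook is meant to enable: terms are buffered for the SSTable currently being written, the buffer is flushed as a segment when the SSTable writer is switched at a shard boundary, and whatever remains is flushed on complete(). The class and method names (BufferedColumnIndexWriterSketch, addTerm, flushSegment) are illustrative only and are not part of Cassandra's API; the actual behaviour is the SSTableIndexWriter change further down.

import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the per-column writer lifecycle the new hook enables.
// Class and method names are illustrative, not Cassandra's actual API.
final class BufferedColumnIndexWriterSketch
{
    private final List<String> buffer = new ArrayList<>();
    private int flushedSegments = 0;

    // Analogue of addRow(): accumulate terms for the current segment in memory.
    void addTerm(String term)
    {
        buffer.add(term);
    }

    // Analogue of onSSTableWriterSwitched(): flush and release the per-writer
    // buffer at a shard boundary instead of holding it until the transaction commits.
    void onWriterSwitched()
    {
        if (!buffer.isEmpty())
            flushSegment();
    }

    // Analogue of complete(): flush whatever the final shard accumulated.
    void complete()
    {
        if (!buffer.isEmpty())
            flushSegment();
    }

    private void flushSegment()
    {
        flushedSegments++;   // the real writer builds an on-disk segment here
        buffer.clear();      // memory for this shard is released immediately
    }

    public static void main(String[] args)
    {
        BufferedColumnIndexWriterSketch w = new BufferedColumnIndexWriterSketch();
        w.addTerm("a");
        w.addTerm("b");
        w.onWriterSwitched();  // shard boundary: first segment flushed
        w.addTerm("c");
        w.complete();          // second segment flushed
        System.out.println("segments flushed: " + w.flushedSegments); // prints 2
    }
}

The point of the sketch is the memory profile: without the shard-boundary flush, every shard's buffered terms would stay resident until the whole compaction transaction completes.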
diff --git a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java index ea1a7d69c76e..88c538ad798d 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java @@ -164,6 +164,25 @@ public void staticRow(Row staticRow) } } + @Override + public void onSSTableWriterSwitched() + { + if (aborted) return; + + try + { + for (PerColumnIndexWriter w : perIndexWriters) + { + w.onSSTableWriterSwitched(stopwatch); + } + } + catch (Throwable t) + { + logger.error(indexDescriptor.logMessage("Failed to flush segment on SSTable writer switch"), t); + abort(t, true); + } + } + @Override public void complete() { diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java index c5f833f4ac16..846f65505bfc 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java @@ -166,6 +166,13 @@ public void complete(Stopwatch stopwatch) throws IOException } } + @Override + public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException + { + // No-op for the memtable index: all terms are already in the in-memory index, and we cannot release it + // until the full flush is completed + } + private long flush(MemtableTermsIterator terms) throws IOException { SegmentWriter writer = indexTermType.isLiteral() ? new LiteralIndexWriter(indexDescriptor, indexIdentifier) diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java index cb62c91e1edb..c3c83992b26b 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java @@ -100,6 +100,18 @@ public void addRow(PrimaryKey key, Row row, long sstableRowId) throws IOExceptio } } + @Override + public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException + { + if (maybeAbort()) + return; + + boolean emptySegment = currentBuilder == null || currentBuilder.isEmpty(); + logger.debug(index.identifier().logMessage("Flushing index with {}buffered data on SSTable writer switch..."), emptySegment ? "no " : ""); + if (!emptySegment) + flushSegment(); + } + @Override public void complete(Stopwatch stopwatch) throws IOException { diff --git a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java index fbc0775363cf..d02f14582d53 100644 --- a/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java @@ -46,7 +46,7 @@ public abstract class SegmentBuilder public static final long LAST_VALID_SEGMENT_ROW_ID = (Integer.MAX_VALUE / 2) - 1L; private static long testLastValidSegmentRowId = -1; - /** The number of column indexes being built globally. (Starts at one to avoid divide by zero.) */ + /** The number of column indexes being built globally. */ private static final AtomicInteger ACTIVE_BUILDER_COUNT = new AtomicInteger(0); /** Minimum flush size, dynamically updated as segment builds are started and completed/aborted.
*/ diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableFlushObserver.java b/src/java/org/apache/cassandra/io/sstable/SSTableFlushObserver.java index 159e0d43bdf2..0f28f62dab31 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableFlushObserver.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableFlushObserver.java @@ -68,6 +68,12 @@ public interface SSTableFlushObserver */ void complete(); + /** + * Called when the current SSTable writer is switched during sharded compaction, so that any in-memory resources associated + * with the SSTable can be freed without waiting for the full transaction to complete. + */ + default void onSSTableWriterSwitched() {} + /** * Clean up resources on error. There should be no side effects if called multiple times. */ diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java index 1b2c9c03a07d..35b23101902d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java @@ -282,6 +282,7 @@ public void switchWriter(SSTableWriter newWriter) currentlyOpenedEarlyAt = 0; bytesWritten += writer.getFilePointer(); + writer.onSSTableWriterSwitched(); writer = newWriter; } diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 113332b10207..073936f62961 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -282,6 +282,12 @@ public final void prepareToCommit() txnProxy.prepareToCommit(); } + // Notify the SSTable flush observers that the SSTable writer has been switched + public final void onSSTableWriterSwitched() + { + observers.forEach(SSTableFlushObserver::onSSTableWriterSwitched); + } + public final Throwable commit(Throwable accumulate) { try diff --git a/test/unit/org/apache/cassandra/cql3/validation/UDTFieldQueryTest.java b/test/unit/org/apache/cassandra/cql3/validation/UDTFieldQueryTest.java new file mode 100644 index 000000000000..fd5cd2ce10d3 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/validation/UDTFieldQueryTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cql3.validation; + +import org.junit.Test; + +import org.apache.cassandra.cql3.CQLTester; + +/** + * Test UDT (User Defined Type) field queries in Cassandra. + * + * This test demonstrates how to: + * 1. Create UDTs with multiple fields + * 2. Select specific UDT fields + * 3. Query UDT fields in WHERE clauses + * 4.
Use UDT fields with different operators + */ +public class UDTFieldQueryTest extends CQLTester +{ + @Test + public void testUDTFieldSelection() throws Throwable + { + // Create a UDT with multiple fields + String personType = createType("CREATE TYPE %s (name text, age int, city text, active boolean)"); + String personTypeName = KEYSPACE + '.' + personType; + + // Create table with UDT column + createTable("CREATE TABLE %s (id int PRIMARY KEY, person " + personTypeName + ")"); + + // Insert data + execute("INSERT INTO %s (id, person) VALUES (1, {name: 'John', age: 30, city: 'New York', active: true})"); + execute("INSERT INTO %s (id, person) VALUES (2, {name: 'Jane', age: 25, city: 'Boston', active: false})"); + execute("INSERT INTO %s (id, person) VALUES (3, {name: 'Bob', age: 35, city: 'Chicago', active: true})"); + + // Test selecting specific UDT fields + assertRows(execute("SELECT person.name FROM %s WHERE id = 1"), row("John")); + assertRows(execute("SELECT person.age FROM %s WHERE id = 2"), row(25)); + assertRows(execute("SELECT person.city FROM %s WHERE id = 3"), row("Chicago")); + assertRows(execute("SELECT person.active FROM %s WHERE id = 1"), row(true)); + + // Test selecting multiple UDT fields + assertRows(execute("SELECT person.name, person.age FROM %s WHERE id = 1"), + row("John", 30)); + + // Test selecting all UDT fields + assertRows(execute("SELECT person FROM %s WHERE id = 1"), + row(userType("name", "John", "age", 30, "city", "New York", "active", true))); + } + + @Test + public void testUDTFieldQueries() throws Throwable + { + // Create a UDT with multiple fields + String personType = createType("CREATE TYPE %s (name text, age int, city text, active boolean)"); + String personTypeName = KEYSPACE + '.' + personType; + + // Create table with UDT column + createTable("CREATE TABLE %s (id int PRIMARY KEY, person " + personTypeName + ")"); + + // Insert data + execute("INSERT INTO %s (id, person) VALUES (1, {name: 'John', age: 30, city: 'New York', active: true})"); + execute("INSERT INTO %s (id, person) VALUES (2, {name: 'Jane', age: 25, city: 'Boston', active: false})"); + execute("INSERT INTO %s (id, person) VALUES (3, {name: 'Bob', age: 35, city: 'Chicago', active: true})"); + execute("INSERT INTO %s (id, person) VALUES (4, {name: 'Alice', age: 28, city: 'San Francisco', active: true})"); + + // Test equality queries on UDT fields + assertRows(execute("SELECT id FROM %s WHERE person.name = 'John' ALLOW FILTERING"), + row(1)); + assertRows(execute("SELECT id FROM %s WHERE person.age = 25 ALLOW FILTERING"), + row(2)); + assertRows(execute("SELECT id FROM %s WHERE person.city = 'Chicago' ALLOW FILTERING"), + row(3)); + assertRows(execute("SELECT id FROM %s WHERE person.active = true ALLOW FILTERING"), + row(1), row(3), row(4)); + + // Test range queries on UDT fields + assertRows(execute("SELECT id FROM %s WHERE person.age > 30 ALLOW FILTERING"), + row(3)); + assertRows(execute("SELECT id FROM %s WHERE person.age >= 25 ALLOW FILTERING"), + row(1), row(2), row(3), row(4)); + assertRows(execute("SELECT id FROM %s WHERE person.age < 30 ALLOW FILTERING"), + row(2), row(4)); + + // Test IN queries on UDT fields + assertRows(execute("SELECT id FROM %s WHERE person.age IN (25, 30, 35) ALLOW FILTERING"), + row(1), row(2), row(3)); + assertRows(execute("SELECT id FROM %s WHERE person.city IN ('New York', 'Boston') ALLOW FILTERING"), + row(1), row(2)); + + // Test multiple conditions on UDT fields + assertRows(execute("SELECT id FROM %s WHERE person.age > 25 AND person.active = true 
ALLOW FILTERING"), + row(1), row(3), row(4)); + assertRows(execute("SELECT id FROM %s WHERE person.age >= 30 AND person.city = 'New York' ALLOW FILTERING"), + row(1)); + } + + @Test + public void testUDTFieldQueriesWithNullValues() throws Throwable + { + // Create a UDT with nullable fields + String personType = createType("CREATE TYPE %s (name text, age int, city text)"); + String personTypeName = KEYSPACE + '.' + personType; + + // Create table with UDT column + createTable("CREATE TABLE %s (id int PRIMARY KEY, person " + personTypeName + ")"); + + // Insert data with null values + execute("INSERT INTO %s (id, person) VALUES (1, {name: 'John', age: 30, city: null})"); + execute("INSERT INTO %s (id, person) VALUES (2, {name: null, age: 25, city: 'Boston'})"); + execute("INSERT INTO %s (id, person) VALUES (3, {name: 'Bob', age: null, city: 'Chicago'})"); + + // Test queries with null values + assertRows(execute("SELECT id FROM %s WHERE person.name = null ALLOW FILTERING"), + row(2)); + assertRows(execute("SELECT id FROM %s WHERE person.city = null ALLOW FILTERING"), + row(1)); + assertRows(execute("SELECT id FROM %s WHERE person.age = null ALLOW FILTERING"), + row(3)); + + // Test queries with non-null values + assertRows(execute("SELECT id FROM %s WHERE person.name != null ALLOW FILTERING"), + row(1), row(3)); + assertRows(execute("SELECT id FROM %s WHERE person.city != null ALLOW FILTERING"), + row(2), row(3)); + } + + @Test + public void testUDTFieldQueriesWithFrozenUDT() throws Throwable + { + // Create a UDT + String personType = createType("CREATE TYPE %s (name text, age int)"); + String personTypeName = KEYSPACE + '.' + personType; + + // Create table with frozen UDT column + createTable("CREATE TABLE %s (id int PRIMARY KEY, person frozen<" + personTypeName + ">)"); + + // Insert data + execute("INSERT INTO %s (id, person) VALUES (1, {name: 'John', age: 30})"); + execute("INSERT INTO %s (id, person) VALUES (2, {name: 'Jane', age: 25})"); + + // Test UDT field selection with frozen UDT + assertRows(execute("SELECT person.name FROM %s WHERE id = 1"), row("John")); + assertRows(execute("SELECT person.age FROM %s WHERE id = 2"), row(25)); + + // Test UDT field queries with frozen UDT + assertRows(execute("SELECT id FROM %s WHERE person.name = 'John' ALLOW FILTERING"), + row(1)); + assertRows(execute("SELECT id FROM %s WHERE person.age > 25 ALLOW FILTERING"), + row(1)); + } + + @Test + public void testUDTFieldQueriesWithComplexUDT() throws Throwable + { + // Create nested UDTs + String addressType = createType("CREATE TYPE %s (street text, city text, zip text)"); + String addressTypeName = KEYSPACE + '.' + addressType; + + String personType = createType("CREATE TYPE %s (name text, age int, address " + addressTypeName + ")"); + String personTypeName = KEYSPACE + '.' 
+ personType; + + // Create table with complex UDT + createTable("CREATE TABLE %s (id int PRIMARY KEY, person " + personTypeName + ")"); + + // Insert data with nested UDT + execute("INSERT INTO %s (id, person) VALUES (1, {name: 'John', age: 30, address: {street: '123 Main St', city: 'New York', zip: '10001'}})"); + execute("INSERT INTO %s (id, person) VALUES (2, {name: 'Jane', age: 25, address: {street: '456 Oak Ave', city: 'Boston', zip: '02101'}})"); + + // Test nested UDT field selection + assertRows(execute("SELECT person.address.city FROM %s WHERE id = 1"), row("New York")); + assertRows(execute("SELECT person.address.street FROM %s WHERE id = 2"), row("456 Oak Ave")); + + // Test nested UDT field queries + assertRows(execute("SELECT id FROM %s WHERE person.address.city = 'New York' ALLOW FILTERING"), + row(1)); + assertRows(execute("SELECT id FROM %s WHERE person.address.zip = '02101' ALLOW FILTERING"), + row(2)); + + // Test combining nested UDT field queries with regular UDT field queries + assertRows(execute("SELECT id FROM %s WHERE person.age > 25 AND person.address.city = 'New York' ALLOW FILTERING"), + row(1)); + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/UDTParsingTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/UDTParsingTest.java new file mode 100644 index 000000000000..065896cafa73 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/UDTParsingTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.cql3.validation.operations; + +import org.junit.Test; + +import org.apache.cassandra.cql3.CQLTester; + +public class UDTParsingTest extends CQLTester +{ + @Test + public void testUDTFieldAccessParsing() throws Throwable + { + // Create the UDT type + execute("CREATE TYPE IF NOT EXISTS ks1.udt_type (f0 smallint)"); + + // Create table with UDT column + execute("CREATE TABLE IF NOT EXISTS ks1.tbl (" + + "pk0 time, " + + "pk1 float, " + + "ck0 text, " + + "v0 frozen<udt_type>, " + + "PRIMARY KEY ((pk0, pk1), ck0)" + + ")"); + + // Test the problematic query + try { + execute("SELECT v0.f0 FROM ks1.tbl PER PARTITION LIMIT 817 LIMIT 134 ALLOW FILTERING"); + } catch (Exception e) { + System.err.println("=== UDT PARSING ERROR ==="); + System.err.println("Error: " + e.getMessage()); + System.err.println("Exception type: " + e.getClass().getName()); + e.printStackTrace(); + System.err.println("=== END ERROR ==="); + throw e; + } + } + + @Test + public void testUDTFieldAccessInWhereClause() throws Throwable + { + // Create the UDT type + execute("CREATE TYPE IF NOT EXISTS ks1.udt_type (f0 smallint)"); + + // Create table with UDT column + execute("CREATE TABLE IF NOT EXISTS ks1.tbl (" + + "pk0 time, " + + "pk1 float, " + + "ck0 text, " + + "v0 frozen<udt_type>, " + + "PRIMARY KEY ((pk0, pk1), ck0)" + + ")"); + + // Test UDT field access in WHERE clause + try { + execute("SELECT * FROM ks1.tbl WHERE v0.f0 = 123"); + } catch (Exception e) { + System.err.println("=== UDT WHERE CLAUSE ERROR ==="); + System.err.println("Error: " + e.getMessage()); + System.err.println("Exception type: " + e.getClass().getName()); + e.printStackTrace(); + System.err.println("=== END ERROR ==="); + throw e; + } + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/index/sai/functional/CompactionTest.java b/test/unit/org/apache/cassandra/index/sai/functional/CompactionTest.java index 5b38876de3e5..f8ab3b109cb9 100644 --- a/test/unit/org/apache/cassandra/index/sai/functional/CompactionTest.java +++ b/test/unit/org/apache/cassandra/index/sai/functional/CompactionTest.java @@ -22,6 +22,10 @@ import java.util.Collection; import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; import org.junit.Assert; @@ -42,6 +46,7 @@ import org.apache.cassandra.index.sai.SAITester; import org.apache.cassandra.index.sai.utils.IndexIdentifier; import org.apache.cassandra.index.sai.disk.v1.SSTableIndexWriter; +import org.apache.cassandra.index.sai.disk.v1.segment.SegmentBuilder; import org.apache.cassandra.index.sai.utils.IndexTermType; import org.apache.cassandra.inject.ActionBuilder; import org.apache.cassandra.inject.Expression; @@ -61,6 +66,7 @@ import org.apache.cassandra.utils.TimeUUID; import org.apache.cassandra.utils.concurrent.Refs; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; @@ -352,4 +358,54 @@ public void testConcurrentIndexDropWithCompaction() throws Throwable .isInstanceOf(InvalidQueryException.class) .hasMessage(StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE); } + + @Test + public void testSegmentBuilderFlushWithShardedCompaction() throws Throwable + { + int shards = 64; + String createTable = "CREATE TABLE %s (id1 TEXT PRIMARY KEY, v1 INT,
v2 TEXT) WITH compaction = " + + "{'class' : 'UnifiedCompactionStrategy', 'enabled' : false, 'base_shard_count': " + shards + ", 'min_sstable_size': '1KiB' }"; + createTable(createTable); + createIndex(String.format(CREATE_INDEX_TEMPLATE, "v1")); + createIndex(String.format(CREATE_INDEX_TEMPLATE, "v2")); + disableCompaction(keyspace(), currentTable()); + + int rowsPerSSTable = 2000; + int numSSTables = 4; + int key = 0; + for (int s = 0; s < numSSTables; s++) + { + for (int i = 0; i < rowsPerSSTable; i++) + { + execute("INSERT INTO %s (id1, v1, v2) VALUES (?, 0, '01e2wefnewirui32e21e21wre')", Integer.toString(key++)); + } + flush(); + } + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try + { + Future<?> future = executor.submit(() -> { + getCurrentColumnFamilyStore().forceMajorCompaction(false, 1); + waitForCompactions(); + }); + + // verify that segment builders are not accumulating while compaction runs + while (!future.isDone()) + { + // ACTIVE_BUILDER_COUNT starts at 0; with 2 indexes there is at most one active segment builder per index + assertThat(SegmentBuilder.getActiveBuilderCount()).isGreaterThanOrEqualTo(0).isLessThanOrEqualTo(2); + } + future.get(30, TimeUnit.SECONDS); + + // verify results are sharded + assertThat(getCurrentColumnFamilyStore().getLiveSSTables()).hasSize(shards); + } + finally + { + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.SECONDS); + assertThat(executor.isTerminated()).isTrue(); + } + } } diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableFlushObserverTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableFlushObserverTest.java index 4f3ff5a6fe50..f9f46a1b8c2d 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableFlushObserverTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableFlushObserverTest.java @@ -36,6 +36,7 @@ import org.junit.BeforeClass; import org.junit.Test; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.DecoratedKey; @@ -73,6 +74,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; + public class SSTableFlushObserverTest { @@ -167,14 +169,15 @@ public void testFlushObserver() throws Exception BufferCell.live(getColumn(cfm, "height"), now, LongType.instance.decompose(178L)))); writer.append(new RowIterator(cfm, key, Collections.singletonList(buildRow(expected.get(key))))); - + writer.onSSTableWriterSwitched(); reader = writer.finish(true); } finally { FileUtils.closeQuietly(writer); } - + + Assert.assertTrue(observer.isWriterSwitched); Assert.assertTrue(observer.isComplete); Assert.assertEquals(expected.size(), observer.rows.size()); @@ -265,6 +268,7 @@ private static class FlushObserver implements SSTableFlushObserver private boolean beginCalled; private boolean failOnBegin; private boolean abortCalled; + private boolean isWriterSwitched; @Override public void begin() @@ -274,6 +278,12 @@ public void begin() throw new RuntimeException("Failed to initialize"); } + @Override + public void onSSTableWriterSwitched() + { + isWriterSwitched = true; + } + @Override public void startPartition(DecoratedKey key, long dataPosition, long indexPosition) {
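As a side note on the new CompactionTest case above: it polls SegmentBuilder.getActiveBuilderCount() while the sharded compaction runs in the background and asserts that the value stays within a small bound. A generic, self-contained sketch of that polling pattern is shown below, assuming nothing from the Cassandra test framework; the class name, the gauge, and the bound of 2 are illustrative stand-ins.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Generic sketch of the polling pattern used by testSegmentBuilderFlushWithShardedCompaction:
// run work in the background and keep asserting that a shared gauge stays bounded while it runs.
public final class BoundedGaugePollingSketch
{
    private static final AtomicInteger ACTIVE = new AtomicInteger();

    public static void main(String[] args) throws Exception
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try
        {
            Future<?> task = executor.submit(() -> {
                for (int i = 0; i < 100_000; i++)
                {
                    ACTIVE.incrementAndGet();   // a "builder" becomes active
                    ACTIVE.decrementAndGet();   // and is released promptly
                }
            });

            // While the background task runs, the gauge must never exceed the expected bound.
            while (!task.isDone())
            {
                int active = ACTIVE.get();
                if (active < 0 || active > 2)
                    throw new AssertionError("gauge out of bounds: " + active);
            }
            task.get(30, TimeUnit.SECONDS);   // surface any failure from the background task
        }
        finally
        {
            executor.shutdown();
            executor.awaitTermination(30, TimeUnit.SECONDS);
        }
    }
}

The busy-wait only checks while the background task is still running, and the final get(...) propagates any exception the task threw, which is the same structure the test relies on.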