Skip to content

Commit 27dbb77

Browse files
committed
CNDB-14725: add SSTableFlushObserver#onSSTableWriterSwitched to flush SAI segment builder for written shards without waiting for entire transaction to complete (#1859)
This reduces memory usage during sharded compaction. riptano/cndb#14725 OOM during SAI compaction with large num of shards Flush segment builder when sstable writer is switched to free memory without waiting full compaction to complete
1 parent 1200040 commit 27dbb77

File tree

12 files changed

+407
-3
lines changed

12 files changed

+407
-3
lines changed

src/java/org/apache/cassandra/index/sai/disk/PerColumnIndexWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ public interface PerColumnIndexWriter
4141
*/
4242
void complete(Stopwatch stopwatch) throws IOException;
4343

44+
/**
45+
* Called when current SSTable writer is switched during sharded compaction to free any in-memory resources associated
46+
* with the SSTable for current index without waiting for full transaction to complete
47+
*/
48+
void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException;
49+
4450
/**
4551
* Aborts accumulating data. Allows to clean up resources on error.
4652
* <p>

src/java/org/apache/cassandra/index/sai/disk/StorageAttachedIndexWriter.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,25 @@ public void staticRow(Row staticRow)
164164
}
165165
}
166166

167+
@Override
168+
public void onSSTableWriterSwitched()
169+
{
170+
if (aborted) return;
171+
172+
try
173+
{
174+
for (PerColumnIndexWriter w : perIndexWriters)
175+
{
176+
w.onSSTableWriterSwitched(stopwatch);
177+
}
178+
}
179+
catch (Throwable t)
180+
{
181+
logger.error(indexDescriptor.logMessage("Failed to flush segment on sstable writer switched"), t);
182+
abort(t, true);
183+
}
184+
}
185+
167186
@Override
168187
public void complete()
169188
{

src/java/org/apache/cassandra/index/sai/disk/v1/MemtableIndexWriter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,13 @@ public void complete(Stopwatch stopwatch) throws IOException
166166
}
167167
}
168168

169+
@Override
170+
public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException
171+
{
172+
// no-op for memtable index where all terms are already inside memory index, we can't get rid of memory index
173+
// until full flush are completed
174+
}
175+
169176
private long flush(MemtableTermsIterator terms) throws IOException
170177
{
171178
SegmentWriter writer = indexTermType.isLiteral() ? new LiteralIndexWriter(indexDescriptor, indexIdentifier)

src/java/org/apache/cassandra/index/sai/disk/v1/SSTableIndexWriter.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,18 @@ public void addRow(PrimaryKey key, Row row, long sstableRowId) throws IOExceptio
100100
}
101101
}
102102

103+
@Override
104+
public void onSSTableWriterSwitched(Stopwatch stopwatch) throws IOException
105+
{
106+
if (maybeAbort())
107+
return;
108+
109+
boolean emptySegment = currentBuilder == null || currentBuilder.isEmpty();
110+
logger.debug(index.identifier().logMessage("Flushing index with {}buffered data on SSTable writer switched..."), emptySegment ? "no " : "");
111+
if (!emptySegment)
112+
flushSegment();
113+
}
114+
103115
@Override
104116
public void complete(Stopwatch stopwatch) throws IOException
105117
{

src/java/org/apache/cassandra/index/sai/disk/v1/segment/SegmentBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public abstract class SegmentBuilder
4646
public static final long LAST_VALID_SEGMENT_ROW_ID = (Integer.MAX_VALUE / 2) - 1L;
4747
private static long testLastValidSegmentRowId = -1;
4848

49-
/** The number of column indexes being built globally. (Starts at one to avoid divide by zero.) */
49+
/** The number of column indexes being built globally. */
5050
private static final AtomicInteger ACTIVE_BUILDER_COUNT = new AtomicInteger(0);
5151

5252
/** Minimum flush size, dynamically updated as segment builds are started and completed/aborted. */

src/java/org/apache/cassandra/io/sstable/SSTableFlushObserver.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ public interface SSTableFlushObserver
6868
*/
6969
void complete();
7070

71+
/**
72+
* Called when current sstable writer is switched during sharded compaction to free any in-memory resources associated
73+
* with the sstable without waiting for full transaction to complete
74+
*/
75+
default void onSSTableWriterSwitched() {}
76+
7177
/**
7278
* Clean up resources on error. There should be no side effects if called multiple times.
7379
*/

src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ public void switchWriter(SSTableWriter newWriter)
282282

283283
currentlyOpenedEarlyAt = 0;
284284
bytesWritten += writer.getFilePointer();
285+
writer.onSSTableWriterSwitched();
285286
writer = newWriter;
286287
}
287288

src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,12 @@ public final void prepareToCommit()
282282
txnProxy.prepareToCommit();
283283
}
284284

285+
// notify sstable flush observer about sstable writer switched
286+
public final void onSSTableWriterSwitched()
287+
{
288+
observers.forEach(SSTableFlushObserver::onSSTableWriterSwitched);
289+
}
290+
285291
public final Throwable commit(Throwable accumulate)
286292
{
287293
try
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.cql3.validation;
20+
21+
import org.junit.Test;
22+
23+
import org.apache.cassandra.cql3.CQLTester;
24+
25+
/**
26+
* Test UDT (User Defined Type) field queries in Cassandra.
27+
*
28+
* This test demonstrates how to:
29+
* 1. Create UDTs with multiple fields
30+
* 2. Select specific UDT fields
31+
* 3. Query UDT fields in WHERE clauses
32+
* 4. Use UDT fields with different operators
33+
*/
34+
public class UDTFieldQueryTest extends CQLTester
35+
{
36+
@Test
37+
public void testUDTFieldSelection() throws Throwable
38+
{
39+
// Create a UDT with multiple fields
40+
String personType = createType("CREATE TYPE %s (name text, age int, city text, active boolean)");
41+
String personTypeName = KEYSPACE + '.' + personType;
42+
43+
// Create table with UDT column
44+
createTable("CREATE TABLE %s (id int PRIMARY KEY, person " + personTypeName + ")");
45+
46+
// Insert data
47+
execute("INSERT INTO %s (id, person) VALUES (1, {name: 'John', age: 30, city: 'New York', active: true})");
48+
execute("INSERT INTO %s (id, person) VALUES (2, {name: 'Jane', age: 25, city: 'Boston', active: false})");
49+
execute("INSERT INTO %s (id, person) VALUES (3, {name: 'Bob', age: 35, city: 'Chicago', active: true})");
50+
51+
// Test selecting specific UDT fields
52+
assertRows(execute("SELECT person.name FROM %s WHERE id = 1"), row("John"));
53+
assertRows(execute("SELECT person.age FROM %s WHERE id = 2"), row(25));
54+
assertRows(execute("SELECT person.city FROM %s WHERE id = 3"), row("Chicago"));
55+
assertRows(execute("SELECT person.active FROM %s WHERE id = 1"), row(true));
56+
57+
// Test selecting multiple UDT fields
58+
assertRows(execute("SELECT person.name, person.age FROM %s WHERE id = 1"),
59+
row("John", 30));
60+
61+
// Test selecting all UDT fields
62+
assertRows(execute("SELECT person FROM %s WHERE id = 1"),
63+
row(userType("name", "John", "age", 30, "city", "New York", "active", true)));
64+
}
65+
66+
@Test
67+
public void testUDTFieldQueries() throws Throwable
68+
{
69+
// Create a UDT with multiple fields
70+
String personType = createType("CREATE TYPE %s (name text, age int, city text, active boolean)");
71+
String personTypeName = KEYSPACE + '.' + personType;
72+
73+
// Create table with UDT column
74+
createTable("CREATE TABLE %s (id int PRIMARY KEY, person " + personTypeName + ")");
75+
76+
// Insert data
77+
execute("INSERT INTO %s (id, person) VALUES (1, {name: 'John', age: 30, city: 'New York', active: true})");
78+
execute("INSERT INTO %s (id, person) VALUES (2, {name: 'Jane', age: 25, city: 'Boston', active: false})");
79+
execute("INSERT INTO %s (id, person) VALUES (3, {name: 'Bob', age: 35, city: 'Chicago', active: true})");
80+
execute("INSERT INTO %s (id, person) VALUES (4, {name: 'Alice', age: 28, city: 'San Francisco', active: true})");
81+
82+
// Test equality queries on UDT fields
83+
assertRows(execute("SELECT id FROM %s WHERE person.name = 'John' ALLOW FILTERING"),
84+
row(1));
85+
assertRows(execute("SELECT id FROM %s WHERE person.age = 25 ALLOW FILTERING"),
86+
row(2));
87+
assertRows(execute("SELECT id FROM %s WHERE person.city = 'Chicago' ALLOW FILTERING"),
88+
row(3));
89+
assertRows(execute("SELECT id FROM %s WHERE person.active = true ALLOW FILTERING"),
90+
row(1), row(3), row(4));
91+
92+
// Test range queries on UDT fields
93+
assertRows(execute("SELECT id FROM %s WHERE person.age > 30 ALLOW FILTERING"),
94+
row(3));
95+
assertRows(execute("SELECT id FROM %s WHERE person.age >= 25 ALLOW FILTERING"),
96+
row(1), row(2), row(3), row(4));
97+
assertRows(execute("SELECT id FROM %s WHERE person.age < 30 ALLOW FILTERING"),
98+
row(2), row(4));
99+
100+
// Test IN queries on UDT fields
101+
assertRows(execute("SELECT id FROM %s WHERE person.age IN (25, 30, 35) ALLOW FILTERING"),
102+
row(1), row(2), row(3));
103+
assertRows(execute("SELECT id FROM %s WHERE person.city IN ('New York', 'Boston') ALLOW FILTERING"),
104+
row(1), row(2));
105+
106+
// Test multiple conditions on UDT fields
107+
assertRows(execute("SELECT id FROM %s WHERE person.age > 25 AND person.active = true ALLOW FILTERING"),
108+
row(1), row(3), row(4));
109+
assertRows(execute("SELECT id FROM %s WHERE person.age >= 30 AND person.city = 'New York' ALLOW FILTERING"),
110+
row(1));
111+
}
112+
113+
@Test
114+
public void testUDTFieldQueriesWithNullValues() throws Throwable
115+
{
116+
// Create a UDT with nullable fields
117+
String personType = createType("CREATE TYPE %s (name text, age int, city text)");
118+
String personTypeName = KEYSPACE + '.' + personType;
119+
120+
// Create table with UDT column
121+
createTable("CREATE TABLE %s (id int PRIMARY KEY, person " + personTypeName + ")");
122+
123+
// Insert data with null values
124+
execute("INSERT INTO %s (id, person) VALUES (1, {name: 'John', age: 30, city: null})");
125+
execute("INSERT INTO %s (id, person) VALUES (2, {name: null, age: 25, city: 'Boston'})");
126+
execute("INSERT INTO %s (id, person) VALUES (3, {name: 'Bob', age: null, city: 'Chicago'})");
127+
128+
// Test queries with null values
129+
assertRows(execute("SELECT id FROM %s WHERE person.name = null ALLOW FILTERING"),
130+
row(2));
131+
assertRows(execute("SELECT id FROM %s WHERE person.city = null ALLOW FILTERING"),
132+
row(1));
133+
assertRows(execute("SELECT id FROM %s WHERE person.age = null ALLOW FILTERING"),
134+
row(3));
135+
136+
// Test queries with non-null values
137+
assertRows(execute("SELECT id FROM %s WHERE person.name != null ALLOW FILTERING"),
138+
row(1), row(3));
139+
assertRows(execute("SELECT id FROM %s WHERE person.city != null ALLOW FILTERING"),
140+
row(2), row(3));
141+
}
142+
143+
@Test
144+
public void testUDTFieldQueriesWithFrozenUDT() throws Throwable
145+
{
146+
// Create a UDT
147+
String personType = createType("CREATE TYPE %s (name text, age int)");
148+
String personTypeName = KEYSPACE + '.' + personType;
149+
150+
// Create table with frozen UDT column
151+
createTable("CREATE TABLE %s (id int PRIMARY KEY, person frozen<" + personTypeName + ">)");
152+
153+
// Insert data
154+
execute("INSERT INTO %s (id, person) VALUES (1, {name: 'John', age: 30})");
155+
execute("INSERT INTO %s (id, person) VALUES (2, {name: 'Jane', age: 25})");
156+
157+
// Test UDT field selection with frozen UDT
158+
assertRows(execute("SELECT person.name FROM %s WHERE id = 1"), row("John"));
159+
assertRows(execute("SELECT person.age FROM %s WHERE id = 2"), row(25));
160+
161+
// Test UDT field queries with frozen UDT
162+
assertRows(execute("SELECT id FROM %s WHERE person.name = 'John' ALLOW FILTERING"),
163+
row(1));
164+
assertRows(execute("SELECT id FROM %s WHERE person.age > 25 ALLOW FILTERING"),
165+
row(1));
166+
}
167+
168+
@Test
169+
public void testUDTFieldQueriesWithComplexUDT() throws Throwable
170+
{
171+
// Create nested UDTs
172+
String addressType = createType("CREATE TYPE %s (street text, city text, zip text)");
173+
String addressTypeName = KEYSPACE + '.' + addressType;
174+
175+
String personType = createType("CREATE TYPE %s (name text, age int, address " + addressTypeName + ")");
176+
String personTypeName = KEYSPACE + '.' + personType;
177+
178+
// Create table with complex UDT
179+
createTable("CREATE TABLE %s (id int PRIMARY KEY, person " + personTypeName + ")");
180+
181+
// Insert data with nested UDT
182+
execute("INSERT INTO %s (id, person) VALUES (1, {name: 'John', age: 30, address: {street: '123 Main St', city: 'New York', zip: '10001'}})");
183+
execute("INSERT INTO %s (id, person) VALUES (2, {name: 'Jane', age: 25, address: {street: '456 Oak Ave', city: 'Boston', zip: '02101'}})");
184+
185+
// Test nested UDT field selection
186+
assertRows(execute("SELECT person.address.city FROM %s WHERE id = 1"), row("New York"));
187+
assertRows(execute("SELECT person.address.street FROM %s WHERE id = 2"), row("456 Oak Ave"));
188+
189+
// Test nested UDT field queries
190+
assertRows(execute("SELECT id FROM %s WHERE person.address.city = 'New York' ALLOW FILTERING"),
191+
row(1));
192+
assertRows(execute("SELECT id FROM %s WHERE person.address.zip = '02101' ALLOW FILTERING"),
193+
row(2));
194+
195+
// Test combining nested UDT field queries with regular UDT field queries
196+
assertRows(execute("SELECT id FROM %s WHERE person.age > 25 AND person.address.city = 'New York' ALLOW FILTERING"),
197+
row(1));
198+
}
199+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.cql3.validation.operations;
20+
21+
import org.junit.Test;
22+
23+
import org.apache.cassandra.cql3.CQLTester;
24+
25+
public class UDTParsingTest extends CQLTester
26+
{
27+
@Test
28+
public void testUDTFieldAccessParsing() throws Throwable
29+
{
30+
// Create the UDT type
31+
execute("CREATE TYPE IF NOT EXISTS ks1.udt_type (f0 smallint)");
32+
33+
// Create table with UDT column
34+
execute("CREATE TABLE IF NOT EXISTS ks1.tbl (" +
35+
"pk0 time, " +
36+
"pk1 float, " +
37+
"ck0 text, " +
38+
"v0 frozen<udt_type>, " +
39+
"PRIMARY KEY ((pk0, pk1), ck0)" +
40+
")");
41+
42+
// Test the problematic query
43+
try {
44+
execute("SELECT v0.f0 FROM ks1.tbl PER PARTITION LIMIT 817 LIMIT 134 ALLOW FILTERING");
45+
} catch (Exception e) {
46+
System.err.println("=== UDT PARSING ERROR ===");
47+
System.err.println("Error: " + e.getMessage());
48+
System.err.println("Exception type: " + e.getClass().getName());
49+
e.printStackTrace();
50+
System.err.println("=== END ERROR ===");
51+
throw e;
52+
}
53+
}
54+
55+
@Test
56+
public void testUDTFieldAccessInWhereClause() throws Throwable
57+
{
58+
// Create the UDT type
59+
execute("CREATE TYPE IF NOT EXISTS ks1.udt_type (f0 smallint)");
60+
61+
// Create table with UDT column
62+
execute("CREATE TABLE IF NOT EXISTS ks1.tbl (" +
63+
"pk0 time, " +
64+
"pk1 float, " +
65+
"ck0 text, " +
66+
"v0 frozen<udt_type>, " +
67+
"PRIMARY KEY ((pk0, pk1), ck0)" +
68+
")");
69+
70+
// Test UDT field access in WHERE clause
71+
try {
72+
execute("SELECT * FROM ks1.tbl WHERE v0.f0 = 123");
73+
} catch (Exception e) {
74+
System.err.println("=== UDT WHERE CLAUSE ERROR ===");
75+
System.err.println("Error: " + e.getMessage());
76+
System.err.println("Exception type: " + e.getClass().getName());
77+
e.printStackTrace();
78+
System.err.println("=== END ERROR ===");
79+
throw e;
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)