Skip to content

Commit 74de577

Browse files
Merge branch 'cassandra-5.0' into trunk
* cassandra-5.0: ninja-fix – Fix eclipse-warnings error for CASSANDRA-19564 ReadCommandController should close fast to avoid deadlock when building secondary index
2 parents e267b2c + d0c5f6b commit 74de577

File tree

4 files changed

+82
-5
lines changed

4 files changed

+82
-5
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ Merged from 5.0:
309309
* Prioritize built indexes in IndexStatusManager (CASSANDRA-19400)
310310
* Add java.base/java.lang.reflect among opens for jvm11-client.options (CASSANDRA-19780)
311311
Merged from 4.1:
312+
* ReadCommandController should close fast to avoid deadlock when building secondary index (CASSANDRA-19564)
312313
* Redact security-sensitive information in system_views.settings (CASSANDRA-20856)
313314
* Improve CommitLogSegmentReader to skip SyncBlocks correctly in case of CRC errors (CASSANDRA-20664)
314315
* Do not crash on first boot with data_disk_usage_max_disk_size set when data directory is not created yet (CASSANDRA-20787)

src/java/org/apache/cassandra/index/SecondaryIndexManager.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.cassandra.db.lifecycle.SSTableSet;
6262
import org.apache.cassandra.db.lifecycle.View;
6363
import org.apache.cassandra.db.memtable.Memtable;
64+
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
6465
import org.apache.cassandra.db.partitions.PartitionUpdate;
6566
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
6667
import org.apache.cassandra.db.rows.*;
@@ -1064,14 +1065,22 @@ public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize, R
10641065
SinglePartitionPager pager = new SinglePartitionPager(cmd, null, ProtocolVersion.CURRENT);
10651066
while (!pager.isExhausted())
10661067
{
1068+
@SuppressWarnings("resource")
1069+
UnfilteredRowIterator partition;
10671070
try (ReadExecutionController controller = cmd.executionController();
1068-
WriteContext ctx = keyspace.getWriteHandler().createContextForIndexing();
10691071
UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata(), pageSize, controller))
10701072
{
10711073
if (!page.hasNext())
10721074
break;
10731075

1074-
try (UnfilteredRowIterator partition = page.next())
1076+
try (UnfilteredRowIterator onePartition = page.next())
1077+
{
1078+
partition = ImmutableBTreePartition.create(onePartition).unfilteredIterator();
1079+
}
1080+
}
1081+
1082+
try (WriteContext ctx = keyspace.getWriteHandler().createContextForIndexing())
1083+
{
10751084
{
10761085
Set<Index.Indexer> indexers = new HashSet<>(indexGroups.size());
10771086

@@ -1135,6 +1144,13 @@ public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize, R
11351144
indexers.forEach(Index.Indexer::finish);
11361145
}
11371146
}
1147+
finally
1148+
{
1149+
if (partition != null)
1150+
{
1151+
partition.close();
1152+
}
1153+
}
11381154
}
11391155
}
11401156
}

test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,29 +23,40 @@
2323
import java.util.Arrays;
2424
import java.util.Collections;
2525
import java.util.List;
26+
import java.util.Random;
2627
import java.util.UUID;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.Future;
2731
import java.util.concurrent.TimeUnit;
2832
import java.util.concurrent.atomic.AtomicInteger;
2933
import java.util.regex.Pattern;
3034
import java.util.stream.Collectors;
3135

36+
import com.google.common.collect.Sets;
37+
38+
import org.awaitility.Awaitility;
3239
import org.junit.After;
3340
import org.junit.AfterClass;
3441
import org.junit.Assert;
3542
import org.junit.Before;
3643
import org.junit.BeforeClass;
3744
import org.junit.Test;
3845

46+
import org.apache.cassandra.db.ColumnFamilyStore;
47+
import org.apache.cassandra.db.Keyspace;
48+
import org.apache.cassandra.db.marshal.ValueGenerator;
3949
import org.apache.cassandra.distributed.Cluster;
4050
import org.apache.cassandra.distributed.api.ConsistencyLevel;
4151
import org.apache.cassandra.utils.TimeUUID;
42-
import org.awaitility.Awaitility;
52+
53+
import static org.apache.cassandra.distributed.impl.IsolatedExecutor.waitOn;
4354

4455
public class SecondaryIndexTest extends TestBaseImpl
4556
{
4657
private static final int NUM_NODES = 3;
4758
private static final int REPLICATION_FACTOR = 1;
48-
private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v int, PRIMARY KEY (k))";
59+
private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v text, PRIMARY KEY (k))";
4960
private static final String CREATE_INDEX = "CREATE INDEX v_index_%d ON %s(v)";
5061

5162
private static final AtomicInteger seq = new AtomicInteger();
@@ -122,4 +133,48 @@ public void test_only_coordinator_chooses_index_for_query()
122133
});
123134
}
124135
}
136+
137+
@Test
138+
public void test_secondary_rebuild_with_small_memtable_memory()
139+
{
140+
// populate data
141+
Random rand = new Random();
142+
for (int i = 0 ; i < 100 ; ++i)
143+
cluster.coordinator(1).execute(String.format("INSERT INTO %s (k, v) VALUES (?, ?)", tableName), ConsistencyLevel.ALL, i, ValueGenerator.randomString(rand, 50000));
144+
145+
cluster.forEach(i -> i.flush(KEYSPACE));
146+
147+
// restart node 1 with small memtable allocation so that index rebuild will cause memtable flush which will need
148+
// to reclaim the memory. see CASSANDRA-19564
149+
waitOn(cluster.get(1).shutdown());
150+
Object originalMemTableHeapSpace = cluster.get(1).config().get("memtable_heap_space");
151+
cluster.get(1).config().set("memtable_heap_space", "1MiB");
152+
cluster.get(1).startup();
153+
String tableNameWithoutKeyspaceName = tableName.split("\\.")[1];
154+
String indexName = String.format("v_index_%d", seq.get());
155+
Runnable task = cluster.get(1).runsOnInstance(
156+
() -> {
157+
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(tableNameWithoutKeyspaceName);
158+
cfs.indexManager.rebuildIndexesBlocking(Sets.newHashSet(Arrays.asList(indexName)));
159+
}
160+
);
161+
ExecutorService es = Executors.newFixedThreadPool(1);
162+
Future<?> future = es.submit(task);
163+
try
164+
{
165+
future.get(30, TimeUnit.SECONDS);
166+
}
167+
catch (Exception e)
168+
{
169+
e.printStackTrace();
170+
Assert.fail("Rebuild should finish within 30 seconds without issue.");
171+
}
172+
finally
173+
{
174+
// restore node1 to use default value for memtable_heap_space
175+
waitOn(cluster.get(1).shutdown());
176+
cluster.get(1).config().set("memtable_heap_space", originalMemTableHeapSpace);
177+
cluster.get(1).startup();
178+
}
179+
}
125180
}

test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,12 @@ public static ByteBuffer randomBytes(Random random)
4747

4848
public static String randomString(Random random)
4949
{
50-
char[] chars = new char[random.nextInt(100)];
50+
return randomString(random, 100);
51+
}
52+
53+
public static String randomString(Random random, int length)
54+
{
55+
char[] chars = new char[random.nextInt(length)];
5156
for (int i=0; i<chars.length; i++)
5257
chars[i] = CHARS[random.nextInt(CHARS.length)];
5358
return new String(chars);

0 commit comments

Comments
 (0)