Skip to content

Commit d0c5f6b

Browse files
Merge branch 'cassandra-4.1' into cassandra-5.0
* cassandra-4.1: ninja-fix – Fix eclipse-warnings error for CASSANDRA-19564 ReadCommandController should close fast to avoid deadlock when building secondary index
2 parents b4dcef7 + bfcba9c commit d0c5f6b

File tree

4 files changed

+82
-4
lines changed

4 files changed

+82
-4
lines changed

CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
5.0.7
2+
Merged from 4.1:
3+
* ReadCommandController should close fast to avoid deadlock when building secondary index (CASSANDRA-19564)
24
Merged from 4.0:
35
* Updated dtest-api to 0.0.18 and removed JMX-related classes that now live in the dtest-api (CASSANDRA-20884)
46

src/java/org/apache/cassandra/index/SecondaryIndexManager.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.cassandra.db.lifecycle.SSTableSet;
5959
import org.apache.cassandra.db.lifecycle.View;
6060
import org.apache.cassandra.db.memtable.Memtable;
61+
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
6162
import org.apache.cassandra.db.partitions.PartitionUpdate;
6263
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
6364
import org.apache.cassandra.db.rows.*;
@@ -1048,14 +1049,22 @@ public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize, R
10481049
SinglePartitionPager pager = new SinglePartitionPager(cmd, null, ProtocolVersion.CURRENT);
10491050
while (!pager.isExhausted())
10501051
{
1052+
@SuppressWarnings("resource")
1053+
UnfilteredRowIterator partition;
10511054
try (ReadExecutionController controller = cmd.executionController();
1052-
WriteContext ctx = keyspace.getWriteHandler().createContextForIndexing();
10531055
UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata(), pageSize, controller))
10541056
{
10551057
if (!page.hasNext())
10561058
break;
10571059

1058-
try (UnfilteredRowIterator partition = page.next())
1060+
try (UnfilteredRowIterator onePartition = page.next())
1061+
{
1062+
partition = ImmutableBTreePartition.create(onePartition).unfilteredIterator();
1063+
}
1064+
}
1065+
1066+
try (WriteContext ctx = keyspace.getWriteHandler().createContextForIndexing())
1067+
{
10591068
{
10601069
Set<Index.Indexer> indexers = new HashSet<>(indexGroups.size());
10611070

@@ -1119,6 +1128,13 @@ public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize, R
11191128
indexers.forEach(Index.Indexer::finish);
11201129
}
11211130
}
1131+
finally
1132+
{
1133+
if (partition != null)
1134+
{
1135+
partition.close();
1136+
}
1137+
}
11221138
}
11231139
}
11241140
}

test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,18 @@
2323
import java.util.Arrays;
2424
import java.util.Collections;
2525
import java.util.List;
26+
import java.util.Random;
2627
import java.util.UUID;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.Future;
2731
import java.util.concurrent.TimeUnit;
2832
import java.util.concurrent.atomic.AtomicInteger;
2933
import java.util.regex.Pattern;
3034
import java.util.stream.Collectors;
3135

36+
import com.google.common.collect.Sets;
37+
3238
import org.awaitility.Awaitility;
3339
import org.junit.After;
3440
import org.junit.AfterClass;
@@ -37,15 +43,20 @@
3743
import org.junit.BeforeClass;
3844
import org.junit.Test;
3945

46+
import org.apache.cassandra.db.ColumnFamilyStore;
47+
import org.apache.cassandra.db.Keyspace;
48+
import org.apache.cassandra.db.marshal.ValueGenerator;
4049
import org.apache.cassandra.distributed.Cluster;
4150
import org.apache.cassandra.distributed.api.ConsistencyLevel;
4251
import org.apache.cassandra.utils.TimeUUID;
4352

53+
import static org.apache.cassandra.distributed.impl.IsolatedExecutor.waitOn;
54+
4455
public class SecondaryIndexTest extends TestBaseImpl
4556
{
4657
private static final int NUM_NODES = 3;
4758
private static final int REPLICATION_FACTOR = 1;
48-
private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v int, PRIMARY KEY (k))";
59+
private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v text, PRIMARY KEY (k))";
4960
private static final String CREATE_INDEX = "CREATE INDEX v_index_%d ON %s(v)";
5061

5162
private static final AtomicInteger seq = new AtomicInteger();
@@ -122,4 +133,48 @@ public void test_only_coordinator_chooses_index_for_query()
122133
});
123134
}
124135
}
136+
137+
@Test
138+
public void test_secondary_rebuild_with_small_memtable_memory()
139+
{
140+
// populate data
141+
Random rand = new Random();
142+
for (int i = 0 ; i < 100 ; ++i)
143+
cluster.coordinator(1).execute(String.format("INSERT INTO %s (k, v) VALUES (?, ?)", tableName), ConsistencyLevel.ALL, i, ValueGenerator.randomString(rand, 50000));
144+
145+
cluster.forEach(i -> i.flush(KEYSPACE));
146+
147+
// restart node 1 with small memtable allocation so that index rebuild will cause memtable flush which will need
148+
// to reclaim the memory. see CASSANDRA-19564
149+
waitOn(cluster.get(1).shutdown());
150+
Object originalMemTableHeapSpace = cluster.get(1).config().get("memtable_heap_space");
151+
cluster.get(1).config().set("memtable_heap_space", "1MiB");
152+
cluster.get(1).startup();
153+
String tableNameWithoutKeyspaceName = tableName.split("\\.")[1];
154+
String indexName = String.format("v_index_%d", seq.get());
155+
Runnable task = cluster.get(1).runsOnInstance(
156+
() -> {
157+
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(tableNameWithoutKeyspaceName);
158+
cfs.indexManager.rebuildIndexesBlocking(Sets.newHashSet(Arrays.asList(indexName)));
159+
}
160+
);
161+
ExecutorService es = Executors.newFixedThreadPool(1);
162+
Future<?> future = es.submit(task);
163+
try
164+
{
165+
future.get(30, TimeUnit.SECONDS);
166+
}
167+
catch (Exception e)
168+
{
169+
e.printStackTrace();
170+
Assert.fail("Rebuild should finish within 30 seconds without issue.");
171+
}
172+
finally
173+
{
174+
// restore node1 to use default value for memtable_heap_space
175+
waitOn(cluster.get(1).shutdown());
176+
cluster.get(1).config().set("memtable_heap_space", originalMemTableHeapSpace);
177+
cluster.get(1).startup();
178+
}
179+
}
125180
}

test/unit/org/apache/cassandra/db/marshal/ValueGenerator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,12 @@ public static ByteBuffer randomBytes(Random random)
4747

4848
public static String randomString(Random random)
4949
{
50-
char[] chars = new char[random.nextInt(100)];
50+
return randomString(random, 100);
51+
}
52+
53+
public static String randomString(Random random, int length)
54+
{
55+
char[] chars = new char[random.nextInt(length)];
5156
for (int i=0; i<chars.length; i++)
5257
chars[i] = CHARS[random.nextInt(CHARS.length)];
5358
return new String(chars);

0 commit comments

Comments
 (0)