Skip to content

Commit 1a12cbb

Browse files
committed
Merge branch 'cassandra-4.1' into cassandra-5.0
* cassandra-4.1: Fix cleanup of old incremental repair sessions in case of owned token range changes or a table deleting
2 parents 2fec119 + 7f59280 commit 1a12cbb

File tree

6 files changed

+252
-2
lines changed

6 files changed

+252
-2
lines changed

CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
Merged from 4.1:
55
* ReadCommandController should close fast to avoid deadlock when building secondary index (CASSANDRA-19564)
66
Merged from 4.0:
7+
* Fix cleanup of old incremental repair sessions in case of owned token range changes or a table deleting (CASSANDRA-20877)
8+
* Fix memory leak in BufferPoolAllocator when a capacity needs to be extended (CASSANDRA-20753)
79
* Updated dtest-api to 0.0.18 and removed JMX-related classes that now live in the dtest-api (CASSANDRA-20884)
810

911
5.0.6

src/java/org/apache/cassandra/dht/Range.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,21 @@ public static <T extends RingPosition<T>> Set<Range<T>> subtract(Collection<Rang
408408
return result;
409409
}
410410

411+
public static <T extends RingPosition<T>> List<Range<T>> intersect(Collection<Range<T>> ranges1, Collection<Range<T>> ranges2)
412+
{
413+
Set<Range<T>> result = new HashSet<>();
414+
// note: O(n^2), simple but not very efficient
415+
for (Range<T> range1 : ranges1)
416+
{
417+
for (Range<T> range2 : ranges2)
418+
{
419+
result.addAll(range1.intersectionWith(range2));
420+
}
421+
}
422+
return normalize(result);
423+
}
424+
425+
411426
/**
412427
* Calculate set of the difference ranges of given two ranges
413428
* (as current (A, B] and rhs is (C, D])

src/java/org/apache/cassandra/repair/consistent/LocalSessions.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.apache.cassandra.repair.messages.StatusResponse;
9191
import org.apache.cassandra.repair.SharedContext;
9292
import org.apache.cassandra.schema.TableId;
93+
import org.apache.cassandra.schema.TableMetadata;
9394
import org.apache.cassandra.service.ActiveRepairService;
9495
import org.apache.cassandra.repair.NoSuchRepairSessionException;
9596
import org.apache.cassandra.service.StorageService;
@@ -249,21 +250,46 @@ private boolean shouldStoreSession(LocalSession session)
249250
*/
250251
private boolean isSuperseded(LocalSession session)
251252
{
253+
// to reduce overheads of intersect calculation for tables within the same keyspace
254+
Map<String, Collection<Range<Token>>> rangesPerKeyspaceCache = new HashMap<>();
252255
for (TableId tid : session.tableIds)
253256
{
254-
RepairedState state = repairedStates.get(tid);
257+
TableMetadata tableMetadata = getTableMetadata(tid);
258+
if (tableMetadata == null) // if a table was removed - ignore it
259+
continue;
255260

261+
RepairedState state = repairedStates.get(tid);
256262
if (state == null)
257263
return false;
258264

259-
long minRepaired = state.minRepairedAt(session.ranges);
265+
Collection<Range<Token>> actualRanges = rangesPerKeyspaceCache.computeIfAbsent(tableMetadata.keyspace, (keyspace) -> {
266+
List<Range<Token>> localRanges = getLocalRanges(tableMetadata.keyspace);
267+
if (localRanges.isEmpty()) // to handle the case when we run before the information about owned ranges is properly populated
268+
return session.ranges;
269+
270+
// ignore token ranges which were moved to other nodes and not owned by the current one anymore
271+
return Range.intersect(session.ranges, localRanges);
272+
});
273+
long minRepaired = state.minRepairedAt(actualRanges);
260274
if (minRepaired <= session.repairedAt)
261275
return false;
262276
}
263277

264278
return true;
265279
}
266280

281+
@VisibleForTesting
282+
protected TableMetadata getTableMetadata(TableId tableId)
283+
{
284+
return Schema.instance.getTableMetadata(tableId);
285+
}
286+
287+
@VisibleForTesting
288+
protected List<Range<Token>> getLocalRanges(String keyspace)
289+
{
290+
return StorageService.instance.getLocalAndPendingRanges(keyspace);
291+
}
292+
267293
public RepairedState.Stats getRepairedStats(TableId tid, Collection<Range<Token>> ranges)
268294
{
269295
RepairedState state = repairedStates.get(tid);
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.distributed.test;
20+
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import com.google.common.collect.ImmutableList;
25+
import com.google.common.collect.ImmutableMap;
26+
import org.junit.Assert;
27+
import org.junit.Test;
28+
29+
import org.apache.cassandra.config.CassandraRelevantProperties;
30+
import org.apache.cassandra.distributed.Cluster;
31+
import org.apache.cassandra.distributed.api.ConsistencyLevel;
32+
import org.apache.cassandra.distributed.api.ICluster;
33+
import org.apache.cassandra.distributed.api.IInstanceConfig;
34+
import org.apache.cassandra.distributed.api.IInvokableInstance;
35+
import org.apache.cassandra.distributed.api.TokenSupplier;
36+
import org.apache.cassandra.distributed.shared.NetworkTopology;
37+
import org.apache.cassandra.distributed.shared.WithProperties;
38+
import org.apache.cassandra.repair.consistent.ConsistentSession;
39+
import org.apache.cassandra.service.ActiveRepairService;
40+
import org.apache.cassandra.service.StorageService;
41+
import org.apache.cassandra.utils.concurrent.Condition;
42+
import org.apache.cassandra.utils.progress.ProgressEventType;
43+
44+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
45+
import static java.util.concurrent.TimeUnit.MINUTES;
46+
import static java.util.concurrent.TimeUnit.SECONDS;
47+
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
48+
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
49+
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
50+
import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
51+
import static org.apache.cassandra.repair.consistent.LocalSessionInfo.STATE;
52+
import static org.apache.cassandra.repair.messages.RepairOption.INCREMENTAL_KEY;
53+
import static org.awaitility.Awaitility.await;
54+
import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
55+
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
56+
57+
public class IncrementalRepairCleanupAfterNodeAddingTest extends TestBaseImpl
58+
{
59+
@Test
60+
public void test() throws Exception
61+
{
62+
int originalNodeCount = 3;
63+
try (WithProperties withProperties = new WithProperties())
64+
{
65+
withProperties.set(CassandraRelevantProperties.REPAIR_DELETE_TIMEOUT_SECONDS, "0");
66+
try (Cluster cluster = builder().withNodes(originalNodeCount)
67+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(originalNodeCount + 1, 1))
68+
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
69+
.withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
70+
.start())
71+
{
72+
populate(cluster, 0, 100);
73+
74+
repair(cluster, KEYSPACE, ImmutableMap.of(INCREMENTAL_KEY, "true"));
75+
76+
Thread.sleep(1); // to ensure that we crossed LocalSessions.AUTO_DELETE_TIMEOUT
77+
78+
// to check that the session is still here (it is not superseded yet)
79+
cluster.get(1).runOnInstance(rethrow(() -> {
80+
ActiveRepairService.instance().consistent.local.cleanup();
81+
List<Map<String, String>> sessions = ActiveRepairService.instance().getSessions(true, null);
82+
Assert.assertThat(sessions, hasSize(1));
83+
}));
84+
85+
addNode(cluster);
86+
87+
repair(cluster, KEYSPACE, ImmutableMap.of(INCREMENTAL_KEY, "true"));
88+
89+
Thread.sleep(1); // to ensure that we crossed LocalSessions.AUTO_DELETE_TIMEOUT
90+
91+
cluster.get(1).runOnInstance(rethrow(() -> {
92+
ActiveRepairService.instance().consistent.local.cleanup();
93+
List<Map<String, String>> sessions = ActiveRepairService.instance().getSessions(true, null);
94+
Assert.assertThat(sessions, hasSize(1));
95+
}));
96+
}
97+
}
98+
}
99+
100+
protected void addNode(Cluster cluster)
101+
{
102+
IInstanceConfig config = cluster.newInstanceConfig();
103+
config.set("auto_bootstrap", true);
104+
IInvokableInstance newInstance = cluster.bootstrap(config);
105+
newInstance.startup(cluster);
106+
}
107+
108+
public static void populate(ICluster cluster, int from, int to)
109+
{
110+
populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM);
111+
}
112+
113+
public static void populate(ICluster cluster, int from, int to, int coord, int rf, ConsistencyLevel cl)
114+
{
115+
cluster.schemaChange(withKeyspace("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};"));
116+
cluster.schemaChange(withKeyspace("CREATE TABLE IF NOT EXISTS %s.repair_add_node_test (pk int, ck int, v int, PRIMARY KEY (pk, ck))"));
117+
for (int i = from; i < to; i++)
118+
{
119+
cluster.coordinator(coord).execute(withKeyspace("INSERT INTO %s.repair_add_node_test (pk, ck, v) VALUES (?, ?, ?)"),
120+
cl, i, i, i);
121+
}
122+
}
123+
124+
static void repair(ICluster<IInvokableInstance> cluster, String keyspace, Map<String, String> options)
125+
{
126+
cluster.get(1).runOnInstance(rethrow(() -> {
127+
Condition await = newOneTimeCondition();
128+
StorageService.instance.repair(keyspace, options, ImmutableList.of((tag, event) -> {
129+
if (event.getType() == ProgressEventType.COMPLETE)
130+
await.signalAll();
131+
})).right.get();
132+
await.await(1L, MINUTES);
133+
134+
// local sessions finalization happens asynchronously
135+
// so to avoid race condition and flakiness for the test we wait explicitly for local sessions to finalize
136+
await().pollInterval(10, MILLISECONDS)
137+
.atMost(60, SECONDS)
138+
.until(() -> {
139+
List<Map<String, String>> sessions = ActiveRepairService.instance().getSessions(true, null);
140+
for (Map<String, String> sessionInfo : sessions)
141+
if (!sessionInfo.get(STATE).equals(ConsistentSession.State.FINALIZED.toString()))
142+
return false;
143+
return true;
144+
});
145+
}));
146+
}
147+
}

test/unit/org/apache/cassandra/dht/RangeTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,4 +736,48 @@ public void testGroupSubtract()
736736
assertEquals(ranges, Range.subtract(ranges, asList(r(6, 7), r(20, 25))));
737737
assertEquals(Sets.newHashSet(r(1, 4), r(11, 15)), Range.subtract(ranges, asList(r(4, 7), r(8, 11))));
738738
}
739+
740+
@Test
741+
public void testGroupIntersection()
742+
{
743+
assertEquals(Collections.emptyList(),
744+
Range.intersect(asList(r(1, 5), r(10, 15)),
745+
asList(r(6, 7), r(20, 25))
746+
));
747+
748+
assertEquals(asList(r(5, 6)),
749+
Range.intersect(asList(r(1, 6), r(10, 15)),
750+
asList(r(5, 10))
751+
));
752+
753+
assertEquals(asList(r(5, 6), r(10, 11)),
754+
Range.intersect(asList(r(1, 6), r(10, 15)),
755+
asList(r(5, 11))
756+
));
757+
758+
assertEquals(asList(r(5, 6), r(10, 11)),
759+
Range.intersect(asList(r(1, 6), r(10, 15)),
760+
asList(r(5, 11))
761+
));
762+
763+
assertEquals(asList(r(5, 6), r(10, 11), r(12, 15)),
764+
Range.intersect(asList(r(1, 6), r(10, 15)),
765+
asList(r(5, 11), r(12, 20))
766+
));
767+
768+
assertEquals(asList(r(5, 6), r(10, 15)),
769+
Range.intersect(asList(r(1, 6), r(10, 15)),
770+
asList(r(5, 11), r(11, 20))
771+
));
772+
773+
assertEquals(Collections.emptyList(),
774+
Range.intersect(Collections.emptyList(),
775+
asList(r(5, 11), r(11, 20))
776+
));
777+
778+
assertEquals(Collections.emptyList(),
779+
Range.intersect(asList(r(1, 6), r(10, 15)),
780+
Collections.emptyList()
781+
));
782+
}
739783
}

test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.cassandra.repair.consistent;
2020

2121
import java.util.ArrayList;
22+
import java.util.Arrays;
2223
import java.util.Collection;
2324
import java.util.HashMap;
2425
import java.util.HashSet;
@@ -44,11 +45,14 @@
4445
import org.apache.cassandra.cql3.QueryProcessor;
4546
import org.apache.cassandra.cql3.UntypedResultSet;
4647
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
48+
import org.apache.cassandra.dht.Range;
49+
import org.apache.cassandra.dht.Token;
4750
import org.apache.cassandra.locator.RangesAtEndpoint;
4851
import org.apache.cassandra.net.Message;
4952
import org.apache.cassandra.repair.AbstractRepairTest;
5053
import org.apache.cassandra.locator.InetAddressAndPort;
5154
import org.apache.cassandra.repair.KeyspaceRepairManager;
55+
import org.apache.cassandra.schema.TableId;
5256
import org.apache.cassandra.schema.TableMetadata;
5357
import org.apache.cassandra.schema.Schema;
5458
import org.apache.cassandra.schema.SchemaConstants;
@@ -222,6 +226,18 @@ protected boolean sessionHasData(LocalSession session)
222226
{
223227
return sessionHasData;
224228
}
229+
230+
@Override
231+
protected TableMetadata getTableMetadata(TableId tableId)
232+
{
233+
return cfm;
234+
}
235+
236+
@Override
237+
protected List<Range<Token>> getLocalRanges(String keyspace)
238+
{
239+
return Arrays.asList(RANGE1, RANGE2, RANGE3);
240+
}
225241
}
226242

227243
private static TableMetadata cfm;

0 commit comments

Comments
 (0)