Skip to content

Commit f7641a4

Browse files
authored
CNDB-13372 Introduce min Paxos backoff time (#1651)
### What is the issue Resolves: https://github.com/riptano/cndb/issues/13372 Currently min paxos backoff is to `0` which doesn't help there is already contention. ### What does this PR fix and why was it fixed Introduced a new property `cassandra.lwt_min_backoff_ms` which defautls to 5ms (10x smaller that the default 50ms to give enough range and be conservative relative to the previous 0 value, but still a magic number) and fix the `contentionBackoffLatency` that accepts nanos
1 parent 0716022 commit f7641a4

File tree

6 files changed

+183
-29
lines changed

6 files changed

+183
-29
lines changed

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,7 @@ public enum CassandraRelevantProperties
479479
CUSTOM_KEYSPACES_FILTER_PROVIDER("cassandra.custom_keyspaces_filter_provider_class"),
480480

481481
LWT_LOCKS_PER_THREAD("cassandra.lwt_locks_per_thread", "1024"),
482+
LWT_MIN_BACKOFF_MS("cassandra.lwt_min_backoff_ms", "5"),
482483
LWT_MAX_BACKOFF_MS("cassandra.lwt_max_backoff_ms", "50"),
483484
COUNTER_LOCK_NUM_STRIPES_PER_THREAD("cassandra.counter_lock.num_stripes_per_thread", "1024"),
484485
COUNTER_LOCK_FAIR_LOCK("cassandra.counter_lock.fair_lock", "false"),

src/java/org/apache/cassandra/service/StorageProxy.java

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import java.util.concurrent.atomic.AtomicInteger;
3939
import java.util.concurrent.atomic.AtomicLong;
4040
import java.util.function.Supplier;
41-
import java.util.stream.Collectors;
4241
import javax.annotation.Nullable;
4342

4443
import com.google.common.base.Preconditions;
@@ -123,14 +122,13 @@
123122
import org.apache.cassandra.schema.Schema;
124123
import org.apache.cassandra.schema.SchemaConstants;
125124
import org.apache.cassandra.schema.TableMetadata;
126-
import org.apache.cassandra.sensors.ActiveRequestSensors;
127125
import org.apache.cassandra.sensors.Context;
128-
import org.apache.cassandra.sensors.NoOpRequestSensors;
129126
import org.apache.cassandra.sensors.RequestSensors;
130127
import org.apache.cassandra.sensors.SensorsFactory;
131128
import org.apache.cassandra.sensors.Type;
132129
import org.apache.cassandra.service.paxos.Commit;
133130
import org.apache.cassandra.service.paxos.PaxosState;
131+
import org.apache.cassandra.service.paxos.PaxosUtils;
134132
import org.apache.cassandra.service.paxos.PrepareCallback;
135133
import org.apache.cassandra.service.paxos.ProposeCallback;
136134
import org.apache.cassandra.service.reads.AbstractReadExecutor;
@@ -359,8 +357,6 @@ public AtomicInteger load(InetAddressAndPort inetAddress)
359357
private static final boolean disableSerialReadLinearizability =
360358
Boolean.parseBoolean(System.getProperty(DISABLE_SERIAL_READ_LINEARIZABILITY_KEY, "false"));
361359

362-
private static volatile Integer maxPaxosBackoffMillis = CassandraRelevantProperties.LWT_MAX_BACKOFF_MS.getInt();
363-
364360
private StorageProxy()
365361
{
366362
}
@@ -369,6 +365,7 @@ private StorageProxy()
369365
{
370366
MBeanWrapper.instance.registerMBean(instance, MBEAN_NAME);
371367
HintsService.instance.registerMBean();
368+
PaxosUtils.instance.registerMBean();
372369

373370
standardWritePerformer = (mutation, targets, responseHandler, localDataCenter) ->
374371
{
@@ -715,9 +712,7 @@ private static RowIterator doPaxos(TableMetadata metadata,
715712

716713
Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)");
717714
contentions++;
718-
int sleepInMillis = ThreadLocalRandom.current().nextInt(maxPaxosBackoffMillis);
719-
Uninterruptibles.sleepUninterruptibly(sleepInMillis, TimeUnit.MILLISECONDS);
720-
casMetrics.contentionBackoffLatency.addNano(sleepInMillis * 1000);
715+
PaxosUtils.applyPaxosContentionBackoff(casMetrics);
721716
// continue to retry
722717
}
723718
}
@@ -785,9 +780,7 @@ private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoT
785780
Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
786781
contentions++;
787782
// sleep a random amount to give the other proposer a chance to finish
788-
int sleepInMillis = ThreadLocalRandom.current().nextInt(maxPaxosBackoffMillis);
789-
Uninterruptibles.sleepUninterruptibly(sleepInMillis, MILLISECONDS);
790-
casMetrics.contentionBackoffLatency.addNano(sleepInMillis * 1000);
783+
PaxosUtils.applyPaxosContentionBackoff(casMetrics);
791784
continue;
792785
}
793786

@@ -826,9 +819,7 @@ private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoT
826819
Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
827820
// sleep a random amount to give the other proposer a chance to finish
828821
contentions++;
829-
int sleepInMillis = ThreadLocalRandom.current().nextInt(maxPaxosBackoffMillis);
830-
Uninterruptibles.sleepUninterruptibly(sleepInMillis, MILLISECONDS);
831-
casMetrics.contentionBackoffLatency.addNano(sleepInMillis * 1000);
822+
PaxosUtils.applyPaxosContentionBackoff(casMetrics);
832823
}
833824
continue;
834825
}
@@ -3050,16 +3041,4 @@ public void disableCheckForDuplicateRowsDuringCompaction()
30503041
{
30513042
DatabaseDescriptor.setCheckForDuplicateRowsDuringCompaction(false);
30523043
}
3053-
3054-
@Override
3055-
public int getMaxPaxosBackoffMillis()
3056-
{
3057-
return maxPaxosBackoffMillis;
3058-
}
3059-
3060-
@Override
3061-
public void setMaxPaxosBackoffMillis(int maxPaxosBackoffMillis)
3062-
{
3063-
StorageProxy.maxPaxosBackoffMillis = maxPaxosBackoffMillis;
3064-
}
30653044
}

src/java/org/apache/cassandra/service/StorageProxyMBean.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,4 @@ public interface StorageProxyMBean
103103
boolean getCheckForDuplicateRowsDuringCompaction();
104104
void enableCheckForDuplicateRowsDuringCompaction();
105105
void disableCheckForDuplicateRowsDuringCompaction();
106-
107-
int getMaxPaxosBackoffMillis();
108-
void setMaxPaxosBackoffMillis(int maxPaxosBackoffMillis);
109106
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.service.paxos;
20+
21+
import java.util.concurrent.ThreadLocalRandom;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import com.google.common.util.concurrent.Uninterruptibles;
25+
26+
import org.apache.cassandra.config.CassandraRelevantProperties;
27+
import org.apache.cassandra.metrics.CASClientRequestMetrics;
28+
import org.apache.cassandra.utils.MBeanWrapper;
29+
30+
public final class PaxosUtils implements PaxosUtilsMBean
31+
{
32+
public static final PaxosUtils instance = new PaxosUtils();
33+
34+
private static final String MBEAN_NAME = "org.apache.cassandra.service:type=Paxos";
35+
private static volatile Integer maxPaxosBackoffMillis = CassandraRelevantProperties.LWT_MAX_BACKOFF_MS.getInt();
36+
private static volatile Integer minPaxosBackoffMillis = CassandraRelevantProperties.LWT_MIN_BACKOFF_MS.getInt();
37+
38+
private PaxosUtils()
39+
{
40+
}
41+
42+
public void registerMBean()
43+
{
44+
MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
45+
}
46+
47+
/**
48+
* Applies a random sleep time between minPaxosBackoffMillis (inclusive) and maxPaxosBackoffMillis (exclusive)
49+
* and emits the contentionBackoffLatency metric.
50+
*/
51+
public static void applyPaxosContentionBackoff(CASClientRequestMetrics casMetrics)
52+
{
53+
int sleepInMillis = ThreadLocalRandom.current().nextInt(minPaxosBackoffMillis, maxPaxosBackoffMillis);
54+
Uninterruptibles.sleepUninterruptibly(sleepInMillis, TimeUnit.MILLISECONDS);
55+
long sleepInNanos = TimeUnit.MILLISECONDS.toNanos(sleepInMillis);
56+
casMetrics.contentionBackoffLatency.addNano(sleepInNanos);
57+
}
58+
59+
@Override
60+
public int getMaxPaxosBackoffMillis()
61+
{
62+
return maxPaxosBackoffMillis;
63+
}
64+
65+
@Override
66+
public void setMaxPaxosBackoffMillis(int maxPaxosBackoffMillis)
67+
{
68+
PaxosUtils.maxPaxosBackoffMillis = maxPaxosBackoffMillis;
69+
}
70+
71+
@Override
72+
public int getMinPaxosBackoffMillis()
73+
{
74+
return minPaxosBackoffMillis;
75+
}
76+
77+
@Override
78+
public void setMinPaxosBackoffMillis(int minPaxosBackoffMillis)
79+
{
80+
PaxosUtils.minPaxosBackoffMillis = minPaxosBackoffMillis;
81+
}
82+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.service.paxos;
20+
21+
public interface PaxosUtilsMBean
22+
{
23+
int getMaxPaxosBackoffMillis();
24+
25+
void setMaxPaxosBackoffMillis(int maxPaxosBackoffMillis);
26+
27+
int getMinPaxosBackoffMillis();
28+
29+
void setMinPaxosBackoffMillis(int minPaxosBackoffMillis);
30+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.service;
20+
21+
import org.junit.BeforeClass;
22+
import org.junit.Test;
23+
24+
import com.carrotsearch.randomizedtesting.annotations.Timeout;
25+
import org.apache.cassandra.config.CassandraRelevantProperties;
26+
import org.apache.cassandra.metrics.CASClientRequestMetrics;
27+
import org.apache.cassandra.service.paxos.PaxosUtils;
28+
29+
import static org.junit.Assert.assertEquals;
30+
import static org.junit.Assert.assertTrue;
31+
32+
33+
public class PaxosUtilsTest
34+
{
35+
private static final int minPaxosBackoffMillis = 1;
36+
private static final int maxPaxosBackoffMillis = 5;
37+
38+
private static final long minPaxosBackoffMicros = minPaxosBackoffMillis * 1000;
39+
private static final long maxPaxosBackoffMicros = maxPaxosBackoffMillis * 1000;
40+
41+
@BeforeClass
42+
public static void beforeClass() throws Throwable
43+
{
44+
CassandraRelevantProperties.LWT_MIN_BACKOFF_MS.setInt(minPaxosBackoffMillis);
45+
CassandraRelevantProperties.LWT_MAX_BACKOFF_MS.setInt(maxPaxosBackoffMillis);
46+
}
47+
48+
@Timeout(millis = 500)
49+
@Test
50+
public void testApplyPaxosContentionBackoff()
51+
{
52+
CASClientRequestMetrics casMetrics = new CASClientRequestMetrics("test", "");
53+
long totalLatencyMicrosFromPreviousIteration = 0;
54+
for (int i = 0; i < 100; i++)
55+
{
56+
PaxosUtils.applyPaxosContentionBackoff(casMetrics);
57+
assertEquals(i + 1, casMetrics.contentionBackoffLatency.latency.getCount());
58+
59+
double lastRecordedLatencyMicros = casMetrics.contentionBackoffLatency.totalLatency.getCount() - totalLatencyMicrosFromPreviousIteration;
60+
totalLatencyMicrosFromPreviousIteration = casMetrics.contentionBackoffLatency.totalLatency.getCount();
61+
assertTrue(lastRecordedLatencyMicros >= minPaxosBackoffMicros);
62+
assertTrue(lastRecordedLatencyMicros < maxPaxosBackoffMicros);
63+
}
64+
}
65+
}

0 commit comments

Comments
 (0)