Skip to content

Commit e4684a1

Browse files
committed
Fix:
- DefaultLocalListeners.ComplexListeners iterator IndexOutOfBoundsException
- Race condition initialising empty ActiveEpochs, when minimum pending epoch can move backwards
- SyncPoints must be declared in an epoch containing the ranges, and PENDING_REMOVAL ranges will reject non-syncpoint transactions
- AccordExecutorMetrics is now registered on startup
- getRecentValues for non-cumulative histogram should not subtract prior values

Improve:
- Report ephemeral read, epoch waits and timeout metrics
- Remove Topologies.SelectNodeOwnership, as no need to SLICE anymore
- Introduce SystemEventListener for epoch waiting and timeout metrics
- No-op but log if gcBefore provided to CFK is in the past

patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21076
1 parent 8a72868 commit e4684a1

32 files changed

+513
-253
lines changed

modules/accord

Submodule accord updated 61 files

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5505,7 +5505,7 @@ public static long getAccordRepairTimeoutNanos()
55055505

55065506
public static boolean getAccordTransactionsEnabled()
55075507
{
5508-
return conf == null ? false : conf.accord.enabled;
5508+
return conf != null && conf.accord.enabled;
55095509
}
55105510

55115511
public static void setAccordTransactionsEnabled(boolean b)

src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public class TransactionStatement implements CQLStatement.CompositeCQLStatement,
132132
public static final String WRITE_TXN_EMPTY_WITH_IGNORED_READS = "Write txn produced no mutation, and its reads do not return to the caller; ignoring...";
133133
public static final String WRITE_TXN_EMPTY_WITH_NO_READS = "Write txn produced no mutation, and had no reads; ignoring...";
134134

135-
private static NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(LoggerFactory.getLogger(TransactionStatement.class), 1, TimeUnit.MINUTES);
135+
private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(LoggerFactory.getLogger(TransactionStatement.class), 1, TimeUnit.MINUTES);
136136

137137
static class NamedSelect
138138
{

src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2186,7 +2186,7 @@ public void collect(PartitionsCollector collector)
21862186
Map<TokenKey, List<ShardAndEpochs>> startLookup = null;
21872187
for (ActiveEpoch epoch : snapshot)
21882188
{
2189-
Topology topology = epoch.global();
2189+
Topology topology = epoch.all();
21902190
for (Shard shard : topology.shards())
21912191
{
21922192
Range range = shard.range;

src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public Shard(ShardedHitRate.HitRateShard hitRate, LogLinearHistogram objectSize)
9090
public AccordCacheMetrics(String subTypeName)
9191
{
9292
DefaultNameFactory factory = new DefaultNameFactory(ACCORD_CACHE, subTypeName);
93-
this.objectSize = Metrics.shardedHistogram(factory.createMetricName("EntrySize"));
93+
this.objectSize = Metrics.shardedHistogram(factory.createMetricName("EntrySize"), false);
9494
this.hits = Metrics.gauge(factory.createMetricName("Hits"), hitRate::totalHits);
9595
this.misses = Metrics.gauge(factory.createMetricName("Misses"), hitRate::totalMisses);
9696
this.requests = Metrics.gauge(factory.createMetricName("Requests"), hitRate::totalRequests);

src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class AccordCoordinatorMetrics
5252
public static final String COORDINATOR_PREACCEPT_LATENCY = "PreAcceptLatency";
5353
public static final String COORDINATOR_EXECUTE_LATENCY = "ExecuteLatency";
5454
public static final String COORDINATOR_APPLY_LATENCY = "ApplyLatency";
55+
public static final String EPHEMERAL = "Ephemeral";
5556
public static final String FAST_PATHS = "FastPaths";
5657
public static final String MEDIUM_PATHS = "MediumPaths";
5758
public static final String SLOW_PATHS = "SlowPaths";
@@ -98,6 +99,11 @@ public class AccordCoordinatorMetrics
9899
*/
99100
public final Histogram tables;
100101

102+
/**
103+
* The number of ephemeral transactions executed on this coordinator.
104+
*/
105+
public final Meter ephemeral;
106+
101107
/**
102108
* The number of fast path transactions executed on this coordinator.
103109
*/
@@ -159,6 +165,7 @@ private AccordCoordinatorMetrics(String scope)
159165
keys = Metrics.histogram(coordinator.createMetricName(COORDINATOR_KEYS), true);
160166
tables = Metrics.histogram(coordinator.createMetricName(COORDINATOR_TABLES), true);
161167

168+
ephemeral = Metrics.meter(coordinator.createMetricName(EPHEMERAL));
162169
fastPaths = Metrics.meter(coordinator.createMetricName(FAST_PATHS));
163170
mediumPaths = Metrics.meter(coordinator.createMetricName(MEDIUM_PATHS));
164171
slowPaths = Metrics.meter(coordinator.createMetricName(SLOW_PATHS));
@@ -168,7 +175,7 @@ private AccordCoordinatorMetrics(String scope)
168175
invalidations = Metrics.meter(coordinator.createMetricName(INVALIDATIONS));
169176
recoveryDelay = Metrics.timer(coordinator.createMetricName(RECOVERY_DELAY));
170177
recoveryDuration = Metrics.timer(coordinator.createMetricName(RECOVERY_TIME));
171-
fastPathToTotal = new RatioGaugeSet(fastPaths, RatioGaugeSet.sum(fastPaths, mediumPaths, slowPaths), coordinator, FAST_PATH_TO_TOTAL + ".%s");
178+
fastPathToTotal = new RatioGaugeSet(fastPaths, RatioGaugeSet.sum(ephemeral, fastPaths, mediumPaths, slowPaths), coordinator, FAST_PATH_TO_TOTAL + ".%s");
172179
}
173180

174181
@Override
@@ -238,6 +245,7 @@ public void onExecuting(TxnId txnId, @Nullable Ballot ballot, Deps deps, @Nullab
238245
{
239246
switch (path)
240247
{
248+
case EPHEMERAL: metrics.ephemeral.mark(); break;
241249
case FAST: metrics.fastPaths.mark(); break;
242250
case MEDIUM: metrics.mediumPaths.mark(); break;
243251
case SLOW: metrics.slowPaths.mark(); break;

src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,8 @@ public AccordExecutorMetrics()
5858
waitingToRun = Metrics.register(factory.createMetricName("WaitingToRun"), gauges.newGauge(AccordExecutor::unsafeWaitingToRunCount, Long::sum));
5959
running = Metrics.register(factory.createMetricName("Running"), gauges.newGauge(AccordExecutor::unsafeRunningCount, Long::sum));
6060
}
61+
62+
public static void touch()
63+
{
64+
}
6165
}

src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121
import java.lang.reflect.Field;
2222
import java.util.concurrent.TimeUnit;
2323

24+
import accord.api.SystemEventListener;
2425
import accord.impl.progresslog.DefaultProgressLog;
2526
import accord.local.MaxDecidedRX;
2627
import accord.local.RedundantBefore;
2728
import accord.primitives.TxnId;
2829
import accord.topology.TopologyManager;
2930
import accord.utils.Invariants;
31+
import com.codahale.metrics.Counter;
3032
import com.codahale.metrics.Counting;
3133
import com.codahale.metrics.Gauge;
3234
import org.apache.cassandra.metrics.LogLinearHistogram.LogLinearSnapshot;
@@ -44,14 +46,18 @@
4446
import static org.apache.cassandra.metrics.AccordMetricUtils.fromTopologyManager;
4547
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
4648

47-
public class AccordSystemMetrics
49+
public class AccordSystemMetrics implements SystemEventListener
4850
{
4951
public final static AccordSystemMetrics metrics = new AccordSystemMetrics();
5052
private static final long REFRESH_RATE = TimeUnit.SECONDS.toNanos(30);
5153

5254
public static final String ACCORD_SYSTEM = "AccordSystem";
5355
public static final String MIN_EPOCH = "MinEpoch";
5456
public static final String MAX_EPOCH = "MaxEpoch";
57+
public static final String MAX_PENDING_EPOCH = "MaxPendingEpoch";
58+
public static final String ERRORS = "Errors";
59+
public static final String EPOCH_WAITS = "EpochWaits";
60+
public static final String EPOCH_TIMEOUTS = "EpochTimeouts";
5561
public static final String PROGRESS_LOG_ACTIVE = "ProgressLogActive";
5662
public static final String PROGRESS_LOG_SIZE = "ProgressLogSize";
5763
public static final String PROGRESS_LOG_AGE = "ProgressLogAge";
@@ -66,6 +72,10 @@ public class AccordSystemMetrics
6672

6773
public final Gauge<Long> minEpoch;
6874
public final Gauge<Long> maxEpoch;
75+
public final Gauge<Long> maxPendingEpoch;
76+
public final Counter errors;
77+
public final Counter epochWaits;
78+
public final Counter epochTimeouts;
6979
public final Gauge<Long> progressLogActive;
7080
public final Gauge<Long> durabilityQueueActive;
7181
public final Gauge<Long> durabilityQueuePending;
@@ -142,17 +152,33 @@ private AccordSystemMetrics()
142152
DefaultNameFactory factory = new DefaultNameFactory(ACCORD_SYSTEM);
143153
minEpoch = Metrics.gauge(factory.createMetricName(MIN_EPOCH), fromTopologyManager(TopologyManager::minEpoch));
144154
maxEpoch = Metrics.gauge(factory.createMetricName(MAX_EPOCH), fromTopologyManager(TopologyManager::epoch));
155+
maxPendingEpoch = Metrics.gauge(factory.createMetricName(MAX_PENDING_EPOCH), fromTopologyManager(TopologyManager::pendingEpoch));
156+
errors = Metrics.counter(factory.createMetricName(ERRORS));
157+
epochTimeouts = Metrics.counter(factory.createMetricName(EPOCH_TIMEOUTS));
158+
epochWaits = Metrics.counter(factory.createMetricName(EPOCH_WAITS));
145159
durabilityQueueActive = Metrics.gauge(factory.createMetricName(DURABILITY_QUEUE_ACTIVE), fromDurabilityService(durability -> (long)durability.queue().activeCount()));
146160
durabilityQueuePending = Metrics.gauge(factory.createMetricName(DURABILITY_QUEUE_PENDING), fromDurabilityService(durability -> (long)durability.queue().pendingCount()));
147161
progressLogActive = Metrics.gauge(factory.createMetricName(PROGRESS_LOG_ACTIVE), fromDurabilityService(durability -> (long)durability.queue().activeCount()));
148-
progressLogSize = Metrics.onDemandHistogram(factory.createMetricName(PROGRESS_LOG_SIZE), () -> maybeRefreshHistograms().progressLogSize);
149-
progressLogAge = Metrics.onDemandHistogram(factory.createMetricName(PROGRESS_LOG_AGE), () -> maybeRefreshHistograms().progressLogAge);
150-
syncPointAgreedLag = Metrics.onDemandHistogram(factory.createMetricName(SYNCPOINT_AGREED_LAG), () -> maybeRefreshHistograms().syncPointAgreedLag);
151-
locallyAppliedLag = Metrics.onDemandHistogram(factory.createMetricName(LOCALLY_APPLIED_LAG), () -> maybeRefreshHistograms().locallyAppliedLag);
152-
locallyDurableLag = Metrics.onDemandHistogram(factory.createMetricName(LOCALLY_DURABLE_LAG), () -> maybeRefreshHistograms().locallyDurableLag);
153-
quorumAppliedLag = Metrics.onDemandHistogram(factory.createMetricName(QUORUM_APPLIED_LAG), () -> maybeRefreshHistograms().quorumAppliedLag);
154-
shardAppliedLag = Metrics.onDemandHistogram(factory.createMetricName(SHARD_APPLIED_LAG), () -> maybeRefreshHistograms().shardAppliedLag);
155-
gcLag = Metrics.onDemandHistogram(factory.createMetricName(GC_LAG), () -> maybeRefreshHistograms().gcLag);
162+
progressLogSize = Metrics.onDemandHistogram(factory.createMetricName(PROGRESS_LOG_SIZE), () -> maybeRefreshHistograms().progressLogSize, false);
163+
progressLogAge = Metrics.onDemandHistogram(factory.createMetricName(PROGRESS_LOG_AGE), () -> maybeRefreshHistograms().progressLogAge, false);
164+
syncPointAgreedLag = Metrics.onDemandHistogram(factory.createMetricName(SYNCPOINT_AGREED_LAG), () -> maybeRefreshHistograms().syncPointAgreedLag, false);
165+
locallyAppliedLag = Metrics.onDemandHistogram(factory.createMetricName(LOCALLY_APPLIED_LAG), () -> maybeRefreshHistograms().locallyAppliedLag, false);
166+
locallyDurableLag = Metrics.onDemandHistogram(factory.createMetricName(LOCALLY_DURABLE_LAG), () -> maybeRefreshHistograms().locallyDurableLag, false);
167+
quorumAppliedLag = Metrics.onDemandHistogram(factory.createMetricName(QUORUM_APPLIED_LAG), () -> maybeRefreshHistograms().quorumAppliedLag, false);
168+
shardAppliedLag = Metrics.onDemandHistogram(factory.createMetricName(SHARD_APPLIED_LAG), () -> maybeRefreshHistograms().shardAppliedLag, false);
169+
gcLag = Metrics.onDemandHistogram(factory.createMetricName(GC_LAG), () -> maybeRefreshHistograms().gcLag, false);
170+
}
171+
172+
@Override
173+
public void onWaitingForEpoch(long epoch)
174+
{
175+
epochWaits.inc();
176+
}
177+
178+
@Override
179+
public void onTimeoutForEpoch(long epoch, int count)
180+
{
181+
epochTimeouts.inc(count);
156182
}
157183

158184
private synchronized Snapshot maybeRefreshHistograms()
@@ -190,6 +216,8 @@ private synchronized void refreshHistograms()
190216
for (int i = 0 ; i < redundantBefore.size() ; ++i)
191217
{
192218
RedundantBefore.Bounds bounds = redundantBefore.valueAt(i);
219+
if (bounds == null)
220+
continue;
193221
builder.locallyAppliedLag.increment(ageSeconds(nowSeconds, bounds.maxBound(LOCALLY_APPLIED)));
194222
builder.locallyDurableLag.increment(ageSeconds(nowSeconds, bounds.maxBoundBoth(LOCALLY_DURABLE_TO_DATA_STORE, LOCALLY_DURABLE_TO_COMMAND_STORE)));
195223
builder.quorumAppliedLag.increment(ageSeconds(nowSeconds, bounds.maxBound(QUORUM_APPLIED)));

src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import com.codahale.metrics.Metric;
4949
import com.codahale.metrics.MetricRegistry;
5050
import com.codahale.metrics.MetricSet;
51-
import com.codahale.metrics.Timer;
5251
import org.apache.cassandra.db.virtual.CollectionVirtualTableAdapter;
5352
import org.apache.cassandra.db.virtual.VirtualTable;
5453
import org.apache.cassandra.db.virtual.model.CounterMetricRow;
@@ -177,8 +176,8 @@ else if (metric instanceof OverrideHistogram)
177176
return Double.toString(((OverrideHistogram) metric).getSnapshot().getMedian());
178177
else if (metric instanceof Meter)
179178
return Long.toString(((Meter) metric).getCount());
180-
else if (metric instanceof Timer)
181-
return Long.toString(((Timer) metric).getCount());
179+
else if (metric instanceof com.codahale.metrics.Timer)
180+
return Long.toString(((com.codahale.metrics.Timer) metric).getCount());
182181
else
183182
throw new IllegalStateException("Unknown metric type: " + metric.getClass().getName());
184183
}
@@ -357,14 +356,14 @@ public OverrideHistogram histogram(MetricName name, MetricName alias, boolean co
357356
return histogram;
358357
}
359358

360-
public ShardedHistogram shardedHistogram(MetricName name)
359+
public ShardedHistogram shardedHistogram(MetricName name, boolean isCumulative)
361360
{
362-
return register(name, new ShardedHistogram());
361+
return register(name, new ShardedHistogram(isCumulative));
363362
}
364363

365-
public OnDemandHistogram onDemandHistogram(MetricName name, Supplier<LogLinearHistogram.LogLinearSnapshot> snapshot)
364+
public OnDemandHistogram onDemandHistogram(MetricName name, Supplier<LogLinearHistogram.LogLinearSnapshot> snapshot, boolean isCumulative)
366365
{
367-
return register(name, new OnDemandHistogram(snapshot));
366+
return register(name, new OnDemandHistogram(snapshot, isCumulative));
368367
}
369368

370369
public <T extends Gauge<?>> T gauge(MetricName name, T gauge)
@@ -379,7 +378,7 @@ public <T extends Gauge<?>> T gauge(MetricName name, MetricName alias, T gauge)
379378
return gaugeLoc;
380379
}
381380

382-
public Timer timer(MetricName name)
381+
public SnapshottingTimer timer(MetricName name)
383382
{
384383
return timer(name, DEFAULT_TIMER_UNIT);
385384
}
@@ -773,17 +772,22 @@ public long[] values()
773772
}
774773

775774
/**
776-
* Returns a histogram describing the values recorded since the last time this method was called.
775+
* If the Histogram has cumulative data, returns a histogram describing the values recorded since the last time this method was called.
777776
*
778-
* ex. If the counts are [0, 1, 2, 1] at the time the first caller arrives, but change to [1, 2, 3, 2] by the
777+
* ex. If the counts are [0, 1, 2, 1] at the time the first caller arrives, but change to [1, 2, 3, 2] by the
779778
* time a second caller arrives, the second caller will receive [1, 1, 1, 1].
780779
*
780+
* If the Histogram does not have cumulative data, simply returns the current snapshot.
781+
*
781782
* @return a histogram whose bucket offsets are assumed to be in nanoseconds
782783
*/
783784
@Override
784785
public synchronized long[] getRecentValues()
785786
{
786-
long[] now = metric.getSnapshot().getValues();
787+
long[] now = values();
788+
if (!metric.isCumulative())
789+
return now;
790+
787791
long[] delta = delta(now, last);
788792
last = now;
789793
return delta;
@@ -968,6 +972,12 @@ public interface JmxTimerMBean extends JmxMeterMBean
968972
long[] getRecentValues();
969973

970974
String getDurationUnit();
975+
976+
String bucketsId();
977+
978+
long[] rawBuckets(int count);
979+
980+
long[] rawValues();
971981
}
972982

973983
static class JmxTimer extends JmxMeter implements JmxTimerMBean
@@ -1049,7 +1059,10 @@ public double get999thPercentile()
10491059
@Override
10501060
public long[] values()
10511061
{
1052-
return metric.getSnapshot().getValues();
1062+
long[] values = metric.getSnapshot().getValues();
1063+
if (metric.bucketStrategy() == CassandraReservoir.BucketStrategy.log_linear)
1064+
values = metric.bucketStrategy().translateTo(CassandraReservoir.BucketStrategy.exp_12_nozero, values);
1065+
return values;
10531066
}
10541067

10551068
/**
@@ -1063,7 +1076,10 @@ public long[] values()
10631076
@Override
10641077
public synchronized long[] getRecentValues()
10651078
{
1066-
long[] now = metric.getSnapshot().getValues();
1079+
long[] now = values();
1080+
if (!metric.isCumulative())
1081+
return now;
1082+
10671083
long[] delta = delta(now, last);
10681084
last = now;
10691085
return delta;
@@ -1074,6 +1090,24 @@ public String getDurationUnit()
10741090
{
10751091
return durationUnit;
10761092
}
1093+
1094+
@Override
1095+
public String bucketsId()
1096+
{
1097+
return metric.bucketStrategy().name();
1098+
}
1099+
1100+
@Override
1101+
public long[] rawBuckets(int count)
1102+
{
1103+
return metric.bucketStarts(count);
1104+
}
1105+
1106+
@Override
1107+
public long[] rawValues()
1108+
{
1109+
return metric.getSnapshot().getValues();
1110+
}
10771111
}
10781112

10791113
/**

src/java/org/apache/cassandra/metrics/LatencyMetrics.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@
2424
import com.google.common.collect.Lists;
2525

2626
import com.codahale.metrics.Counter;
27-
import com.codahale.metrics.Reservoir;
2827
import com.codahale.metrics.Snapshot;
29-
import com.codahale.metrics.Timer;
3028

3129
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
3230

@@ -169,12 +167,11 @@ public void release()
169167
Metrics.remove(factory.createMetricName(namePrefix + "TotalLatency"));
170168
}
171169

172-
public class LatencyMetricsTimer extends Timer
170+
public class LatencyMetricsTimer extends OverrideTimer implements org.apache.cassandra.metrics.Timer
173171
{
174-
175172
long releasedLatencyCount = 0;
176173

177-
public LatencyMetricsTimer(Reservoir reservoir)
174+
public LatencyMetricsTimer(CassandraReservoir reservoir)
178175
{
179176
super(reservoir);
180177
}

0 commit comments

Comments
 (0)