Skip to content

Commit a813a51

Browse files
committed
Improve Observability:
- Track all active Coordinations - Refactor Replica/Coordinator metrics and report Coordinator exhausted/preempted/timeout - DurabilityQueue metrics and visibility Also Fix: - WaitingState can get cause distributed stall when asked to wait for CanApply if not yet PreCommitted; track separate querying state and advance this to the next achievable state rather than the desired final state - Stalled coordinators should not prevent recovery - Edge case with fetch unable to make progress when pre-bootstrap and all peers have GC'd - Dependency initialisation for sync points across certain ownership changes - SyncPoint propagation may not include all of the epochs required on the receiving node for ranges they have lost but not closed, and receiving node does not validate them - Stable tracker accounting with LocalExecute - Do not prune non-durable APPLIED as must be reported in dependencies until durably applied (so as not to break recovery) - Ensure we cannot race with replies when initiating Coordination - ProgressLog does not guarantee to clear home or waiting states when erased or invalidated by compaction - WaitingState on non-home shard cannot guarantee progress once home shard is Erased - WaitingOnSync handles retired ranges incorrectly Also Improve: - Standardise failure accounting, use null to represent single reply timeouts - BurnTest record/replay to/from file patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20878
1 parent eeb227b commit a813a51

File tree

77 files changed

+2559
-711
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+2559
-711
lines changed

modules/accord

Submodule accord updated 154 files

src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java

Lines changed: 125 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@
4141
import com.google.common.collect.BoundType;
4242
import com.google.common.collect.Range;
4343
import com.google.common.collect.Sets;
44+
45+
import accord.coordinate.AbstractCoordination;
46+
import accord.coordinate.Coordination;
47+
import accord.coordinate.Coordinations;
48+
import accord.coordinate.PrepareRecovery;
49+
import accord.coordinate.tracking.AbstractTracker;
50+
import accord.utils.SortedListMap;
4451
import org.apache.cassandra.cql3.Operator;
4552
import org.apache.cassandra.db.EmptyIterators;
4653
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
@@ -57,7 +64,6 @@
5764
import accord.coordinate.FetchData;
5865
import accord.coordinate.FetchRoute;
5966
import accord.coordinate.MaybeRecover;
60-
import accord.coordinate.RecoverWithRoute;
6167
import accord.impl.CommandChange;
6268
import accord.impl.progresslog.DefaultProgressLog;
6369
import accord.impl.progresslog.TxnStateKind;
@@ -151,6 +157,8 @@
151157

152158
public class AccordDebugKeyspace extends VirtualKeyspace
153159
{
160+
public static final String COORDINATIONS = "coordinations";
161+
public static final String EXECUTORS = "executors";
154162
public static final String COMMANDS_FOR_KEY = "commands_for_key";
155163
public static final String COMMANDS_FOR_KEY_UNMANAGED = "commands_for_key_unmanaged";
156164
public static final String DURABILITY_SERVICE = "durability_service";
@@ -173,6 +181,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace
173181
private AccordDebugKeyspace()
174182
{
175183
super(VIRTUAL_ACCORD_DEBUG, List.of(
184+
new ExecutorsTable(),
185+
new CoordinationsTable(),
176186
new CommandsForKeyTable(),
177187
new CommandsForKeyUnmanagedTable(),
178188
new DurabilityServiceTable(),
@@ -192,6 +202,118 @@ private AccordDebugKeyspace()
192202
));
193203
}
194204

205+
// TODO (desired): human readable packed key tracker (but requires loading Txn, so might be preferable to only do conditionally)
206+
public static final class ExecutorsTable extends AbstractVirtualTable
207+
{
208+
private ExecutorsTable()
209+
{
210+
super(parse(VIRTUAL_ACCORD_DEBUG, EXECUTORS,
211+
"Accord Executor State",
212+
"CREATE TABLE %s (\n" +
213+
" executor_id int,\n" +
214+
" status text,\n" +
215+
" position int,\n" +
216+
" unique_position int,\n" +
217+
" description text,\n" +
218+
" command_store_id int,\n" +
219+
" txn_id 'TxnIdUtf8Type',\n" +
220+
" txn_id_additional 'TxnIdUtf8Type',\n" +
221+
" keys text,\n" +
222+
" keysLoad text,\n" +
223+
" keysLoadFor text,\n" +
224+
" PRIMARY KEY (executor_id, status, position, unique_position)" +
225+
')', UTF8Type.instance));
226+
}
227+
228+
@Override
229+
public DataSet data()
230+
{
231+
AccordCommandStores commandStores = (AccordCommandStores) AccordService.instance().node().commandStores();
232+
SimpleDataSet ds = new SimpleDataSet(metadata());
233+
234+
for (AccordExecutor executor : commandStores.executors())
235+
{
236+
int uniquePos = 0;
237+
int executorId = executor.executorId();
238+
AccordExecutor.TaskInfo prev = null;
239+
for (AccordExecutor.TaskInfo info : executor.taskSnapshot())
240+
{
241+
if (prev != null && info.status() == prev.status() && info.position() == prev.position()) ++uniquePos;
242+
else uniquePos = 0;
243+
prev = info;
244+
PreLoadContext preLoadContext = info.preLoadContext();
245+
ds.row(executorId, info.status(), info.position(), uniquePos)
246+
.column("description", info.describe())
247+
.column("command_store_id", info.commandStoreId())
248+
.column("txn_id", preLoadContext == null ? null : preLoadContext.primaryTxnId())
249+
.column("txn_id_additional", preLoadContext == null ? null : preLoadContext.additionalTxnId())
250+
.column("keys", preLoadContext == null ? null : preLoadContext.keys())
251+
.column("keysLoad", preLoadContext == null ? null : preLoadContext.loadKeys())
252+
.column("keysLoadFor", preLoadContext == null ? null : preLoadContext.loadKeysFor())
253+
;
254+
}
255+
}
256+
return ds;
257+
}
258+
}
259+
260+
// TODO (desired): human readable packed key tracker (but requires loading Txn, so might be preferable to only do conditionally)
261+
public static final class CoordinationsTable extends AbstractVirtualTable
262+
{
263+
private CoordinationsTable()
264+
{
265+
super(parse(VIRTUAL_ACCORD_DEBUG, COORDINATIONS,
266+
"Accord Coordination State",
267+
"CREATE TABLE %s (\n" +
268+
" txn_id int,\n" +
269+
" kind text,\n" +
270+
" coordination_id int,\n" +
271+
" description text,\n" +
272+
" nodes text,\n" +
273+
" nodes_inflight text,\n" +
274+
" nodes_contacted text,\n" +
275+
" participants text,\n" +
276+
" replies text,\n" +
277+
" tracker text,\n" +
278+
" PRIMARY KEY (txn_id, kind, coordination_id)" +
279+
')', UTF8Type.instance));
280+
}
281+
282+
@Override
283+
public DataSet data()
284+
{
285+
Coordinations coordinations = AccordService.instance().node().coordinations();
286+
SimpleDataSet ds = new SimpleDataSet(metadata());
287+
for (Coordination c : coordinations)
288+
{
289+
ds.row(c.txnId(), c.kind().toString(), c.coordinationId())
290+
.column("nodes", toStringOrNull(c.nodes()))
291+
.column("nodes_inflight", toStringOrNull(c.inflight()))
292+
.column("nodes_contacted", toStringOrNull(c.contacted()))
293+
.column("description", c.describe())
294+
.column("participants", toStringOrNull(c.scope()))
295+
.column("replies", summarise(c.replies()))
296+
.column("tracker", summarise(c.tracker()));
297+
}
298+
return ds;
299+
}
300+
301+
private static String summarise(@Nullable SortedListMap<Node.Id, ?> replies)
302+
{
303+
if (replies == null)
304+
return null;
305+
return AbstractCoordination.summariseReplies(replies, 60);
306+
}
307+
308+
private static String summarise(@Nullable AbstractTracker<?> tracker)
309+
{
310+
if (tracker == null)
311+
return null;
312+
return tracker.summariseTracker();
313+
}
314+
}
315+
316+
195317
// TODO (desired): don't report null as "null"
196318
public static final class CommandsForKeyTable extends AbstractVirtualTable implements AbstractVirtualTable.DataSet
197319
{
@@ -529,7 +651,7 @@ public DataSet data()
529651
private static void addRow(SimpleDataSet ds, int executorId, String scope, AccordCache.ImmutableStats stats)
530652
{
531653
ds.row(executorId, scope)
532-
.column("queries", stats.queries)
654+
.column("queries", stats.hits + stats.misses)
533655
.column("hits", stats.hits)
534656
.column("misses", stats.misses);
535657
}
@@ -1365,7 +1487,7 @@ private void recover(TxnId txnId, @Nullable Route<?> route, AsyncResult.Settable
13651487
Node node = AccordService.instance().node();
13661488
if (Route.isFullRoute(route))
13671489
{
1368-
RecoverWithRoute.recover(node, node.someSequentialExecutor(), txnId, NotKnownToBeInvalid, (FullRoute<?>) route, null, LatentStoreSelector.standard(), (success, fail) -> {
1490+
PrepareRecovery.recover(node, node.someSequentialExecutor(), txnId, NotKnownToBeInvalid, (FullRoute<?>) route, null, LatentStoreSelector.standard(), (success, fail) -> {
13691491
if (fail != null) result.setFailure(fail);
13701492
else result.setSuccess(null);
13711493
}, node.agent().trace(txnId, RECOVER));

src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java

Lines changed: 76 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,35 +18,94 @@
1818

1919
package org.apache.cassandra.metrics;
2020

21-
import java.util.Map;
22-
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.concurrent.locks.Lock;
22+
import java.util.function.Function;
23+
import java.util.function.ToLongFunction;
2324

24-
import com.codahale.metrics.Histogram;
25+
import com.codahale.metrics.Gauge;
26+
import org.apache.cassandra.service.accord.AccordExecutor;
27+
import org.apache.cassandra.service.accord.IAccordService;
2528

26-
import static org.apache.cassandra.metrics.CacheMetrics.TYPE_NAME;
29+
import static org.apache.cassandra.metrics.AccordMetricUtils.fromAccordService;
2730
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
2831

29-
public class AccordCacheMetrics extends CacheAccessMetrics
32+
public class AccordCacheMetrics
3033
{
31-
public static final String OBJECT_SIZE = "ObjectSize";
34+
public static final String ACCORD_CACHE = "AccordCache";
35+
public static final AccordCacheMetrics CommandsCacheMetrics = new AccordCacheMetrics("Commands");
36+
public static final AccordCacheMetrics CommandsForKeyCacheMetrics = new AccordCacheMetrics("CommandsForKey");
37+
public static final AccordCacheGlobalMetrics Global = new AccordCacheGlobalMetrics();
3238

33-
public final Histogram objectSize;
39+
// not sure why we create these wrapper objects that can only be instantiated once, but it's a pattern in this package so...
40+
public static class AccordCacheGlobalMetrics
41+
{
42+
final Gauge<Long> usedBytes;
43+
final Gauge<Long> unreferencedBytes;
44+
45+
public AccordCacheGlobalMetrics()
46+
{
47+
DefaultNameFactory factory = new DefaultNameFactory("AccordCache");
48+
this.usedBytes = Metrics.gauge(factory.createMetricName("UsedBytes"), fromAccordService(sumExecutors(executor -> executor.cacheUnsafe().weightedSize()), 0L));
49+
this.unreferencedBytes = Metrics.gauge(factory.createMetricName("UnreferencedBytes"), fromAccordService(sumExecutors(executor -> executor.cacheUnsafe().unreferencedBytes()), 0L));
50+
}
51+
52+
private static Function<IAccordService, Long> sumExecutors(ToLongFunction<AccordExecutor> f)
53+
{
54+
return service -> {
55+
long sum = 0;
56+
for (AccordExecutor executor : service.executors())
57+
sum += f.applyAsLong(executor);
58+
return sum;
59+
};
60+
}
61+
}
62+
63+
public static class Shard
64+
{
65+
public final ShardedHitRate.HitRateShard hitRate;
66+
public final LogLinearHistogram objectSize;
67+
68+
public Shard(ShardedHitRate.HitRateShard hitRate, LogLinearHistogram objectSize)
69+
{
70+
this.hitRate = hitRate;
71+
this.objectSize = objectSize;
72+
}
73+
}
3474

35-
private final Map<String, CacheAccessMetrics> instanceMetrics = new ConcurrentHashMap<>(2);
75+
public final ShardedHitRate hitRate = new ShardedHitRate();
76+
public final ShardedHistogram objectSize;
3677

37-
private final String scope;
78+
public final Gauge<Long> hits;
79+
public final Gauge<Long> misses;
80+
public final Gauge<Long> requests;
81+
public final Gauge<Double> requestRate1m;
82+
public final Gauge<Double> requestRate5m;
83+
public final Gauge<Double> requestRate15m;
84+
public final Gauge<Double> hitRateAllTime;
85+
public final Gauge<Double> hitRate1m;
86+
public final Gauge<Double> hitRate5m;
87+
public final Gauge<Double> hitRate15m;
88+
private final String subTypeName;
3889

39-
public AccordCacheMetrics(String scope)
90+
public AccordCacheMetrics(String subTypeName)
4091
{
41-
super(new DefaultNameFactory(TYPE_NAME, scope));
42-
objectSize = Metrics.histogram(factory.createMetricName(OBJECT_SIZE), false);
43-
this.scope = scope;
92+
DefaultNameFactory factory = new DefaultNameFactory("AccordCache", subTypeName);
93+
this.objectSize = Metrics.shardedHistogram(factory.createMetricName("EntrySize"));
94+
this.hits = Metrics.gauge(factory.createMetricName("Hits"), hitRate::totalHits);
95+
this.misses = Metrics.gauge(factory.createMetricName("Misses"), hitRate::totalMisses);
96+
this.requests = Metrics.gauge(factory.createMetricName("Requests"), hitRate::totalRequests);
97+
this.requestRate1m = Metrics.gauge(factory.createMetricName("Requests"), () -> hitRate.requestsPerSecond(1));
98+
this.requestRate5m = Metrics.gauge(factory.createMetricName("Requests"), () -> hitRate.requestsPerSecond(5));
99+
this.requestRate15m = Metrics.gauge(factory.createMetricName(RatioGaugeSet.FIFTEEN_MINUTE + "RequestRate"), () -> hitRate.requestsPerSecond(15));
100+
this.hitRate1m = Metrics.gauge(factory.createMetricName(RatioGaugeSet.ONE_MINUTE + "HitRate"), () -> hitRate.hitRate(1));
101+
this.hitRate5m = Metrics.gauge(factory.createMetricName(RatioGaugeSet.FIVE_MINUTE + "HitRate"), () -> hitRate.hitRate(5));
102+
this.hitRate15m = Metrics.gauge(factory.createMetricName(RatioGaugeSet.FIFTEEN_MINUTE + "HitRate"), () -> hitRate.hitRate(15));
103+
this.hitRateAllTime = Metrics.gauge(factory.createMetricName("Misses"), hitRate::hitRateAllTime);
104+
this.subTypeName = subTypeName;
44105
}
45106

46-
public CacheAccessMetrics forInstance(Class<?> klass)
107+
public Shard newShard(Lock guardedBy)
47108
{
48-
// cannot make Class<?> hashCode deterministic, as cannot rewrite - so cannot safely use as Map key if want deterministic simulation
49-
// (or we need to create extra hoops to catch this specific case in method rewriting)
50-
return instanceMetrics.computeIfAbsent(klass.getSimpleName(), k -> new CacheAccessMetrics(new DefaultNameFactory(TYPE_NAME, String.format("%s-%s", scope, k))));
109+
return new Shard(hitRate.newShard(guardedBy), objectSize.newShard(guardedBy));
51110
}
52111
}

src/java/org/apache/cassandra/metrics/AccordClientRequestMetrics.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,4 @@ public void release()
7272
Metrics.remove(factory.createMetricName("Preempted"));
7373
Metrics.remove(factory.createMetricName("TopologyMismatches"));
7474
}
75-
7675
}

0 commit comments

Comments
 (0)