2525import accord .local .Command ;
2626import accord .local .SafeCommandStore ;
2727import accord .primitives .PartialDeps ;
28- import accord .primitives .Timestamp ;
2928import accord .primitives .TxnId ;
3029import com .codahale .metrics .Counting ;
31- import com .codahale .metrics .Histogram ;
32- import com .codahale .metrics .Timer ;
33- import org .apache .cassandra .service .accord .api .AccordTimeService ;
30+ import org .apache .cassandra .metrics .LogLinearDecayingHistograms .LogLinearDecayingHistogram ;
31+ import org .apache .cassandra .metrics .ShardedDecayingHistograms .DecayingHistogramsShard ;
32+ import org .apache .cassandra .metrics .ShardedDecayingHistograms .ShardedDecayingHistogram ;
33+ import org .apache .cassandra .service .accord .AccordCommandStore ;
34+ import org .apache .cassandra .service .accord .AccordSafeCommandStore ;
3435import org .apache .cassandra .tracing .Tracing ;
3536
3637import static org .apache .cassandra .metrics .CassandraMetricsRegistry .Metrics ;
38+ import static org .apache .cassandra .service .accord .AccordExecutor .HISTOGRAMS ;
39+ import static org .apache .cassandra .utils .Clock .Global .currentTimeMillis ;
3740
3841public class AccordReplicaMetrics
3942{
4043 public final static AccordReplicaMetrics readMetrics = new AccordReplicaMetrics ("ro" );
4144 public final static AccordReplicaMetrics writeMetrics = new AccordReplicaMetrics ("rw" );
45+ public final static AccordReplicaMetrics syncPointMetrics = new AccordReplicaMetrics ("rx" );
4246
4347 public static final String ACCORD_REPLICA = "AccordReplica" ;
4448 public static final String REPLICA_STABLE_LATENCY = "StableLatency" ;
4549 public static final String REPLICA_PREAPPLY_LATENCY = "PreApplyLatency" ;
4650 public static final String REPLICA_APPLY_LATENCY = "ApplyLatency" ;
47- public static final String REPLICA_APPLY_DURATION = "ApplyDuration" ;
4851 public static final String REPLICA_DEPENDENCIES = "Dependencies" ;
4952
53+ static final class SubShard
54+ {
55+ final LogLinearDecayingHistogram stableLatency ;
56+ final LogLinearDecayingHistogram preapplyLatency ;
57+ final LogLinearDecayingHistogram applyLatency ;
58+ final LogLinearDecayingHistogram dependencies ;
59+
60+ private SubShard (AccordReplicaMetrics metrics , DecayingHistogramsShard shard )
61+ {
62+ this .stableLatency = metrics .stableLatency .forShard (shard );
63+ this .preapplyLatency = metrics .preapplyLatency .forShard (shard );
64+ this .applyLatency = metrics .applyLatency .forShard (shard );
65+ this .dependencies = metrics .dependencies .forShard (shard );
66+ }
67+ }
68+
69+ public static final class Shard
70+ {
71+ final SubShard reads , writes , syncPoints ;
72+ public Shard (DecayingHistogramsShard shard )
73+ {
74+ reads = new SubShard (readMetrics , shard );
75+ writes = new SubShard (writeMetrics , shard );
76+ syncPoints = new SubShard (syncPointMetrics , shard );
77+ }
78+ }
79+
5080 /**
5181 * The time between start on the coordinator and commit on this replica.
5282 */
53- public final Timer stableLatency ;
83+ public final ShardedDecayingHistogram stableLatency = HISTOGRAMS . newHistogram ( TimeUnit . SECONDS . toNanos ( 1L )) ;
5484
5585 /**
5686 * The time between start on the coordinator and arrival of the result on this replica.
5787 */
58- public final Timer preapplyLatency ;
88+ public final ShardedDecayingHistogram preapplyLatency = HISTOGRAMS . newHistogram ( TimeUnit . SECONDS . toNanos ( 1L )) ;
5989
6090 /**
6191 * The time between start on the coordinator and application on this replica.
6292 */
63- public final Timer applyLatency ;
64-
65- /**
66- * TODO (expected): probably more interesting is latency from preapplied to apply;
67- * we already track local write latencies, whch this effectively duplicates (but including queueing latencies)
68- * Duration of applying changes.
69- */
70- public final Timer applyDuration ;
93+ public final ShardedDecayingHistogram applyLatency = HISTOGRAMS .newHistogram (TimeUnit .SECONDS .toNanos (1L ));
7194
7295 /**
7396 * A histogram of the number of dependencies per transaction at this replica.
7497 */
75- public final Histogram dependencies ;
98+ public final ShardedDecayingHistogram dependencies = HISTOGRAMS . newHistogram ( 1 << 12 ) ;
7699
77100 private AccordReplicaMetrics (String scope )
78101 {
79102 DefaultNameFactory replica = new DefaultNameFactory (ACCORD_REPLICA , scope );
80- stableLatency = Metrics .timer (replica .createMetricName (REPLICA_STABLE_LATENCY ));
81- preapplyLatency = Metrics .timer (replica .createMetricName (REPLICA_PREAPPLY_LATENCY ));
82- applyLatency = Metrics .timer (replica .createMetricName (REPLICA_APPLY_LATENCY ));
83- applyDuration = Metrics .timer (replica .createMetricName (REPLICA_APPLY_DURATION ));
84- dependencies = Metrics .histogram (replica .createMetricName (REPLICA_DEPENDENCIES ), true );
103+ Metrics .register (replica .createMetricName (REPLICA_STABLE_LATENCY ), stableLatency );
104+ Metrics .register (replica .createMetricName (REPLICA_PREAPPLY_LATENCY ), preapplyLatency );
105+ Metrics .register (replica .createMetricName (REPLICA_APPLY_LATENCY ), applyLatency );
106+ Metrics .register (replica .createMetricName (REPLICA_DEPENDENCIES ), dependencies );
85107 }
86108
87109 @ Override
@@ -106,64 +128,79 @@ public String toString()
106128 {
107129 throw new RuntimeException (e );
108130 }
109- builder .append ("]" );
131+ builder .append (']' );
110132 return builder .toString ();
111133 }
112134
113135 public static class Listener implements ReplicaEventListener
114136 {
115- private AccordReplicaMetrics forTransaction (TxnId txnId )
137+ private SubShard forTransaction (SafeCommandStore safeStore , TxnId txnId )
116138 {
117139 if (txnId != null )
118140 {
141+ Shard shard = ((AccordCommandStore ) safeStore .commandStore ()).executor ().replicaMetrics ;
119142 if (txnId .isWrite ())
120- return writeMetrics ;
143+ return shard . writes ;
121144 else if (txnId .isSomeRead ())
122- return readMetrics ;
145+ return shard .reads ;
146+ else if (txnId .isSyncPoint ())
147+ return shard .syncPoints ;
123148 }
124149 return null ;
125150 }
126151
152+ private static long unixNanos ()
153+ {
154+ return currentTimeMillis () * 1_000_000 ;
155+ }
156+
157+ private static long elapsed (TxnId txnId )
158+ {
159+ return elapsed (unixNanos (), txnId );
160+ }
161+
162+ private static long elapsed (long unixNanos , TxnId txnId )
163+ {
164+ return Math .max (0 , unixNanos - (txnId .hlc () * 1000 ));
165+ }
166+
167+ private static LogLinearDecayingHistograms .Buffer buffer (SafeCommandStore safeStore )
168+ {
169+ return ((AccordSafeCommandStore ) safeStore ).histogramBuffer ();
170+ }
171+
127172 @ Override
128173 public void onStable (SafeCommandStore safeStore , Command cmd )
129174 {
130175 Tracing .trace ("Stable {} on {}" , cmd .txnId (), safeStore .commandStore ());
131- long now = AccordTimeService .nowMicros ();
132- AccordReplicaMetrics metrics = forTransaction (cmd .txnId ());
176+ SubShard metrics = forTransaction (safeStore , cmd .txnId ());
133177 if (metrics != null )
134- {
135- long trxTimestamp = cmd .txnId ().hlc ();
136- metrics .stableLatency .update (now - trxTimestamp , TimeUnit .MICROSECONDS );
137- }
178+ metrics .stableLatency .add (buffer (safeStore ), elapsed (cmd .txnId ()));
138179 }
139180
140181 @ Override
141182 public void onPreApplied (SafeCommandStore safeStore , Command cmd )
142183 {
143184 Tracing .trace ("Preapplied {} on {}" , cmd .txnId (), safeStore .commandStore ());
144- long now = AccordTimeService .nowMicros ();
145- AccordReplicaMetrics metrics = forTransaction (cmd .txnId ());
185+ SubShard metrics = forTransaction (safeStore , cmd .txnId ());
146186 if (metrics != null )
147187 {
148- Timestamp trxTimestamp = cmd .txnId ();
149- metrics .preapplyLatency .update ( now - trxTimestamp . hlc ( ), TimeUnit . MICROSECONDS );
188+ long elapsed = elapsed ( cmd .txnId () );
189+ metrics .preapplyLatency .add ( buffer ( safeStore ), elapsed );
150190 PartialDeps deps = cmd .partialDeps ();
151- metrics .dependencies .update ( deps != null ? deps .txnIdCount () : 0 );
191+ metrics .dependencies .add ( buffer ( safeStore ), deps != null ? deps .txnIdCount () : 0 );
152192 }
153193 }
154194
155195 @ Override
156- public void onApplied (SafeCommandStore safeStore , Command cmd , long applyStartedAt )
196+ public void onApplied (SafeCommandStore safeStore , Command cmd )
157197 {
158198 Tracing .trace ("Applied {} on {}" , cmd .txnId (), safeStore .commandStore ());
159- long now = AccordTimeService .nowMicros ();
160- AccordReplicaMetrics metrics = forTransaction (cmd .txnId ());
199+ SubShard metrics = forTransaction (safeStore , cmd .txnId ());
161200 if (metrics != null )
162201 {
163- Timestamp trxTimestamp = cmd .txnId ();
164- metrics .applyLatency .update (now - trxTimestamp .hlc (), TimeUnit .MICROSECONDS );
165- if (applyStartedAt > 0 )
166- metrics .applyDuration .update (now - applyStartedAt , TimeUnit .MICROSECONDS );
202+ long now = unixNanos ();
203+ metrics .applyLatency .add (buffer (safeStore ), elapsed (now , cmd .txnId ()));
167204 }
168205 }
169206 }
0 commit comments