18
18
19
19
package org .apache .cassandra .service .accord ;
20
20
21
+ import java .util .ArrayList ;
21
22
import java .util .Comparator ;
23
+ import java .util .HashMap ;
22
24
import java .util .Iterator ;
25
+ import java .util .List ;
23
26
import java .util .Map ;
24
27
import java .util .NavigableMap ;
25
28
import java .util .TreeMap ;
26
29
import java .util .concurrent .atomic .AtomicReference ;
30
+ import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
31
+ import java .util .concurrent .locks .Lock ;
32
+ import java .util .concurrent .locks .ReentrantLock ;
27
33
import java .util .function .BiFunction ;
28
34
import java .util .function .Consumer ;
29
35
import java .util .function .UnaryOperator ;
54
60
import accord .utils .UnhandledEnum ;
55
61
import org .agrona .collections .Object2ObjectHashMap ;
56
62
import org .apache .cassandra .service .accord .api .TokenKey ;
63
+ import org .apache .cassandra .utils .btree .BTreeSet ;
57
64
import org .apache .cassandra .utils .btree .IntervalBTree ;
65
+ import org .apache .cassandra .utils .concurrent .IntrusiveStack ;
58
66
59
67
import static accord .local .CommandSummaries .SummaryStatus .NOT_DIRECTLY_WITNESSED ;
60
68
import static org .apache .cassandra .utils .btree .IntervalBTree .InclusiveEndHelper .endWithStart ;
@@ -122,15 +130,54 @@ public NavigableMap<Timestamp, CommandSummaries.Summary> byTxnId()
122
130
return this ;
123
131
}
124
132
125
- public static class Manager implements AccordCache .Listener <TxnId , Command >
133
+ public static class Manager implements AccordCache .Listener <TxnId , Command >, Runnable
126
134
{
135
+ static class IntervalTreeEdit extends IntrusiveStack <IntervalTreeEdit >
136
+ {
137
+ final TxnId txnId ;
138
+ final @ Nullable Object [] update , remove ;
139
+
140
+ IntervalTreeEdit (TxnId txnId , Object [] update , Object [] remove )
141
+ {
142
+ this .txnId = txnId ;
143
+ this .update = update ;
144
+ this .remove = remove ;
145
+ }
146
+
147
+ public static boolean push (IntervalTreeEdit edit , Manager manager )
148
+ {
149
+ return null == IntrusiveStack .getAndPush (pendingEditsUpdater , manager , edit );
150
+ }
151
+
152
+ public IntervalTreeEdit reverse ()
153
+ {
154
+ return reverse (this );
155
+ }
156
+
157
+ boolean isSize (int size )
158
+ {
159
+ return IntrusiveStack .isSize (size , this );
160
+ }
161
+
162
+ IntervalTreeEdit merge (IntervalTreeEdit next )
163
+ {
164
+ Invariants .require (this .txnId .equals (next .txnId ));
165
+ Object [] remove = this .remove == null ? next .remove : next .remove == null ? this .remove : IntervalBTree .update (this .remove , next .remove , COMPARATORS );
166
+ return new IntervalTreeEdit (txnId , next .update , remove );
167
+ }
168
+ }
169
+
127
170
private final AccordCommandStore commandStore ;
128
171
private final RangeSearcher searcher ;
129
172
private final AtomicReference <NavigableMap <TxnId , Ranges >> transitive = new AtomicReference <>(new TreeMap <>());
130
173
// TODO (desired): manage memory consumed by this auxillary information
131
174
private final Object2ObjectHashMap <TxnId , RangeRoute > cachedRangeTxnsById = new Object2ObjectHashMap <>();
132
175
private Object [] cachedRangeTxnsByRange = IntervalBTree .empty ();
133
176
177
+ private volatile IntervalTreeEdit pendingEdits ;
178
+ private final Lock drainPendingEditsLock = new ReentrantLock ();
179
+ private static final AtomicReferenceFieldUpdater <Manager , IntervalTreeEdit > pendingEditsUpdater = AtomicReferenceFieldUpdater .newUpdater (Manager .class , IntervalTreeEdit .class , "pendingEdits" );
180
+
134
181
public Manager (AccordCommandStore commandStore )
135
182
{
136
183
this .commandStore = commandStore ;
@@ -155,16 +202,93 @@ public void onUpdate(AccordCacheEntry<TxnId, Command> state)
155
202
{
156
203
RangeRoute cur = cachedRangeTxnsById .put (cmd .txnId (), upd );
157
204
if (!upd .equals (cur ))
158
- {
159
- if (cur != null )
160
- remove (txnId , cur );
161
- cachedRangeTxnsByRange = IntervalBTree .update (cachedRangeTxnsByRange , toMap (txnId , upd ), COMPARATORS );
162
- }
205
+ pushEdit (new IntervalTreeEdit (txnId , toMap (txnId , upd ), cur == null ? null : toMap (txnId , cur )));
163
206
}
164
207
}
165
208
}
166
209
}
167
210
211
+ private void pushEdit (IntervalTreeEdit edit )
212
+ {
213
+ if (IntervalTreeEdit .push (edit , this ))
214
+ commandStore .executor ().submitExclusive (this );
215
+ }
216
+
217
+ @ Override
218
+ public void run ()
219
+ {
220
+ if (drainPendingEditsLock .tryLock ())
221
+ {
222
+ try
223
+ {
224
+ drainPendingEditsInternal ();
225
+ }
226
+ finally
227
+ {
228
+ drainPendingEditsLock .unlock ();
229
+ postUnlock ();
230
+ }
231
+ }
232
+ }
233
+
234
+ Object [] cachedRangeTxnsByRange ()
235
+ {
236
+ drainPendingEditsLock .lock ();
237
+ try
238
+ {
239
+ drainPendingEditsInternal ();
240
+ return cachedRangeTxnsByRange ;
241
+ }
242
+ finally
243
+ {
244
+ drainPendingEditsLock .unlock ();
245
+ postUnlock ();
246
+ }
247
+ }
248
+
249
+ void drainPendingEditsInternal ()
250
+ {
251
+ IntervalTreeEdit edits = pendingEditsUpdater .getAndSet (this , null );
252
+ if (edits == null )
253
+ return ;
254
+
255
+ if (edits .isSize (1 ))
256
+ {
257
+ if (edits .remove != null ) cachedRangeTxnsByRange = IntervalBTree .subtract (cachedRangeTxnsByRange , edits .remove , COMPARATORS );
258
+ if (edits .update != null ) cachedRangeTxnsByRange = IntervalBTree .update (cachedRangeTxnsByRange , edits .update , COMPARATORS );
259
+ return ;
260
+ }
261
+
262
+ edits = edits .reverse ();
263
+ Map <TxnId , IntervalTreeEdit > editMap = new HashMap <>();
264
+ for (IntervalTreeEdit edit : edits )
265
+ editMap .merge (edit .txnId , edit , IntervalTreeEdit ::merge );
266
+
267
+ List <TxnIdInterval > update = new ArrayList <>(), remove = new ArrayList <>();
268
+ for (IntervalTreeEdit edit : editMap .values ())
269
+ {
270
+ if (edit .update != null ) update .addAll (BTreeSet .wrap (edit .update , COMPARATORS .totalOrder ()));
271
+ if (edit .remove != null ) remove .addAll (BTreeSet .wrap (edit .remove , COMPARATORS .totalOrder ()));
272
+ }
273
+
274
+ if (!remove .isEmpty ())
275
+ {
276
+ remove .sort (COMPARATORS .totalOrder ());
277
+ cachedRangeTxnsByRange = IntervalBTree .subtract (cachedRangeTxnsByRange , IntervalBTree .build (remove , COMPARATORS ), COMPARATORS );
278
+ }
279
+ if (!update .isEmpty ())
280
+ {
281
+ update .sort (COMPARATORS .totalOrder ());
282
+ cachedRangeTxnsByRange = IntervalBTree .update (cachedRangeTxnsByRange , IntervalBTree .build (update , COMPARATORS ), COMPARATORS );
283
+ }
284
+ }
285
+
286
+ private void postUnlock ()
287
+ {
288
+ if (pendingEdits != null )
289
+ commandStore .executor ().submit (this );
290
+ }
291
+
168
292
@ Override
169
293
public void onEvict (AccordCacheEntry <TxnId , Command > state )
170
294
{
@@ -173,15 +297,10 @@ public void onEvict(AccordCacheEntry<TxnId, Command> state)
173
297
{
174
298
RangeRoute cur = cachedRangeTxnsById .remove (txnId );
175
299
if (cur != null )
176
- remove ( txnId , cur );
300
+ pushEdit ( new IntervalTreeEdit ( txnId , null , toMap ( txnId , cur )) );
177
301
}
178
302
}
179
303
180
- private void remove (TxnId txnId , RangeRoute route )
181
- {
182
- cachedRangeTxnsByRange = IntervalBTree .subtract (cachedRangeTxnsByRange , toMap (txnId , route ), COMPARATORS );
183
- }
184
-
185
304
static Object [] toMap (TxnId txnId , RangeRoute route )
186
305
{
187
306
int size = route .size ();
@@ -199,7 +318,6 @@ static Object[] toMap(TxnId txnId, RangeRoute route)
199
318
}
200
319
}
201
320
}
202
-
203
321
}
204
322
205
323
public CommandsForRanges .Loader loader (@ Nullable TxnId primaryTxnId , KeyHistory keyHistory , Unseekables <?> keysOrRanges )
@@ -331,7 +449,7 @@ public void forEachInCache(Unseekables<?> keysOrRanges, Consumer<Summary> forEac
331
449
{
332
450
for (RoutingKey key : (AbstractUnseekableKeys )keysOrRanges )
333
451
{
334
- IntervalBTree .accumulate (manager .cachedRangeTxnsByRange , KEY_COMPARATORS , key , (f , s , i , c ) -> {
452
+ IntervalBTree .accumulate (manager .cachedRangeTxnsByRange () , KEY_COMPARATORS , key , (f , s , i , c ) -> {
335
453
TxnIdInterval interval = (TxnIdInterval )i ;
336
454
if (isRelevant (interval ))
337
455
{
@@ -349,7 +467,7 @@ public void forEachInCache(Unseekables<?> keysOrRanges, Consumer<Summary> forEac
349
467
{
350
468
for (Range range : (AbstractRanges )keysOrRanges )
351
469
{
352
- IntervalBTree .accumulate (manager .cachedRangeTxnsByRange , COMPARATORS , new TxnIdInterval (range .start (), range .end (), TxnId .NONE ), (f , s , i , c ) -> {
470
+ IntervalBTree .accumulate (manager .cachedRangeTxnsByRange () , COMPARATORS , new TxnIdInterval (range .start (), range .end (), TxnId .NONE ), (f , s , i , c ) -> {
353
471
if (isRelevant (i ))
354
472
{
355
473
TxnId txnId = i .txnId ;
0 commit comments