Skip to content

Commit f6fd495

Browse files
committed
Improve DurabilityQueue:
- All retries are appended to a delay queue so overlaps can be pruned - Quorum successes are not retried if there are superseding sync points covering the ranges - User-initiated requests are not purged unless the request has timed out or otherwise completed - Overlapping requests are queued up against the next to run Alsp (C*): - Catch-up with quorums on restart - Manage an ordered set of keys in cache for faster range searches Also (Accord): - Update copy of BTree and import IntervalBTree - Fix RedundantStatus WAS_OWNED_OVERRIDE_MASK - Add Catchup mechanism to reach parity with a quorum on restart patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21013
1 parent 4b7ef7c commit f6fd495

File tree

14 files changed

+702
-89
lines changed

14 files changed

+702
-89
lines changed

modules/accord

Submodule accord updated 52 files

src/java/org/apache/cassandra/config/AccordSpec.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,13 @@ public enum TransactionalRangeMigration
178178
public TransactionalMode default_transactional_mode = TransactionalMode.off;
179179
public boolean ephemeralReadEnabled = true;
180180
public boolean state_cache_listener_jfr_enabled = true;
181+
182+
public DurationSpec.IntSecondsBound catchup_on_start_success_latency = new DurationSpec.IntSecondsBound(60);
183+
public DurationSpec.IntSecondsBound catchup_on_start_fail_latency = new DurationSpec.IntSecondsBound(900);
184+
public int catchup_on_start_max_attempts = 5;
185+
public boolean catchup_on_start_exit_on_failure = true;
186+
public boolean catchup_on_start = true;
187+
181188
public final JournalSpec journal = new JournalSpec();
182189

183190
public enum MixedTimeSourceHandling

src/java/org/apache/cassandra/service/accord/AccordCache.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,17 @@
2121
import java.nio.ByteBuffer;
2222
import java.util.ArrayList;
2323
import java.util.Collection;
24+
import java.util.Comparator;
2425
import java.util.IdentityHashMap;
2526
import java.util.Iterator;
2627
import java.util.List;
2728
import java.util.Map;
28-
import java.util.Set;
2929
import java.util.concurrent.CopyOnWriteArrayList;
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.function.BiConsumer;
3232
import java.util.function.BiFunction;
3333
import java.util.function.Function;
3434
import java.util.function.ToLongFunction;
35-
import java.util.stream.Stream;
36-
3735
import javax.annotation.Nullable;
3836

3937
import com.google.common.annotations.VisibleForTesting;
@@ -68,6 +66,7 @@
6866
import org.apache.cassandra.utils.NoSpamLogger;
6967
import org.apache.cassandra.utils.NoSpamLogger.NoSpamLogStatement;
7068
import org.apache.cassandra.utils.ObjectSizes;
69+
import org.apache.cassandra.utils.btree.BTree;
7170

7271
import static accord.utils.Invariants.illegalState;
7372
import static accord.utils.Invariants.require;
@@ -112,6 +111,7 @@ public interface Adapter<K, V, S>
112111
long estimateShrunkHeapSize(Object shrunk);
113112
boolean validate(AccordCommandStore commandStore, K key, V value);
114113
S safeRef(AccordCacheEntry<K, V> node);
114+
default Comparator<K> keyComparator() { return null; }
115115

116116
default AccordCacheEntry<K, V> newEntry(K key, AccordCache.Type<K, V, ?>.Instance owner)
117117
{
@@ -274,7 +274,7 @@ public void saveWhenReadyExclusive(AccordCacheEntry<?, ?> entry, Runnable onSucc
274274
entry.savingOrWaitingToSave().identity.onSuccess(onSuccess);
275275
}
276276

277-
private void evict(AccordCacheEntry<?, ?> node, boolean updateUnreferenced)
277+
private <K> void evict(AccordCacheEntry<K, ?> node, boolean updateUnreferenced)
278278
{
279279
if (logger.isTraceEnabled())
280280
logger.trace("Evicting {}", node);
@@ -297,7 +297,7 @@ private void evict(AccordCacheEntry<?, ?> node, boolean updateUnreferenced)
297297
if (node.status() == LOADED && VALIDATE_LOAD_ON_EVICT)
298298
owner.validateLoadEvicted(node);
299299

300-
AccordCacheEntry<?, ?> self = node.owner.cache.remove(node.key());
300+
AccordCacheEntry<K, ?> self = node.owner.remove(node.key());
301301
Invariants.require(self.references() == 0);
302302
require(self == node, "Leaked node detected; was attempting to remove %s but cache had %s", node, self);
303303
node.notifyListeners(Listener::onEvict);
@@ -400,6 +400,8 @@ public class Instance implements Iterable<AccordCacheEntry<K, V>>
400400
// TODO (desired): don't need to store key separately as stored in node; ideally use a hash set that allows us to get the current entry
401401
private final Map<K, AccordCacheEntry<K, V>> cache = new Object2ObjectHashMap<>();
402402
private List<Listener<K, V>> listeners = null;
403+
// TODO (expected): update this after releasing the lock
404+
private OrderedKeys<K> orderedKeys;
403405

404406
public Instance(AccordCommandStore commandStore)
405407
{
@@ -435,7 +437,6 @@ public void recordPreAcquired(AccordSafeState<K, V> ref)
435437

436438
private AccordCacheEntry<K, V> acquire(K key, boolean onlyIfLoaded)
437439
{
438-
@SuppressWarnings("unchecked")
439440
AccordCacheEntry<K, V> node = cache.get(key);
440441
return node == null
441442
? acquireAbsent(key, onlyIfLoaded)
@@ -454,8 +455,10 @@ private AccordCacheEntry<K, V> acquireAbsent(K key, boolean onlyIfLoaded)
454455
node.increment();
455456

456457
Object prev = cache.put(key, node);
457-
node.initSize(parent());
458458
Invariants.require(prev == null, "%s not absent from cache: %s already present", key, node);
459+
if (orderedKeys != null)
460+
orderedKeys.add(key);
461+
node.initSize(parent());
459462
++size;
460463
node.notifyListeners(Listener::onAdd);
461464
maybeShrinkOrEvictSomeNodes();
@@ -555,16 +558,27 @@ else if (node.isLoadingOrWaiting())
555558
maybeShrinkOrEvictSomeNodes();
556559
}
557560

558-
public Stream<AccordCacheEntry<K, V>> stream()
561+
AccordCacheEntry<K, ?> remove(K key)
559562
{
560-
return cache.values().stream();
563+
AccordCacheEntry<K, ?> result = cache.remove(key);
564+
if (orderedKeys != null && result != null)
565+
orderedKeys.remove(key);
566+
return result;
561567
}
562568

563569
final Type<K, V, S> parent()
564570
{
565571
return Type.this;
566572
}
567573

574+
public Iterable<K> keysBetween(K start, boolean startInclusive, K end, boolean endInclusive)
575+
{
576+
if (orderedKeys == null)
577+
orderedKeys = new OrderedKeys<>(adapter.keyComparator(), cache.keySet());
578+
579+
return orderedKeys.between(start, startInclusive, end, endInclusive);
580+
}
581+
568582
@Override
569583
public Iterator<AccordCacheEntry<K, V>> iterator()
570584
{
@@ -602,11 +616,6 @@ public AccordCacheEntry<K, V> getUnsafe(K key)
602616
return cache.get(key);
603617
}
604618

605-
public Set<K> keySet()
606-
{
607-
return cache.keySet();
608-
}
609-
610619
@VisibleForTesting
611620
public boolean isReferenced(K key)
612621
{
@@ -1039,6 +1048,12 @@ public AccordCacheEntry<K, V> newEntry(K key, Type<K, V, ?>.Instance owner)
10391048
{
10401049
return newNode.apply(key, owner);
10411050
}
1051+
1052+
@Override
1053+
public Comparator<K> keyComparator()
1054+
{
1055+
return Comparator.comparing(a -> ((Comparable) a));
1056+
}
10421057
}
10431058

10441059
static class SettableWrapper<K, V, S> extends FunctionalAdapter<K, V, S>
@@ -1148,6 +1163,12 @@ public AccordSafeCommandsForKey safeRef(AccordCacheEntry<RoutingKey, CommandsFor
11481163
{
11491164
return new AccordSafeCommandsForKey(node);
11501165
}
1166+
1167+
@Override
1168+
public Comparator<RoutingKey> keyComparator()
1169+
{
1170+
return RoutingKey::compareAsRoutingKey;
1171+
}
11511172
}
11521173

11531174
public static class CommandAdapter implements Adapter<TxnId, Command, AccordSafeCommand>

src/java/org/apache/cassandra/service/accord/AccordCommandStore.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import accord.local.SafeCommandStore;
5656
import accord.local.cfk.CommandsForKey;
5757
import accord.primitives.PartialTxn;
58+
import accord.primitives.Range;
5859
import accord.primitives.Ranges;
5960
import accord.primitives.RoutableKey;
6061
import accord.primitives.Route;
@@ -490,12 +491,16 @@ class Ready extends CountingResult implements Runnable
490491
Ready ready = new Ready();
491492
try (ExclusiveCaches caches = lockCaches())
492493
{
493-
for (AccordCacheEntry<RoutingKey, CommandsForKey> e : caches.commandsForKeys())
494+
for (Range range : ranges)
494495
{
495-
if (ranges.contains(e.key()) && e.isModified())
496+
for (RoutingKey k : caches.commandsForKeys().keysBetween(range.start(), range.startInclusive(), range.end(), range.endInclusive()))
496497
{
497-
ready.increment();
498-
caches.global().saveWhenReadyExclusive(e, ready);
498+
AccordCacheEntry<RoutingKey, CommandsForKey> e = caches.commandsForKeys().getUnsafe(k);
499+
if (e.isModified())
500+
{
501+
ready.increment();
502+
caches.global().saveWhenReadyExclusive(e, ready);
503+
}
499504
}
500505
}
501506
}

0 commit comments

Comments
 (0)