Skip to content

Commit e14816e

Browse files
committed
Journal reads must select segments before sstables to avoid compaction races
Also Fix Cassandra: - In memory size calculation for CommandsForKey include Unmanaged - Accord load out-of-band cleanup should use SafeRedundantBefore ALso Improve Cassandra: - Report replay information on begin replay - Improve AccordService shutdown - Log command store RedundantBefore on shutdown - Segment compaction should wait for readOrder barrier to replace segments, for additional safety - Journal segments should share readOrder with sstables Also Improve Accord: - Iterate LocalListeners in order, so can query more effectively on node - Refine AbstractReplay.minReplay/shouldReplay patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21804
1 parent c295c33 commit e14816e

File tree

17 files changed

+193
-142
lines changed

17 files changed

+193
-142
lines changed

src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,7 @@ private ListenersDepsTable()
692692
" waiting_until text,\n" +
693693
" waiter 'TxnIdUtf8Type',\n" +
694694
" PRIMARY KEY (command_store_id, waiting_on, waiting_until, waiter)" +
695-
')', Int32Type.instance), FAIL, UNSORTED, ASC);
695+
')', Int32Type.instance), FAIL, ASC, ASC);
696696
}
697697

698698
@Override
@@ -726,10 +726,15 @@ private void addRows(SafeCommandStore safeStore, RowsCollector rows)
726726
LocalListeners listeners = safeStore.commandStore().unsafeGetListeners();
727727
for (LocalListeners.TxnListener listener : listeners.txnListeners())
728728
{
729-
rows.add(listener.waitingOn.toString(), listener.awaitingStatus.name(), listener.waiter.toString())
729+
rows.add(listener.waitingOn.toString(), ordered(listener.awaitingStatus), listener.waiter.toString())
730730
.eagerCollect(ignore -> {});
731731
}
732732
}
733+
734+
private String ordered(SaveStatus saveStatus)
735+
{
736+
return (saveStatus.ordinal() <= 9 ? "0" : "") + saveStatus.ordinal() + '_' + saveStatus.name();
737+
}
733738
}
734739

735740
public static final class ListenersLocalTable extends AbstractLazyVirtualTable
@@ -1796,7 +1801,10 @@ protected void applyRowUpdate(Object[] partitionKeys, Object[] clusteringKeys, C
17961801
case TRY_EXECUTE:
17971802
run(txnId, commandStoreId, safeStore -> {
17981803
SafeCommand safeCommand = safeStore.unsafeGet(txnId);
1799-
Commands.maybeExecute(safeStore, safeCommand, safeCommand.current(), true, true, NotifyWaitingOnPlus.adapter(ignore -> {}, true, true));
1804+
Command command = safeCommand.current();
1805+
if (command.saveStatus() == SaveStatus.Applying)
1806+
return Commands.applyChain(safeStore, (Command.Executed) command);
1807+
Commands.maybeExecute(safeStore, safeCommand, command, true, true, NotifyWaitingOnPlus.adapter(ignore -> {}, true, true));
18001808
return AsyncChains.success(null);
18011809
});
18021810
break;
@@ -1896,7 +1904,8 @@ private void run(TxnId txnId, int commandStoreId, Function<SafeCommandStore, Asy
18961904
AccordService.getBlocking(accord.node()
18971905
.commandStores()
18981906
.forId(commandStoreId)
1899-
.chain(PreLoadContext.contextFor(txnId, TXN_OPS), apply));
1907+
.chain(PreLoadContext.contextFor(txnId, TXN_OPS), apply)
1908+
.flatMap(i -> i));
19001909
}
19011910

19021911
private void cleanup(TxnId txnId, int commandStoreId, Cleanup cleanup)

src/java/org/apache/cassandra/journal/Journal.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public class Journal<K, V> implements Shutdownable
121121

122122
private final FlusherCallbacks flusherCallbacks;
123123

124-
final OpOrder readOrder = new OpOrder();
124+
final OpOrder readOrder;
125125

126126
private class FlusherCallbacks implements Flusher.Callbacks
127127
{
@@ -177,14 +177,16 @@ public Journal(String name,
177177
Params params,
178178
KeySupport<K> keySupport,
179179
ValueSerializer<K, V> valueSerializer,
180-
SegmentCompactor<K, V> segmentCompactor)
180+
SegmentCompactor<K, V> segmentCompactor,
181+
OpOrder readOrder)
181182
{
182183
this.name = name;
183184
this.directory = directory;
184185
this.params = params;
185186

186187
this.keySupport = keySupport;
187188
this.valueSerializer = valueSerializer;
189+
this.readOrder = readOrder;
188190

189191
this.metrics = new Metrics<>(name);
190192
this.flusherCallbacks = new FlusherCallbacks();
@@ -357,15 +359,25 @@ public V readLast(K id)
357359
return null;
358360
}
359361

360-
public void readAll(K id, RecordConsumer<K> consumer)
362+
public static <K, V> void readAll(K id, RecordConsumer<K> consumer, OpOrder.Group readGroup, Segments<K, V> segments)
361363
{
362364
EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
363-
try (OpOrder.Group group = readOrder.start())
365+
for (Segment<K, V> segment : segments.allSorted(false))
364366
{
365-
for (Segment<K, V> segment : segments.get().allSorted(false))
366-
{
367-
segment.readAll(id, holder, consumer);
368-
}
367+
segment.readAll(id, holder, consumer);
368+
}
369+
}
370+
371+
public void readAll(K id, RecordConsumer<K> consumer, OpOrder.Group readGroup)
372+
{
373+
readAll(id, consumer, readGroup, segments.get());
374+
}
375+
376+
public void readAll(K id, RecordConsumer<K> consumer)
377+
{
378+
try (OpOrder.Group readGroup = readOrder.start())
379+
{
380+
readAll(id, consumer, readGroup);
369381
}
370382
}
371383

@@ -449,18 +461,15 @@ public V readFirstMatching(K id, Predicate<V> condition)
449461
* @return true if the record was found, false otherwise
450462
*/
451463
@SuppressWarnings("unused")
452-
public boolean readLast(K id, RecordConsumer<K> consumer)
464+
public static <K, V> boolean readLast(K id, RecordConsumer<K> consumer, OpOrder.Group readOrder, Segments<K, V> segments)
453465
{
454-
try (OpOrder.Group group = readOrder.start())
466+
for (Segment<K, V> segment : segments.allSorted(false))
455467
{
456-
for (Segment<K, V> segment : segments.get().allSorted(false))
457-
{
458-
if (!segment.index().mayContainId(id))
459-
continue;
468+
if (!segment.index().mayContainId(id))
469+
continue;
460470

461-
if (segment.readLast(id, consumer))
462-
return true;
463-
}
471+
if (segment.readLast(id, consumer))
472+
return true;
464473
}
465474
return false;
466475
}
@@ -728,7 +737,7 @@ ReferencedSegments<K, V> selectAndReference(Predicate<Segment<K,V>> selector)
728737
}
729738
}
730739

731-
Segments<K, V> segments()
740+
public Segments<K, V> segments()
732741
{
733742
return segments.get();
734743
}

src/java/org/apache/cassandra/journal/Segments.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* <p/>
3232
* TODO (performance, expected): an interval/range structure for StaticSegment lookup based on min/max key bounds
3333
*/
34-
class Segments<K, V>
34+
public class Segments<K, V>
3535
{
3636
private final Long2ObjectHashMap<Segment<K, V>> segments;
3737
private SortedArrayList<Segment<K, V>> sorted;

src/java/org/apache/cassandra/service/accord/AccordCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1321,7 +1321,7 @@ public Shrink decideFullShrink(TxnId txnId, Command value)
13211321
try (DataInputBuffer buf = new DataInputBuffer(buffer, false))
13221322
{
13231323
builder.deserializeNext(buf, Version.LATEST);
1324-
return builder.construct(commandStore.unsafeGetRedundantBefore());
1324+
return builder.construct(commandStore.safeGetRedundantBefore());
13251325
}
13261326
catch (UnknownTableException e)
13271327
{

src/java/org/apache/cassandra/service/accord/AccordCommandStore.java

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ CommandsForKey loadCommandsForKey(RoutableKey key)
317317
CommandsForKey cfk = CommandsForKeyAccessor.load(id, (TokenKey) key);
318318
if (cfk == null)
319319
return null;
320-
RedundantBefore.QuickBounds bounds = unsafeGetRedundantBefore().get(key);
320+
RedundantBefore.QuickBounds bounds = safeGetRedundantBefore().get(key);
321321
if (bounds == null)
322322
return cfk; // TODO (required): I don't think this should be possible? but we hit it on some test
323323
return cfk.withGcBeforeAtLeast(bounds.gcBefore, false);
@@ -420,7 +420,7 @@ public void appendCommands(List<CommandUpdate> diffs, Runnable onFlush)
420420
@VisibleForTesting
421421
public Command loadCommand(TxnId txnId)
422422
{
423-
return journal.loadCommand(id, txnId, unsafeGetRedundantBefore(), durableBefore());
423+
return journal.loadCommand(id, txnId, safeGetRedundantBefore(), durableBefore());
424424
}
425425

426426
@VisibleForTesting
@@ -446,12 +446,12 @@ public static Command prepareToCache(Command command)
446446

447447
public Command.Minimal loadMinimal(TxnId txnId)
448448
{
449-
return journal.loadMinimal(id, txnId, unsafeGetRedundantBefore(), durableBefore());
449+
return journal.loadMinimal(id, txnId, safeGetRedundantBefore(), durableBefore());
450450
}
451451

452452
public Command.MinimalWithDeps loadMinimalWithDeps(TxnId txnId)
453453
{
454-
return journal.loadMinimalWithDeps(id, txnId, unsafeGetRedundantBefore(), durableBefore());
454+
return journal.loadMinimalWithDeps(id, txnId, safeGetRedundantBefore(), durableBefore());
455455
}
456456

457457
public AccordCompactionInfo getCompactionInfo()
@@ -465,17 +465,22 @@ public AccordCompactionInfo getCompactionInfo()
465465
return new AccordCompactionInfo(id, redundantBefore, ranges, tableId);
466466
}
467467

468+
public final RedundantBefore safeGetRedundantBefore()
469+
{
470+
return safeRedundantBefore.redundantBefore;
471+
}
472+
468473
public RangeSearcher rangeSearcher()
469474
{
470475
return rangeSearcher;
471476
}
472477

473478
public AccordCommandStoreReplayer replayer()
474479
{
475-
boolean replayOnlyDurable = true;
480+
boolean replayOnlyNonDurable = true;
476481
if (journal instanceof AccordJournal)
477-
replayOnlyDurable = ((AccordJournal)journal).configuration().replayMode() == ONLY_NON_DURABLE;
478-
return new AccordCommandStoreReplayer(this, replayOnlyDurable);
482+
replayOnlyNonDurable = ((AccordJournal)journal).configuration().replayMode() == ONLY_NON_DURABLE;
483+
return new AccordCommandStoreReplayer(this, replayOnlyNonDurable);
479484
}
480485

481486
static final AtomicLong nextDurabilityLoggingId = new AtomicLong();
@@ -540,15 +545,22 @@ public void unsafeUpsertRedundantBefore(RedundantBefore addRedundantBefore)
540545
super.unsafeUpsertRedundantBefore(addRedundantBefore);
541546
}
542547

548+
@VisibleForTesting
549+
public void unsafeUpdateRangesForEpoch()
550+
{
551+
super.unsafeUpdateRangesForEpoch();
552+
safeRedundantBefore = new SafeRedundantBefore(0, unsafeGetRedundantBefore());
553+
}
554+
543555
public static class AccordCommandStoreReplayer extends AbstractReplayer
544556
{
545-
private final AccordCommandStore store;
557+
private final AccordCommandStore commandStore;
546558
private final boolean onlyNonDurable;
547559

548-
private AccordCommandStoreReplayer(AccordCommandStore store, boolean onlyNonDurable)
560+
private AccordCommandStoreReplayer(AccordCommandStore commandStore, boolean onlyNonDurable)
549561
{
550-
super(store.unsafeGetRedundantBefore());
551-
this.store = store;
562+
super(commandStore, null);
563+
this.commandStore = commandStore;
552564
this.onlyNonDurable = onlyNonDurable;
553565
}
554566

@@ -558,7 +570,7 @@ public AsyncChain<Route> replay(TxnId txnId)
558570
if (onlyNonDurable && !maybeShouldReplay(txnId))
559571
return AsyncChains.success(null);
560572

561-
return store.chain(PreLoadContext.contextFor(txnId, "Replay"), safeStore -> {
573+
return commandStore.chain(PreLoadContext.contextFor(txnId, "Replay"), safeStore -> {
562574
if (onlyNonDurable && !shouldReplay(txnId, safeStore.unsafeGet(txnId).current().participants()))
563575
return null;
564576

@@ -574,12 +586,16 @@ public AsyncChain<Route> replay(TxnId txnId)
574586

575587
void maybeLoadRedundantBefore(RedundantBefore redundantBefore)
576588
{
589+
Invariants.require(safeRedundantBefore == null);
577590
if (redundantBefore != null)
578591
{
579592
loadRedundantBefore(redundantBefore);
580-
Invariants.require(safeRedundantBefore == null);
581593
safeRedundantBefore = new SafeRedundantBefore(0, redundantBefore);
582594
}
595+
else
596+
{
597+
safeRedundantBefore = new SafeRedundantBefore(0, this.unsafeGetRedundantBefore());
598+
}
583599
}
584600

585601
void maybeLoadBootstrapBeganAt(NavigableMap<TxnId, Ranges> bootstrapBeganAt)

src/java/org/apache/cassandra/service/accord/AccordCommandStores.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Arrays;
2121
import java.util.List;
2222
import java.util.concurrent.TimeUnit;
23+
import java.util.stream.Stream;
2324

2425
import accord.api.Agent;
2526
import accord.api.DataStore;
@@ -33,6 +34,7 @@
3334
import accord.utils.RandomSource;
3435
import org.apache.cassandra.cache.CacheSize;
3536
import org.apache.cassandra.concurrent.ScheduledExecutors;
37+
import org.apache.cassandra.concurrent.Shutdownable;
3638
import org.apache.cassandra.config.AccordSpec.QueueShardModel;
3739
import org.apache.cassandra.config.DatabaseDescriptor;
3840
import org.apache.cassandra.service.accord.AccordExecutor.AccordExecutorFactory;
@@ -43,8 +45,9 @@
4345
import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITHOUT_LOCK;
4446
import static org.apache.cassandra.service.accord.AccordExecutor.Mode.RUN_WITH_LOCK;
4547
import static org.apache.cassandra.service.accord.AccordExecutor.constant;
48+
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
4649

47-
public class AccordCommandStores extends CommandStores implements CacheSize
50+
public class AccordCommandStores extends CommandStores implements CacheSize, Shutdownable
4851
{
4952
private final AccordExecutor[] executors;
5053
private final int mask;
@@ -206,25 +209,37 @@ public void waitForQuiescence()
206209
executor.waitForQuiescence();
207210
}
208211

212+
@Override
213+
public boolean isTerminated()
214+
{
215+
return Stream.of(executors).allMatch(AccordExecutor::isTerminated);
216+
}
217+
209218
@Override
210219
public synchronized void shutdown()
211220
{
212221
super.shutdown();
213222
for (AccordExecutor executor : executors)
214-
{
215223
executor.shutdown();
216-
}
224+
}
225+
226+
@Override
227+
public Object shutdownNow()
228+
{
229+
shutdown();
230+
return null;
231+
}
232+
233+
@Override
234+
public boolean awaitTermination(long timeout, TimeUnit units) throws InterruptedException
235+
{
236+
long deadline = nanoTime() + units.toNanos(timeout);
217237
for (AccordExecutor executor : executors)
218238
{
219-
try
220-
{
221-
executor.awaitTermination(1, TimeUnit.MINUTES);
222-
}
223-
catch (InterruptedException e)
224-
{
225-
throw new RuntimeException(e);
226-
}
239+
long wait = Math.max(1, deadline - nanoTime());
240+
if (!executor.awaitTermination(wait, TimeUnit.NANOSECONDS))
241+
return false;
227242
}
228-
//TODO (expected): shutdown isn't useful by itself, we need a way to "wait" as well. Should be AutoCloseable or offer awaitTermination as well (think Shutdownable interface)
243+
return true;
229244
}
230245
}

0 commit comments

Comments
 (0)