Skip to content

Commit 4defbb9

Browse files
committed
Make TopologyException a checked exception to ensure it is handled carefully, as it may occur at surprising times
Also Fix:
- Restore MaxDecidedRX on replay
- When catchup_on_start_exit_on_failure == false, should start up on any kind of failure, not only timeout
- lazy vtable LIMIT clause regression
- DurabilityService.onEpochRetired
- Command.validate when uniqueHlc differs
- Avoid unsafe publication of AccordExecutor to scheduledFastTasks
- AccordCache hitRate metric names
- use long for return type of DurationSpec.toNanoseconds
- Repair without all replicas should not request all Accord replicas participate
- ExecuteAtSerializer
- SyncPoints should be coordinated in an epoch that contains the ranges
Also Improve:
- Split Accord startup into local+distributed, ensure we
- Add logging to FetchDurableBefore on startup
- Add randomised testing of AbstractLazyVirtualTable
- Add validation of lazy virtual table key ordering
- Don't send requests to faulty replicas
- shrinkOrEvict large objects without holding lock
- Accord dtest shutdown
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21042
1 parent dbfcd52 commit 4defbb9

File tree

90 files changed

+1145
-431
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

90 files changed

+1145
-431
lines changed

modules/accord

Submodule accord updated 133 files

src/java/org/apache/cassandra/config/CassandraRelevantProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ public enum CassandraRelevantProperties
228228
DTEST_ACCORD_ENABLED("jvm_dtest.accord.enabled", "true"),
229229
DTEST_ACCORD_JOURNAL_SANITY_CHECK_ENABLED("jvm_dtest.accord.journal_sanity_check_enabled", "false"),
230230
DTEST_API_LOG_TOPOLOGY("cassandra.dtest.api.log.topology"),
231+
DTEST_IGNORE_SHUTDOWN_THREADCOUNT("jvm_dtests.ignore_shutdown_threadcount"),
231232
/** This property indicates if the code is running under the in-jvm dtest framework */
232233
DTEST_IS_IN_JVM_DTEST("org.apache.cassandra.dtest.is_in_jvm_dtest"),
233234
/** In_JVM dtest property indicating that the test should use "latest" configuration */

src/java/org/apache/cassandra/config/DurationSpec.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -587,13 +587,11 @@ public static IntSecondsBound inSecondsString(String value)
587587
}
588588

589589
/**
590-
* Returns this duration in the number of nanoseconds as an {@code int}
591-
*
592-
* @return this duration in number of nanoseconds or {@code Integer.MAX_VALUE} if the number of nanoseconds is too large.
590+
* Returns this duration in the number of nanoseconds as a {@code long}
593591
*/
594-
public int toNanoseconds()
592+
public long toNanoseconds()
595593
{
596-
return Ints.saturatedCast(unit().toNanos(quantity()));
594+
return unit().toNanos(quantity());
597595
}
598596

599597
/**

src/java/org/apache/cassandra/db/compaction/CompactionIterator.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,14 @@ public CompactionIterator(OperationType type,
181181
TopPartitionTracker.Collector topPartitionCollector)
182182
{
183183
this(type, scanners, controller, nowInSec, compactionId, activeCompactions, topPartitionCollector,
184-
AccordService.isSetup() ? AccordService.instance() : null);
184+
accord(controller));
185+
}
186+
187+
private static IAccordService accord(AbstractCompactionController controller)
188+
{
189+
IAccordService accord = AccordService.tryGetUnsafe();
190+
Invariants.require(accord != null || (!isAccordJournal(controller.cfs) && !isAccordCommandsForKey(controller.cfs)));
191+
return accord;
185192
}
186193

187194
public CompactionIterator(OperationType type,
@@ -194,7 +201,7 @@ public CompactionIterator(OperationType type,
194201
IAccordService accord)
195202
{
196203
this(type, scanners, controller, nowInSec, compactionId, activeCompactions, topPartitionCollector,
197-
() -> accord.getCompactionInfo(),
204+
() -> Invariants.nonNull(accord).getCompactionInfo(),
198205
() -> Version.fromVersion(accord.journalConfiguration().userVersion()));
199206
}
200207

src/java/org/apache/cassandra/db/virtual/AbstractLazyVirtualTable.java

Lines changed: 74 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Iterator;
2525
import java.util.Map;
2626
import java.util.NavigableMap;
27+
import java.util.NoSuchElementException;
2728
import java.util.TreeMap;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.function.Consumer;
@@ -223,6 +224,8 @@ public PartitionCollector partition(Object ... partitionKeys)
223224
if (!dataRange.contains(partitionKey))
224225
return dropCks -> {};
225226

227+
228+
226229
return partitions.computeIfAbsent(partitionKey, SimplePartition::new);
227230
}
228231

@@ -232,36 +235,50 @@ public UnfilteredPartitionIterator finish()
232235
final Iterator<SimplePartition> partitions = this.partitions.values().iterator();
233236
return new UnfilteredPartitionIterator()
234237
{
238+
private UnfilteredRowIterator next;
239+
235240
@Override public TableMetadata metadata() { return metadata; }
236241
@Override public void close() {}
237242

238243
@Override
239244
public boolean hasNext()
240245
{
241-
return partitions.hasNext();
246+
while (next == null && partitions.hasNext())
247+
{
248+
SimplePartition partition = partitions.next();
249+
Iterator<Row> rows = partition.rows();
250+
251+
if (!rows.hasNext())
252+
continue;
253+
254+
next = new UnfilteredRowIterator()
255+
{
256+
@Override public TableMetadata metadata() { return metadata; }
257+
@Override public boolean isReverseOrder() { return dataRange.isReversed(); }
258+
@Override public RegularAndStaticColumns columns() { return columnFilter.fetchedColumns(); }
259+
@Override public DecoratedKey partitionKey() { return partition.key; }
260+
261+
@Override public Row staticRow() { return partition.staticRow(); }
262+
@Override public boolean hasNext() { return rows.hasNext(); }
263+
@Override public Unfiltered next() { return rows.next(); }
264+
265+
@Override public void close() {}
266+
@Override public DeletionTime partitionLevelDeletion() { return DeletionTime.LIVE; }
267+
@Override public EncodingStats stats() { return EncodingStats.NO_STATS; }
268+
};
269+
}
270+
return next != null;
242271
}
243272

244273
@Override
245274
public UnfilteredRowIterator next()
246275
{
247-
SimplePartition partition = partitions.next();
248-
Iterator<Row> rows = partition.rows();
276+
if (!hasNext())
277+
throw new NoSuchElementException();
249278

250-
return new UnfilteredRowIterator()
251-
{
252-
@Override public TableMetadata metadata() { return metadata; }
253-
@Override public boolean isReverseOrder() { return dataRange.isReversed(); }
254-
@Override public RegularAndStaticColumns columns() { return columnFilter.fetchedColumns(); }
255-
@Override public DecoratedKey partitionKey() { return partition.key; }
256-
257-
@Override public Row staticRow() { return partition.staticRow(); }
258-
@Override public boolean hasNext() { return rows.hasNext(); }
259-
@Override public Unfiltered next() { return rows.next(); }
260-
261-
@Override public void close() {}
262-
@Override public DeletionTime partitionLevelDeletion() { return DeletionTime.LIVE; }
263-
@Override public EncodingStats stats() { return EncodingStats.NO_STATS; }
264-
};
279+
UnfilteredRowIterator result = next;
280+
next = null;
281+
return result;
265282
}
266283
};
267284
}
@@ -312,9 +329,23 @@ public RowCollector row(Object... primaryKeys)
312329
if (!dataRange.contains(partitionKey) || !dataRange.clusteringIndexFilter(partitionKey).selects(clustering))
313330
return drop -> {};
314331

332+
if (isSortedByPartitionKey)
333+
checkCorrectlySorted(partitionKey);
334+
315335
return partitions.computeIfAbsent(partitionKey, SimplePartition::new).row(clustering);
316336
}
317337

338+
private void checkCorrectlySorted(DecoratedKey newPartitionKey)
339+
{
340+
if (partitions.isEmpty())
341+
return;
342+
343+
DecoratedKey prevKey = partitions.lastKey();
344+
int c = metadata.partitionKeyType.compare(prevKey.getKey(), newPartitionKey.getKey());
345+
if (dataRange.isReversed() ? c < 0 : c > 0)
346+
throw new IllegalArgumentException(Arrays.toString(composePartitionKeys(prevKey, metadata)) + (dataRange.isReversed() ? " < " : " > ") + Arrays.toString(composePartitionKeys(newPartitionKey, metadata)));
347+
}
348+
318349
private final class SimplePartition implements PartitionCollector, RowsCollector
319350
{
320351
private final DecoratedKey key;
@@ -323,6 +354,7 @@ private final class SimplePartition implements PartitionCollector, RowsCollector
323354
private int rowCount;
324355
private SimpleRow staticRow;
325356
private boolean dropRows;
357+
private boolean isSortedAndFiltered = true;
326358

327359
private SimplePartition(DecoratedKey key)
328360
{
@@ -346,6 +378,17 @@ public RowCollector add(Object... clusteringKeys)
346378
return row(decomposeClusterings(metadata, clusteringKeys));
347379
}
348380

381+
private void checkCorrectlySorted(Clustering<?> newClustering)
382+
{
383+
if (rowCount == 0)
384+
return;
385+
386+
Clustering<?> prevClustering = rows[rowCount - 1].clustering;
387+
int c = metadata.comparator.compare(prevClustering, newClustering);
388+
if (dataRange.isReversed() ? c <= 0 : c >= 0)
389+
throw new IllegalArgumentException(Arrays.toString(composeClusterings(prevClustering, metadata)) + (dataRange.isReversed() ? " <= " : " >= ") + Arrays.toString(composeClusterings(newClustering, metadata)));
390+
}
391+
349392
RowCollector row(Clustering<?> clustering)
350393
{
351394
if (nanoTime() > deadlineNanos)
@@ -354,6 +397,9 @@ RowCollector row(Clustering<?> clustering)
354397
if (dropRows || !dataRange.clusteringIndexFilter(key).selects(clustering))
355398
return drop -> {};
356399

400+
if (isSorted)
401+
checkCorrectlySorted(clustering);
402+
357403
if (totalRowCount >= limits.count())
358404
{
359405
boolean filter;
@@ -370,9 +416,13 @@ RowCollector row(Clustering<?> clustering)
370416

371417
if (filter)
372418
{
373-
// first filter within each partition
374419
for (SimplePartition partition : partitions.values())
420+
{
421+
// first filter within each partition
422+
partition.filterAndSort();
423+
// and truncate if there are per-partition limits
375424
partition.truncate(limits.perPartitionCount());
425+
}
376426

377427
// then drop any partitions that completely fall outside our limit
378428
Iterator<SimplePartition> iter = partitions.descendingMap().values().iterator();
@@ -418,12 +468,16 @@ RowCollector row(Clustering<?> clustering)
418468
if (rowCount == rows.length)
419469
rows = Arrays.copyOf(rows, Math.max(8, rowCount * 2));
420470
rows[rowCount++] = result;
471+
isSortedAndFiltered = false;
421472
}
422473
return result;
423474
}
424475

425476
void filterAndSort()
426477
{
478+
if (isSortedAndFiltered)
479+
return;
480+
427481
int newCount = 0;
428482
for (int i = 0 ; i < rowCount; ++i)
429483
{
@@ -441,6 +495,7 @@ void filterAndSort()
441495
rowCount = newCount;
442496
}
443497
Arrays.sort(rows, 0, newCount, rowComparator());
498+
isSortedAndFiltered = true;
444499
}
445500

446501
int truncate(int newCount)

src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,7 @@ public void collect(PartitionsCollector collector)
946946
collector.partition(commandStore.id()).collect(collect -> {
947947
while (view.advance())
948948
{
949-
// TODO (required): view should return an immutable per-row view so that we can call lazyAdd
949+
// TODO (desired): view should return an immutable per-row view so that we can call lazyAdd
950950
collect.add(view.txnId().toString())
951951
.eagerCollect(columns -> {
952952
columns.add("table_id", tableIdStr)
@@ -1516,7 +1516,7 @@ abstract static class AbstractJournalTable extends AbstractLazyVirtualTable
15161516

15171517
AbstractJournalTable(TableMetadata metadata)
15181518
{
1519-
super(metadata, FAIL, ASC);
1519+
super(metadata, FAIL, UNSORTED, ASC);
15201520
}
15211521

15221522
@Override

src/java/org/apache/cassandra/journal/ActiveSegment.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ void write(K id, ByteBuffer record)
466466
}
467467
}
468468

469-
// TODO (required): Find a better way to test unwritten allocations and/or corruption
469+
// TODO (expected): Find a better way to test unwritten allocations and/or corruption
470470
@VisibleForTesting
471471
void consumeBufferUnsafe(Consumer<ByteBuffer> fn)
472472
{

src/java/org/apache/cassandra/journal/Compactor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,18 @@
2424
import java.util.concurrent.Future;
2525
import java.util.concurrent.TimeUnit;
2626

27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
2730
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
2831
import org.apache.cassandra.concurrent.Shutdownable;
2932

3033
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
3134

3235
public final class Compactor<K, V> implements Runnable, Shutdownable
3336
{
37+
private static final Logger logger = LoggerFactory.getLogger(Compactor.class);
38+
3439
private final Journal<K, V> journal;
3540
private final SegmentCompactor<K, V> segmentCompactor;
3641
private final ScheduledExecutorPlus executor;
@@ -99,6 +104,7 @@ public boolean isTerminated()
99104
@Override
100105
public void shutdown()
101106
{
107+
logger.debug("Shutting down " + executor);
102108
executor.shutdown();
103109
}
104110

src/java/org/apache/cassandra/journal/Flusher.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,12 @@ void start()
102102

103103
void shutdown() throws InterruptedException
104104
{
105+
logger.debug("Shutting down " + flushExecutor + " and awaiting termination");
105106
flushExecutor.shutdown();
106107
flushExecutor.awaitTermination(1, MINUTES);
107108
if (fsyncExecutor != null)
108109
{
110+
logger.debug("Shutting down " + fsyncExecutor + " and awaiting termination");
109111
fsyncExecutor.shutdownNow(); // `now` to interrupt potentially parked runnable
110112
fsyncExecutor.awaitTermination(1, MINUTES);
111113
}

src/java/org/apache/cassandra/journal/Journal.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public class Journal<K, V> implements Shutdownable
114114

115115
final AtomicReference<State> state = new AtomicReference<>(State.UNINITIALIZED);
116116

117-
// TODO (required): we do not need wait queues here, we can just wait on a signal on a segment while its byte buffer is being allocated
117+
// TODO (expected): we do not need wait queues here, we can just wait on a signal on a segment while its byte buffer is being allocated
118118
private final WaitQueue segmentPrepared = newWaitQueue();
119119
private final WaitQueue allocatorThreadWaitQueue = newWaitQueue();
120120
private final BooleanSupplier allocatorThreadWaitCondition = () -> (availableSegment == null);
@@ -256,6 +256,7 @@ public void shutdown()
256256
{
257257
Invariants.require(state.compareAndSet(State.NORMAL, State.SHUTDOWN),
258258
"Unexpected journal state while trying to shut down", state);
259+
logger.debug("Shutting down " + allocator + " and awaiting termination");
259260
allocator.shutdown();
260261
wakeAllocator(); // Wake allocator to force it into shutdown
261262
// TODO (expected): why are we awaitingTermination here when we have a separate method for it?
@@ -265,6 +266,7 @@ public void shutdown()
265266
compactor.awaitTermination(1, TimeUnit.MINUTES);
266267
flusher.shutdown();
267268
closeAllSegments();
269+
logger.debug("Shutting down " + releaser + " and " + closer + " and awaiting termination");
268270
releaser.shutdown();
269271
closer.shutdown();
270272
closer.awaitTermination(1, TimeUnit.MINUTES);

0 commit comments

Comments
 (0)