Skip to content

Commit c9e5239

Browse files
committed
Fix:
- Lock inversion when restarting durability for erased sync point
- Initialise durability cycle start time

Improve:
- Improve durability reporting
- Timeout durability requests
- DurabilityQueue concurrency should limit only until quorum is achieved
- Some exceptions that may be thrown when starting coordination may not be propagated

patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20328
1 parent 7bf6eef commit c9e5239

File tree

8 files changed

+24
-17
lines changed

8 files changed

+24
-17
lines changed

src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import static accord.local.durability.DurabilityService.SyncLocal.Self;
6767
import static accord.local.durability.DurabilityService.SyncRemote.NoRemote;
6868
import static com.google.common.base.Preconditions.checkNotNull;
69+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
6970
import static org.apache.cassandra.config.CassandraRelevantProperties.REPAIR_MUTATION_REPAIR_ROWS_PER_BATCH;
7071
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
7172

@@ -260,10 +261,11 @@ public void finished()
260261
{
261262
Ranges accordRanges = AccordTopology.toAccordRanges(cfs.getTableId(), ranges);
262263
long startedAtNanos = nanoTime();
263-
long deadlineNanos = startedAtNanos + DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos();
264+
long timeoutNanos = DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos();
265+
long deadlineNanos = startedAtNanos + timeoutNanos;
264266
// TODO (expected): use the source bounds for the streams to avoid waiting unnecessarily long
265267
AccordService.getBlocking(accordService.maxConflict(accordRanges)
266-
.flatMap(min -> accordService.sync("[Stream #" + session.planId() + ']', min, accordRanges, null, Self, NoRemote))
268+
.flatMap(min -> accordService.sync("[Stream #" + session.planId() + ']', min, accordRanges, null, Self, NoRemote, timeoutNanos, NANOSECONDS))
267269
, accordRanges, new LatencyRequestBookkeeping(cfs.metric.accordPostStreamRepair), startedAtNanos, deadlineNanos);
268270
}
269271

src/java/org/apache/cassandra/service/accord/AccordService.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -516,9 +516,9 @@ public ShardDurability.ImmutableView shardDurability()
516516
}
517517

518518
@Override
519-
public AsyncChain<Void> sync(Object requestedBy, Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote)
519+
public AsyncChain<Void> sync(Object requestedBy, Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
520520
{
521-
return node.durability().sync(requestedBy, minBound, ranges, include, syncLocal, syncRemote);
521+
return node.durability().sync(requestedBy, minBound, ranges, include, syncLocal, syncRemote, timeout, timeoutUnits);
522522
}
523523

524524
@Override
@@ -998,10 +998,11 @@ public void awaitDone(TableId id, long epoch)
998998
if (rangeList.isEmpty()) return; // nothing to see here
999999

10001000
Ranges ranges = Ranges.of(rangeList.toArray(accord.primitives.Range[]::new));
1001+
long timeout = DatabaseDescriptor.getAccordRepairTimeoutNanos();
10011002
long startedAt = nanoTime();
1002-
long deadline = startedAt + DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos();
1003+
long deadline = startedAt + timeout;
10031004
// TODO (required): relax this requirement - too expensive
1004-
getBlocking(node.durability().sync("Drop Keyspace/Table (Epoch " + epoch + ')', TxnId.minForEpoch(epoch), ranges, Self, All), ranges, new LatencyRequestBookkeeping(null), startedAt, deadline, false);
1005+
getBlocking(node.durability().sync("Drop Keyspace/Table (Epoch " + epoch + ')', TxnId.minForEpoch(epoch), ranges, Self, All, DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos(), NANOSECONDS), ranges, new LatencyRequestBookkeeping(null), startedAt, deadline, false);
10051006
}
10061007

10071008
public Params journalConfiguration()

src/java/org/apache/cassandra/service/accord/IAccordService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public interface IAccordService
7878
IVerbHandler<? extends Request> requestHandler();
7979
IVerbHandler<? extends Reply> responseHandler();
8080

81-
AsyncChain<Void> sync(Object requestedBy, @Nullable Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote);
81+
AsyncChain<Void> sync(Object requestedBy, @Nullable Timestamp minBound, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits);
8282
AsyncChain<Void> sync(@Nullable Timestamp minBound, Keys keys, SyncLocal syncLocal, SyncRemote syncRemote);
8383
AsyncChain<Timestamp> maxConflict(Ranges ranges);
8484

@@ -193,7 +193,7 @@ public IVerbHandler<? extends Reply> responseHandler()
193193
}
194194

195195
@Override
196-
public AsyncChain<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote)
196+
public AsyncChain<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
197197
{
198198
throw new UnsupportedOperationException("No accord transaction should be executed when accord.enabled = false in cassandra.yaml");
199199
}
@@ -361,9 +361,9 @@ public IVerbHandler<? extends Reply> responseHandler()
361361
}
362362

363363
@Override
364-
public AsyncChain<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote)
364+
public AsyncChain<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Id> include, SyncLocal syncLocal, SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
365365
{
366-
return delegate.sync(requestedBy, onOrAfter, ranges, include, syncLocal, syncRemote);
366+
return delegate.sync(requestedBy, onOrAfter, ranges, include, syncLocal, syncRemote, timeout, timeoutUnits);
367367
}
368368

369369
@Override

src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import static accord.local.durability.DurabilityService.SyncRemote.Quorum;
5555
import static accord.primitives.Timestamp.mergeMax;
5656
import static accord.primitives.Timestamp.minForEpoch;
57+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
5758
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordRepairTimeoutNanos;
5859

5960
/*
@@ -149,10 +150,11 @@ private List<accord.primitives.Range> repairRange(TokenRange range) throws Throw
149150
Ranges ranges = AccordService.intersecting(Ranges.of(range));
150151
waiting = Thread.currentThread();
151152
RequestBookkeeping bookkeeping = new LatencyRequestBookkeeping(latency);
153+
long timeoutNanos = getAccordRepairTimeoutNanos();
152154
AccordService.getBlocking(service.maxConflict(ranges).flatMap(conflict -> {
153155
conflict = mergeMax(conflict, minForEpoch(this.minEpoch.getEpoch()));
154-
return service.sync("[repairId #" + repairId + ']', conflict, Ranges.of(range), ids, NoLocal, syncRemote);
155-
}), ranges, bookkeeping, start, start + getAccordRepairTimeoutNanos());
156+
return service.sync("[repairId #" + repairId + ']', conflict, Ranges.of(range), ids, NoLocal, syncRemote, timeoutNanos, NANOSECONDS);
157+
}), ranges, bookkeeping, start, start + timeoutNanos);
156158
waiting = null;
157159

158160
if (shouldAbort != null)

test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,9 @@ public BarrierRecordingService(IAccordService delegate)
8989
}
9090

9191
@Override
92-
public AsyncResult<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Node.Id> include, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote)
92+
public AsyncResult<Void> sync(Object requestedBy, @Nullable Timestamp onOrAfter, Ranges ranges, @Nullable Collection<Node.Id> include, DurabilityService.SyncLocal syncLocal, DurabilityService.SyncRemote syncRemote, long timeout, TimeUnit timeoutUnits)
9393
{
94-
return delegate.sync(requestedBy, onOrAfter, ranges, include, syncLocal, syncRemote).map(v -> {
94+
return delegate.sync(requestedBy, onOrAfter, ranges, include, syncLocal, syncRemote, 10L, TimeUnit.MINUTES).map(v -> {
9595
executedBarriers = true;
9696
return v;
9797
}).beginAsResult();

test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.IOException;
2222
import java.util.concurrent.Callable;
23+
import java.util.concurrent.TimeUnit;
2324

2425
import org.junit.Test;
2526

@@ -89,7 +90,7 @@ private static void insert(IInvokableInstance node, String ks, String table)
8990
Ranges ranges = Ranges.single(TokenRange.fullRange(metadata.id, metadata.partitioner));
9091
for (int i = 0; i < 10; i++)
9192
{
92-
AsyncChains.getBlockingAndRethrow(accord.sync(null, Timestamp.NONE, ranges, null, DurabilityService.SyncLocal.Self, DurabilityService.SyncRemote.Quorum));
93+
AsyncChains.getBlockingAndRethrow(accord.sync(null, Timestamp.NONE, ranges, null, DurabilityService.SyncLocal.Self, DurabilityService.SyncRemote.Quorum, 10L, TimeUnit.MINUTES));
9394

9495
accord.journal().closeCurrentSegmentForTestingIfNonEmpty();
9596
accord.journal().runCompactorForTesting();

test/distributed/org/apache/cassandra/distributed/test/accord/journal/StatefulJournalRestartTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.IOException;
2222
import java.time.Duration;
23+
import java.util.concurrent.TimeUnit;
2324

2425
import org.junit.Ignore;
2526
import org.junit.Test;
@@ -118,7 +119,7 @@ private static void insert(IInvokableInstance node)
118119
Ranges ranges = Ranges.single(TokenRange.fullRange(metadata.id, metadata.partitioner));
119120
for (int i = 0; i < 10; i++)
120121
{
121-
AsyncChains.getBlockingAndRethrow(accord.sync(null, Timestamp.NONE, ranges, null, DurabilityService.SyncLocal.Self, DurabilityService.SyncRemote.Quorum));
122+
AsyncChains.getBlockingAndRethrow(accord.sync(null, Timestamp.NONE, ranges, null, DurabilityService.SyncLocal.Self, DurabilityService.SyncRemote.Quorum, 10L, TimeUnit.MINUTES));
122123

123124
accord.journal().closeCurrentSegmentForTestingIfNonEmpty();
124125
accord.journal().runCompactorForTesting();

0 commit comments

Comments
 (0)