Skip to content

Commit 09b5266

Browse files
ifesdjeenbelliottsmith
authored andcommitted
Accord: Add Rebootstrap and unsafe Bootstrap
To support recovering a node that has lost some of its local transaction log, introduce rebootstrap and unsafe bootstrap modes, where Accord ensures no responses are produced for transactions the node cannot be certain it had not previously answered. patch by Benedict and Alex Petrov for CASSANDRA-20908
1 parent 1200040 commit 09b5266

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+266
-292
lines changed

.gitmodules

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[submodule "modules/accord"]
22
path = modules/accord
3-
url = https://github.com/apache/cassandra-accord.git
4-
branch = trunk
3+
url = https://github.com/belliottsmith/cassandra-accord.git
4+
branch = rebootstrap-sq

modules/accord

Submodule accord updated 94 files

src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import accord.coordinate.Coordinations;
4848
import accord.coordinate.PrepareRecovery;
4949
import accord.coordinate.tracking.AbstractTracker;
50+
import accord.primitives.RoutingKeys;
5051
import accord.utils.SortedListMap;
5152
import org.apache.cassandra.cql3.Operator;
5253
import org.apache.cassandra.db.EmptyIterators;
@@ -74,8 +75,6 @@
7475
import accord.local.CommandStores.LatentStoreSelector;
7576
import accord.local.Commands;
7677
import accord.local.DurableBefore;
77-
import accord.local.LoadKeys;
78-
import accord.local.LoadKeysFor;
7978
import accord.local.MaxConflicts;
8079
import accord.local.Node;
8180
import accord.local.PreLoadContext;
@@ -143,8 +142,9 @@
143142
import static accord.local.RedundantStatus.Property.LOCALLY_REDUNDANT;
144143
import static accord.local.RedundantStatus.Property.LOCALLY_SYNCED;
145144
import static accord.local.RedundantStatus.Property.LOCALLY_WITNESSED;
145+
import static accord.local.RedundantStatus.Property.LOG_UNAVAILABLE;
146146
import static accord.local.RedundantStatus.Property.QUORUM_APPLIED;
147-
import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP;
147+
import static accord.local.RedundantStatus.Property.UNREADY;
148148
import static accord.local.RedundantStatus.Property.SHARD_APPLIED;
149149
import static com.google.common.collect.ImmutableList.toImmutableList;
150150
import static java.lang.String.format;
@@ -365,9 +365,8 @@ public Partition getPartition(DecoratedKey partitionKey)
365365
TokenKey key = TokenKey.parse(keyStr, DatabaseDescriptor.getPartitioner());
366366

367367
List<Entry> cfks = new CopyOnWriteArrayList<>();
368-
PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key table query");
369368
CommandStores commandStores = AccordService.instance().node().commandStores();
370-
AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
369+
AccordService.getBlocking(commandStores.forEach("commands_for_key table query", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
371370
SafeCommandsForKey safeCfk = safeStore.get(key);
372371
CommandsForKey cfk = safeCfk.current();
373372
if (cfk == null)
@@ -475,9 +474,8 @@ public Partition getPartition(DecoratedKey partitionKey)
475474
TokenKey key = TokenKey.parse(keyStr, DatabaseDescriptor.getPartitioner());
476475

477476
List<Entry> cfks = new CopyOnWriteArrayList<>();
478-
PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key_unmanaged table query");
479477
CommandStores commandStores = AccordService.instance().node().commandStores();
480-
AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
478+
AccordService.getBlocking(commandStores.forEach("commands_for_key_unmanaged table query", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
481479
SafeCommandsForKey safeCfk = safeStore.get(key);
482480
CommandsForKey cfk = safeCfk.current();
483481
if (cfk == null)
@@ -888,8 +886,9 @@ private RedundantBeforeTable()
888886
" locally_redundant 'TxnIdUtf8Type',\n" +
889887
" locally_synced 'TxnIdUtf8Type',\n" +
890888
" locally_witnessed 'TxnIdUtf8Type',\n" +
891-
" pre_bootstrap 'TxnIdUtf8Type',\n" +
892-
" stale_until_at_least 'TxnIdUtf8Type',\n" +
889+
" log_unavailable 'TxnIdUtf8Type',\n" +
890+
" unready 'TxnIdUtf8Type',\n" +
891+
" stale_until 'TxnIdUtf8Type',\n" +
893892
" PRIMARY KEY (keyspace_name, table_name, table_id, command_store_id, token_start)" +
894893
')', UTF8Type.instance));
895894
}
@@ -923,8 +922,9 @@ public DataSet data()
923922
.column("locally_redundant", entry.maxBound(LOCALLY_REDUNDANT).toString())
924923
.column("locally_synced", entry.maxBound(LOCALLY_SYNCED).toString())
925924
.column("locally_witnessed", entry.maxBound(LOCALLY_WITNESSED).toString())
926-
.column("pre_bootstrap", entry.maxBound(PRE_BOOTSTRAP).toString())
927-
.column("stale_until_at_least", entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null);
925+
.column("log_unavailable", entry.maxBound(LOG_UNAVAILABLE).toString())
926+
.column("unready", entry.maxBound(UNREADY).toString())
927+
.column("stale_until", entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null);
928928
return ds;
929929
},
930930
dataSet,
@@ -1188,7 +1188,7 @@ public Partition getPartition(DecoratedKey partitionKey)
11881188
TxnId txnId = TxnId.parse(txnIdStr);
11891189

11901190
List<Entry> commands = new CopyOnWriteArrayList<>();
1191-
AccordService.instance().node().commandStores().forEachCommandStore(store -> {
1191+
AccordService.instance().node().commandStores().forAllUnsafe(store -> {
11921192
Command command = ((AccordCommandStore)store).loadCommand(txnId);
11931193
if (command != null)
11941194
commands.add(new Entry(store.id(), command));
@@ -1293,7 +1293,7 @@ public Partition getPartition(DecoratedKey partitionKey)
12931293
TxnId txnId = TxnId.parse(txnIdStr);
12941294

12951295
List<Entry> entries = new ArrayList<>();
1296-
AccordService.instance().node().commandStores().forEachCommandStore(store -> {
1296+
AccordService.instance().node().commandStores().forAllUnsafe(store -> {
12971297
for (AccordJournal.DebugEntry e : ((AccordCommandStore)store).debugCommand(txnId))
12981298
entries.add(new Entry(store.id(), e.segment, e.position, e.builder));
12991299
});

src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ private synchronized void refreshHistograms()
175175

176176
int nowSeconds = (int) (Clock.Global.currentTimeMillis() / 1000);
177177
SnapshotBuilder builder = new SnapshotBuilder();
178-
service.node().commandStores().forEachCommandStore(commandStore -> {
178+
service.node().commandStores().forAllUnsafe(commandStore -> {
179179
DefaultProgressLog.ImmutableView view = ((DefaultProgressLog)commandStore.unsafeProgressLog()).immutableView();
180180
builder.progressLogActive += view.activeCount();
181181
builder.progressLogSize.increment(view.size());

src/java/org/apache/cassandra/service/ActiveRepairService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -582,8 +582,7 @@ else if (range.intersects(toRepair))
582582
{
583583
throw new IllegalArgumentException(String.format("Requested range %s intersects a local range (%s) " +
584584
"but is not fully contained in one; this would lead to " +
585-
"imprecise repair. keyspace: %s", toRepair.toString(),
586-
range.toString(), keyspaceName));
585+
"imprecise repair. keyspace: %s", toRepair, range, keyspaceName));
587586
}
588587
}
589588
if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))

src/java/org/apache/cassandra/service/Rebuild.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.slf4j.Logger;
3434
import org.slf4j.LoggerFactory;
3535

36+
import accord.api.ConfigurationService.EpochReady;
3637
import org.apache.cassandra.config.DatabaseDescriptor;
3738
import org.apache.cassandra.db.Keyspace;
3839
import org.apache.cassandra.dht.Range;
@@ -160,7 +161,7 @@ else if (tokens == null)
160161

161162
StreamResultFuture streamResult = streamer.fetchAsync();
162163

163-
Future<?> accordReady = AccordService.instance().epochReadyFor(metadata);
164+
Future<?> accordReady = AccordService.instance().epochReadyFor(metadata, EpochReady::reads);
164165
Future<?> ready = FutureCombiner.allOf(streamResult, accordReady);
165166

166167
// wait for result

src/java/org/apache/cassandra/service/StorageService.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,6 @@
7171
import com.google.common.collect.Ordering;
7272
import com.google.common.collect.Sets;
7373
import com.google.common.util.concurrent.Uninterruptibles;
74-
75-
import org.apache.cassandra.io.sstable.format.SSTableReader;
76-
import org.apache.cassandra.repair.autorepair.AutoRepair;
7774
import org.apache.commons.lang3.StringUtils;
7875
import org.slf4j.Logger;
7976
import org.slf4j.LoggerFactory;
@@ -134,6 +131,7 @@
134131
import org.apache.cassandra.io.sstable.IScrubber;
135132
import org.apache.cassandra.io.sstable.IVerifier;
136133
import org.apache.cassandra.io.sstable.SSTableLoader;
134+
import org.apache.cassandra.io.sstable.format.SSTableReader;
137135
import org.apache.cassandra.io.sstable.format.Version;
138136
import org.apache.cassandra.io.util.File;
139137
import org.apache.cassandra.io.util.FileUtils;
@@ -158,7 +156,9 @@
158156
import org.apache.cassandra.metrics.StorageMetrics;
159157
import org.apache.cassandra.net.MessagingService;
160158
import org.apache.cassandra.repair.RepairCoordinator;
159+
import org.apache.cassandra.repair.RepairParallelism;
161160
import org.apache.cassandra.repair.SharedContext;
161+
import org.apache.cassandra.repair.autorepair.AutoRepair;
162162
import org.apache.cassandra.repair.messages.RepairOption;
163163
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
164164
import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -183,6 +183,7 @@
183183
import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupLocalCoordinator;
184184
import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
185185
import org.apache.cassandra.service.snapshot.SnapshotManager;
186+
import org.apache.cassandra.streaming.PreviewKind;
186187
import org.apache.cassandra.streaming.StreamManager;
187188
import org.apache.cassandra.streaming.StreamResultFuture;
188189
import org.apache.cassandra.streaming.StreamState;
@@ -206,9 +207,9 @@
206207
import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
207208
import org.apache.cassandra.tcm.sequences.InProgressSequences;
208209
import org.apache.cassandra.tcm.sequences.SingleNodeSequences;
210+
import org.apache.cassandra.tcm.transformations.AlterTopology;
209211
import org.apache.cassandra.tcm.transformations.Assassinate;
210212
import org.apache.cassandra.tcm.transformations.CancelInProgressSequence;
211-
import org.apache.cassandra.tcm.transformations.AlterTopology;
212213
import org.apache.cassandra.tcm.transformations.Register;
213214
import org.apache.cassandra.tcm.transformations.Startup;
214215
import org.apache.cassandra.tcm.transformations.Unregister;
@@ -3145,6 +3146,29 @@ private FutureTask<Object> createRepairTask(final int cmd, final String keyspace
31453146
return new FutureTask<>(task);
31463147
}
31473148

3149+
public RepairCoordinator repairAccordKeyspace(String keyspace, Collection<Range<Token>> ranges)
3150+
{
3151+
int cmd = nextRepairCommand.incrementAndGet();
3152+
RepairOption options = new RepairOption(RepairParallelism.PARALLEL, // parallelism
3153+
false, // primaryRange
3154+
false, // incremental
3155+
false, // trace
3156+
5, // jobThreads
3157+
ranges, // ranges
3158+
true, // pullRepair
3159+
true, // forceRepair
3160+
PreviewKind.NONE, // previewKind
3161+
false, // optimiseStreams
3162+
true, // ignoreUnreplicatedKeyspaces
3163+
true, // repairData
3164+
false, // repairPaxos
3165+
true, // dontPurgeTombstones
3166+
false // repairAccord
3167+
);
3168+
3169+
return new RepairCoordinator(this, cmd, options, keyspace);
3170+
}
3171+
31483172
private void tryRepairPaxosForTopologyChange(String reason)
31493173
{
31503174
try

src/java/org/apache/cassandra/service/accord/AccordCommandStores.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@
3737
import org.apache.cassandra.cache.CacheSize;
3838
import org.apache.cassandra.config.AccordSpec.QueueShardModel;
3939
import org.apache.cassandra.config.DatabaseDescriptor;
40-
import org.apache.cassandra.schema.TableId;
4140
import org.apache.cassandra.service.accord.AccordExecutor.AccordExecutorFactory;
42-
import org.apache.cassandra.service.accord.api.TokenKey;
41+
import org.apache.cassandra.tcm.ClusterMetadata;
42+
import org.apache.cassandra.tcm.membership.NodeState;
4343

4444
import static org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_PER_SHARD;
4545
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount;
@@ -112,33 +112,13 @@ static Factory factory()
112112
};
113113
}
114114

115-
@Override
116-
protected boolean shouldBootstrap(Node node, Topology previous, Topology updated, Range range)
117-
{
118-
if (!super.shouldBootstrap(node, previous, updated, range))
119-
return false;
120-
// we see new ranges when a new keyspace is added, so avoid bootstrap in these cases
121-
return contains(previous, ((TokenKey) range.start()).table());
122-
}
123-
124115
@Override
125116
public SequentialAsyncExecutor someSequentialExecutor()
126117
{
127118
int idx = ((int) Thread.currentThread().getId()) & mask;
128119
return executors[idx].newSequentialExecutor();
129120
}
130121

131-
private static boolean contains(Topology previous, TableId searchTable)
132-
{
133-
for (Range range : previous.ranges())
134-
{
135-
TableId table = ((TokenKey) range.start()).table();
136-
if (table.equals(searchTable))
137-
return true;
138-
}
139-
return false;
140-
}
141-
142122
public synchronized void setCapacity(long bytes)
143123
{
144124
cacheSize = bytes;

src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ AsyncResult<Void> acknowledged()
113113

114114
@Nullable AsyncResult<Void> reads()
115115
{
116-
return reads;
116+
return ready == null ? null : ready.reads;
117117
}
118118

119119
AsyncResult.Settable<Void> localSyncNotified()
@@ -449,7 +449,7 @@ public void reportTopology(Topology topology, boolean isLoad, boolean startSync)
449449
}
450450

451451
@Override
452-
protected void localSyncComplete(Topology topology, boolean startSync)
452+
protected void onReadyToCoordinate(Topology topology, boolean startSync)
453453
{
454454
long epoch = topology.epoch();
455455
EpochState epochState = getOrCreateEpochState(epoch);

src/java/org/apache/cassandra/service/accord/AccordDataStore.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import accord.primitives.Range;
3131
import accord.primitives.Ranges;
3232
import accord.primitives.SyncPoint;
33+
import accord.utils.UnhandledEnum;
3334
import org.apache.cassandra.db.ColumnFamilyStore;
3435
import org.apache.cassandra.db.memtable.Memtable;
3536
import org.apache.cassandra.schema.Schema;
@@ -40,14 +41,6 @@ public class AccordDataStore implements DataStore
4041
private static final Logger logger = LoggerFactory.getLogger(AccordDataStore.class);
4142
enum FlushListenerKey { KEY }
4243

43-
@Override
44-
public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback)
45-
{
46-
AccordFetchCoordinator coordinator = new AccordFetchCoordinator(node, ranges, syncPoint, callback, safeStore.commandStore());
47-
coordinator.start();
48-
return coordinator.result();
49-
}
50-
5144
/**
5245
* Ensures data for the intersecting ranges is flushed to sstable before calling back with reportOnSuccess.
5346
* This is used to gate journal cleanup, since we skip the CommitLog for applying to the data table.
@@ -95,4 +88,23 @@ public void ensureDurable(CommandStore commandStore, Ranges ranges, RedundantBef
9588
prev = cfs;
9689
}
9790
}
91+
92+
@Override
93+
public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback, FetchKind kind)
94+
{
95+
switch (kind)
96+
{
97+
default: throw new UnhandledEnum(kind);
98+
case Image:
99+
{
100+
AccordFetchCoordinator coordinator = new AccordFetchCoordinator(node, ranges, syncPoint, callback, safeStore.commandStore());
101+
coordinator.start();
102+
return coordinator.result();
103+
}
104+
case Sync:
105+
{
106+
throw new UnsupportedOperationException();
107+
}
108+
}
109+
}
98110
}

0 commit comments

Comments
 (0)