Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
url = https://github.com/apache/cassandra-accord.git
branch = trunk
url = https://github.com/belliottsmith/cassandra-accord.git
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure you had it in mind, but just for completeness, reminding to switch branch.

branch = rebootstrap-sq
2 changes: 1 addition & 1 deletion modules/accord
Submodule accord updated 94 files
+1 −5 accord-core/src/main/java/accord/api/Agent.java
+8 −1 accord-core/src/main/java/accord/api/ConfigurationService.java
+9 −1 accord-core/src/main/java/accord/api/DataStore.java
+6 −1 accord-core/src/main/java/accord/api/Journal.java
+39 −0 accord-core/src/main/java/accord/api/OwnershipEventListener.java
+10 −5 accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
+1 −1 accord-core/src/main/java/accord/coordinate/CoordinationAdapter.java
+6 −5 accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
+20 −15 accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
+16 −4 accord-core/src/main/java/accord/coordinate/FetchRoute.java
+14 −10 accord-core/src/main/java/accord/coordinate/Infer.java
+35 −8 accord-core/src/main/java/accord/coordinate/Invalidate.java
+36 −7 accord-core/src/main/java/accord/coordinate/KeyBarriers.java
+26 −54 accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+17 −10 accord-core/src/main/java/accord/impl/CommandChange.java
+12 −16 accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+1 −4 accord-core/src/main/java/accord/impl/progresslog/PackedKeyTracker.java
+34 −22 accord-core/src/main/java/accord/local/Bootstrap.java
+32 −8 accord-core/src/main/java/accord/local/Cleanup.java
+1 −1 accord-core/src/main/java/accord/local/Command.java
+224 −60 accord-core/src/main/java/accord/local/CommandStore.java
+126 −178 accord-core/src/main/java/accord/local/CommandStores.java
+13 −7 accord-core/src/main/java/accord/local/Commands.java
+39 −0 accord-core/src/main/java/accord/local/LogUnavailableException.java
+68 −0 accord-core/src/main/java/accord/local/MapReduceCommandStores.java
+95 −0 accord-core/src/main/java/accord/local/MapReduceConsumeCommandStores.java
+16 −52 accord-core/src/main/java/accord/local/Node.java
+126 −137 accord-core/src/main/java/accord/local/RedundantBefore.java
+89 −112 accord-core/src/main/java/accord/local/RedundantStatus.java
+24 −6 accord-core/src/main/java/accord/local/SafeCommandStore.java
+65 −65 accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+18 −18 accord-core/src/main/java/accord/local/cfk/PostProcess.java
+12 −12 accord-core/src/main/java/accord/local/cfk/Pruning.java
+19 −19 accord-core/src/main/java/accord/local/cfk/Serialize.java
+21 −21 accord-core/src/main/java/accord/local/cfk/Updating.java
+1 −0 accord-core/src/main/java/accord/local/durability/DurabilityService.java
+1 −2 accord-core/src/main/java/accord/local/durability/GlobalDurability.java
+0 −1 accord-core/src/main/java/accord/local/durability/ShardDurability.java
+10 −18 accord-core/src/main/java/accord/messages/Accept.java
+5 −6 accord-core/src/main/java/accord/messages/Apply.java
+3 −3 accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
+44 −31 accord-core/src/main/java/accord/messages/Await.java
+6 −9 accord-core/src/main/java/accord/messages/BeginInvalidation.java
+4 −3 accord-core/src/main/java/accord/messages/BeginRecovery.java
+20 −29 accord-core/src/main/java/accord/messages/CheckStatus.java
+68 −25 accord-core/src/main/java/accord/messages/Commit.java
+3 −3 accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
+3 −3 accord-core/src/main/java/accord/messages/GetLatestDeps.java
+3 −3 accord-core/src/main/java/accord/messages/GetMaxConflict.java
+3 −3 accord-core/src/main/java/accord/messages/InformDurable.java
+21 −12 accord-core/src/main/java/accord/messages/NoWaitRequest.java
+15 −59 accord-core/src/main/java/accord/messages/ParticipantsRequest.java
+3 −3 accord-core/src/main/java/accord/messages/PreAccept.java
+28 −15 accord-core/src/main/java/accord/messages/Propagate.java
+26 −16 accord-core/src/main/java/accord/messages/ReadData.java
+11 −8 accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
+78 −0 accord-core/src/main/java/accord/messages/RouteRequest.java
+11 −7 accord-core/src/main/java/accord/messages/SetShardDurable.java
+10 −2 accord-core/src/main/java/accord/messages/StableThenRead.java
+1 −1 accord-core/src/main/java/accord/primitives/LatestDeps.java
+10 −8 accord-core/src/main/java/accord/primitives/Ranges.java
+6 −0 accord-core/src/main/java/accord/utils/DeterministicSet.java
+6 −0 accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java
+1 −10 accord-core/src/main/java/accord/utils/MapReduceConsume.java
+10 −7 accord-core/src/main/java/accord/utils/async/AsyncResults.java
+9 −11 accord-core/src/test/java/accord/burn/BurnTestBase.java
+3 −10 accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+0 −26 accord-core/src/test/java/accord/burn/TopologyUpdates.java
+4 −5 accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
+3 −3 accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
+62 −0 accord-core/src/test/java/accord/impl/AbstractTestConfigurationService.java
+3 −3 accord-core/src/test/java/accord/impl/MessageListener.java
+8 −1 accord-core/src/test/java/accord/impl/TestAgent.java
+86 −51 accord-core/src/test/java/accord/impl/basic/Cluster.java
+0 −12 accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+20 −4 accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+2 −2 accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+74 −28 accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
+24 −9 accord-core/src/test/java/accord/impl/list/ListAgent.java
+2 −1 accord-core/src/test/java/accord/impl/list/ListData.java
+1 −1 accord-core/src/test/java/accord/impl/list/ListStore.java
+2 −2 accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java
+1 −1 accord-core/src/test/java/accord/impl/mock/MockStore.java
+6 −11 accord-core/src/test/java/accord/local/RedundantBeforeTest.java
+9 −2 accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+1 −3 accord-core/src/test/java/accord/messages/ReadDataTest.java
+11 −11 accord-core/src/test/java/accord/messages/RouteRequestScopeTest.java
+1 −1 accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+105 −26 accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+2 −2 accord-core/src/test/java/accord/utils/ExtendedAssertions.java
+1 −1 accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+8 −1 accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
+1 −1 accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java
+2 −1 accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java
26 changes: 13 additions & 13 deletions src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import accord.coordinate.Coordinations;
import accord.coordinate.PrepareRecovery;
import accord.coordinate.tracking.AbstractTracker;
import accord.primitives.RoutingKeys;
import accord.utils.SortedListMap;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.EmptyIterators;
Expand Down Expand Up @@ -74,8 +75,6 @@
import accord.local.CommandStores.LatentStoreSelector;
import accord.local.Commands;
import accord.local.DurableBefore;
import accord.local.LoadKeys;
import accord.local.LoadKeysFor;
import accord.local.MaxConflicts;
import accord.local.Node;
import accord.local.PreLoadContext;
Expand Down Expand Up @@ -143,8 +142,9 @@
import static accord.local.RedundantStatus.Property.LOCALLY_REDUNDANT;
import static accord.local.RedundantStatus.Property.LOCALLY_SYNCED;
import static accord.local.RedundantStatus.Property.LOCALLY_WITNESSED;
import static accord.local.RedundantStatus.Property.LOG_UNAVAILABLE;
import static accord.local.RedundantStatus.Property.QUORUM_APPLIED;
import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP;
import static accord.local.RedundantStatus.Property.UNREADY;
import static accord.local.RedundantStatus.Property.SHARD_APPLIED;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.String.format;
Expand Down Expand Up @@ -365,9 +365,8 @@ public Partition getPartition(DecoratedKey partitionKey)
TokenKey key = TokenKey.parse(keyStr, DatabaseDescriptor.getPartitioner());

List<Entry> cfks = new CopyOnWriteArrayList<>();
PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key table query");
CommandStores commandStores = AccordService.instance().node().commandStores();
AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
AccordService.getBlocking(commandStores.forEach("commands_for_key table query", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
SafeCommandsForKey safeCfk = safeStore.get(key);
CommandsForKey cfk = safeCfk.current();
if (cfk == null)
Expand Down Expand Up @@ -475,9 +474,8 @@ public Partition getPartition(DecoratedKey partitionKey)
TokenKey key = TokenKey.parse(keyStr, DatabaseDescriptor.getPartitioner());

List<Entry> cfks = new CopyOnWriteArrayList<>();
PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key_unmanaged table query");
CommandStores commandStores = AccordService.instance().node().commandStores();
AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
AccordService.getBlocking(commandStores.forEach("commands_for_key_unmanaged table query", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
SafeCommandsForKey safeCfk = safeStore.get(key);
CommandsForKey cfk = safeCfk.current();
if (cfk == null)
Expand Down Expand Up @@ -888,8 +886,9 @@ private RedundantBeforeTable()
" locally_redundant 'TxnIdUtf8Type',\n" +
" locally_synced 'TxnIdUtf8Type',\n" +
" locally_witnessed 'TxnIdUtf8Type',\n" +
" pre_bootstrap 'TxnIdUtf8Type',\n" +
" stale_until_at_least 'TxnIdUtf8Type',\n" +
" log_unavailable 'TxnIdUtf8Type',\n" +
" unready 'TxnIdUtf8Type',\n" +
" stale_until 'TxnIdUtf8Type',\n" +
" PRIMARY KEY (keyspace_name, table_name, table_id, command_store_id, token_start)" +
')', UTF8Type.instance));
}
Expand Down Expand Up @@ -923,8 +922,9 @@ public DataSet data()
.column("locally_redundant", entry.maxBound(LOCALLY_REDUNDANT).toString())
.column("locally_synced", entry.maxBound(LOCALLY_SYNCED).toString())
.column("locally_witnessed", entry.maxBound(LOCALLY_WITNESSED).toString())
.column("pre_bootstrap", entry.maxBound(PRE_BOOTSTRAP).toString())
.column("stale_until_at_least", entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null);
.column("log_unavailable", entry.maxBound(LOG_UNAVAILABLE).toString())
.column("unready", entry.maxBound(UNREADY).toString())
.column("stale_until", entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null);
return ds;
},
dataSet,
Expand Down Expand Up @@ -1188,7 +1188,7 @@ public Partition getPartition(DecoratedKey partitionKey)
TxnId txnId = TxnId.parse(txnIdStr);

List<Entry> commands = new CopyOnWriteArrayList<>();
AccordService.instance().node().commandStores().forEachCommandStore(store -> {
AccordService.instance().node().commandStores().forAllUnsafe(store -> {
Command command = ((AccordCommandStore)store).loadCommand(txnId);
if (command != null)
commands.add(new Entry(store.id(), command));
Expand Down Expand Up @@ -1293,7 +1293,7 @@ public Partition getPartition(DecoratedKey partitionKey)
TxnId txnId = TxnId.parse(txnIdStr);

List<Entry> entries = new ArrayList<>();
AccordService.instance().node().commandStores().forEachCommandStore(store -> {
AccordService.instance().node().commandStores().forAllUnsafe(store -> {
for (AccordJournal.DebugEntry e : ((AccordCommandStore)store).debugCommand(txnId))
entries.add(new Entry(store.id(), e.segment, e.position, e.builder));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private synchronized void refreshHistograms()

int nowSeconds = (int) (Clock.Global.currentTimeMillis() / 1000);
SnapshotBuilder builder = new SnapshotBuilder();
service.node().commandStores().forEachCommandStore(commandStore -> {
service.node().commandStores().forAllUnsafe(commandStore -> {
DefaultProgressLog.ImmutableView view = ((DefaultProgressLog)commandStore.unsafeProgressLog()).immutableView();
builder.progressLogActive += view.activeCount();
builder.progressLogSize.increment(view.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,8 +582,7 @@ else if (range.intersects(toRepair))
{
throw new IllegalArgumentException(String.format("Requested range %s intersects a local range (%s) " +
"but is not fully contained in one; this would lead to " +
"imprecise repair. keyspace: %s", toRepair.toString(),
range.toString(), keyspaceName));
"imprecise repair. keyspace: %s", toRepair, range, keyspaceName));
}
}
if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/service/Rebuild.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import accord.api.ConfigurationService.EpochReady;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
Expand Down Expand Up @@ -160,7 +161,7 @@ else if (tokens == null)

StreamResultFuture streamResult = streamer.fetchAsync();

Future<?> accordReady = AccordService.instance().epochReadyFor(metadata);
Future<?> accordReady = AccordService.instance().epochReadyFor(metadata, EpochReady::reads);
Future<?> ready = FutureCombiner.allOf(streamResult, accordReady);

// wait for result
Expand Down
32 changes: 28 additions & 4 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;

import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.repair.autorepair.AutoRepair;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -134,6 +131,7 @@
import org.apache.cassandra.io.sstable.IScrubber;
import org.apache.cassandra.io.sstable.IVerifier;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
Expand All @@ -158,7 +156,9 @@
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.RepairCoordinator;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.repair.autorepair.AutoRepair;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.schema.KeyspaceMetadata;
Expand All @@ -183,6 +183,7 @@
import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupLocalCoordinator;
import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
import org.apache.cassandra.service.snapshot.SnapshotManager;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamState;
Expand All @@ -206,9 +207,9 @@
import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
import org.apache.cassandra.tcm.sequences.InProgressSequences;
import org.apache.cassandra.tcm.sequences.SingleNodeSequences;
import org.apache.cassandra.tcm.transformations.AlterTopology;
import org.apache.cassandra.tcm.transformations.Assassinate;
import org.apache.cassandra.tcm.transformations.CancelInProgressSequence;
import org.apache.cassandra.tcm.transformations.AlterTopology;
import org.apache.cassandra.tcm.transformations.Register;
import org.apache.cassandra.tcm.transformations.Startup;
import org.apache.cassandra.tcm.transformations.Unregister;
Expand Down Expand Up @@ -3145,6 +3146,29 @@ private FutureTask<Object> createRepairTask(final int cmd, final String keyspace
return new FutureTask<>(task);
}

public RepairCoordinator repairAccordKeyspace(String keyspace, Collection<Range<Token>> ranges)
{
int cmd = nextRepairCommand.incrementAndGet();
RepairOption options = new RepairOption(RepairParallelism.PARALLEL, // parallelism
false, // primaryRange
false, // incremental
false, // trace
5, // jobThreads
ranges, // ranges
true, // pullRepair
true, // forceRepair
PreviewKind.NONE, // previewKind
false, // optimiseStreams
true, // ignoreUnreplicatedKeyspaces
true, // repairData
false, // repairPaxos
true, // dontPurgeTombstones
false // repairAccord
);

return new RepairCoordinator(this, cmd, options, keyspace);
}

private void tryRepairPaxosForTopologyChange(String reason)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
import org.apache.cassandra.cache.CacheSize;
import org.apache.cassandra.config.AccordSpec.QueueShardModel;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.accord.AccordExecutor.AccordExecutorFactory;
import org.apache.cassandra.service.accord.api.TokenKey;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.membership.NodeState;

import static org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_PER_SHARD;
import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount;
Expand Down Expand Up @@ -112,33 +112,13 @@ static Factory factory()
};
}

@Override
protected boolean shouldBootstrap(Node node, Topology previous, Topology updated, Range range)
{
if (!super.shouldBootstrap(node, previous, updated, range))
return false;
// we see new ranges when a new keyspace is added, so avoid bootstrap in these cases
return contains(previous, ((TokenKey) range.start()).table());
}

@Override
public SequentialAsyncExecutor someSequentialExecutor()
{
int idx = ((int) Thread.currentThread().getId()) & mask;
return executors[idx].newSequentialExecutor();
}

private static boolean contains(Topology previous, TableId searchTable)
{
for (Range range : previous.ranges())
{
TableId table = ((TokenKey) range.start()).table();
if (table.equals(searchTable))
return true;
}
return false;
}

public synchronized void setCapacity(long bytes)
{
cacheSize = bytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ AsyncResult<Void> acknowledged()

@Nullable AsyncResult<Void> reads()
{
return reads;
return ready == null ? null : ready.reads;
}

AsyncResult.Settable<Void> localSyncNotified()
Expand Down Expand Up @@ -449,7 +449,7 @@ public void reportTopology(Topology topology, boolean isLoad, boolean startSync)
}

@Override
protected void localSyncComplete(Topology topology, boolean startSync)
protected void onReadyToCoordinate(Topology topology, boolean startSync)
{
long epoch = topology.epoch();
EpochState epochState = getOrCreateEpochState(epoch);
Expand Down
28 changes: 20 additions & 8 deletions src/java/org/apache/cassandra/service/accord/AccordDataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.SyncPoint;
import accord.utils.UnhandledEnum;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.schema.Schema;
Expand All @@ -40,14 +41,6 @@ public class AccordDataStore implements DataStore
private static final Logger logger = LoggerFactory.getLogger(AccordDataStore.class);
enum FlushListenerKey { KEY }

@Override
public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback)
{
AccordFetchCoordinator coordinator = new AccordFetchCoordinator(node, ranges, syncPoint, callback, safeStore.commandStore());
coordinator.start();
return coordinator.result();
}

/**
* Ensures data for the intersecting ranges is flushed to sstable before calling back with reportOnSuccess.
* This is used to gate journal cleanup, since we skip the CommitLog for applying to the data table.
Expand Down Expand Up @@ -95,4 +88,23 @@ public void ensureDurable(CommandStore commandStore, Ranges ranges, RedundantBef
prev = cfs;
}
}

@Override
public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback, FetchKind kind)
{
switch (kind)
{
default: throw new UnhandledEnum(kind);
case Image:
{
AccordFetchCoordinator coordinator = new AccordFetchCoordinator(node, ranges, syncPoint, callback, safeStore.commandStore());
coordinator.start();
return coordinator.result();
}
case Sync:
{
throw new UnsupportedOperationException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this implemented in some other way? I believe I had an implementation of repair plugged in here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is also disabled for now, I plan to introduce it in a follow up patch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my understanding: this patch is then just to make Accord compile?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it also supports bootstrapping a node that was previously bootstrapped. That said, let me see about at least restoring this part of the patch, and we can address the journal replay issue in a follow up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way: I am also completely fine with doing this in a follow-up patch.

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ public void forEach(Consumer<JournalKey> consumer)

@SuppressWarnings("unchecked")
@Override
public void replay(CommandStores commandStores)
public boolean replay(CommandStores commandStores)
{
// TODO (expected): make the parallelisms configurable
// Replay is performed in parallel, where at most X commands can be in flight, accross at most Y commands stores.
Expand Down Expand Up @@ -716,6 +716,7 @@ public void close()

++cur;
}
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we return false on corruption?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is essentially disabled for now as we discussed, I will introduce it more fully in a follow up patch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can not recall any conversation about this logic being disabled. Searched messages quickly and couldn't find anything either. Maybe I misunderstood when it got mentioned.

}
catch (Throwable t)
{
Expand Down
Loading