Skip to content

Commit aa5c3ab

Browse files
committed
Improve Topology Management
Merge ConfigurationService with TopologyManager to remove cyclic dependency and duplicated work. Also: - Improve visibility of work blocking topology processing - Ensure we cannot double-count peers when deciding an epoch's distributed readiness - Remove the possibility of distributed stalls, by processing new topologies as soon as a contiguous sequence is known locally, regardless of whether anyprior local epoch is ready to coordinate or has recorded this fact to peers. The notification of local readiness continues to be processed only once all prior epochs are ready, but we can begin using and readying later epochs immediately. patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20998
1 parent 1a96b88 commit aa5c3ab

40 files changed

+662
-1378
lines changed

modules/accord

Submodule accord updated 81 files

src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
import accord.primitives.Ranges;
5757
import accord.primitives.Routable;
5858
import accord.primitives.RoutingKeys;
59+
import accord.topology.ActiveEpoch;
60+
import accord.topology.ActiveEpochs;
5961
import accord.topology.Shard;
6062
import accord.topology.Topology;
6163
import accord.utils.SortedListMap;
@@ -2166,7 +2168,7 @@ private ShardEpochsTable()
21662168
public void collect(PartitionsCollector collector)
21672169
{
21682170
IAccordService service = AccordService.unsafeInstance();
2169-
List<Topology> snapshot = service.node().topology().topologySnapshot();
2171+
ActiveEpochs snapshot = service.node().topology().active();
21702172
Map<TableId, Map<TokenKey, List<ShardAndEpochs>>> tableIdLookup = new HashMap<>();
21712173

21722174
{
@@ -2177,8 +2179,9 @@ public void collect(PartitionsCollector collector)
21772179

21782180
TableId prevTableId = null;
21792181
Map<TokenKey, List<ShardAndEpochs>> startLookup = null;
2180-
for (Topology topology : snapshot)
2182+
for (ActiveEpoch epoch : snapshot)
21812183
{
2184+
Topology topology = epoch.global();
21822185
for (Shard shard : topology.shards())
21832186
{
21842187
Range range = shard.range;

src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@
3232

3333
import accord.primitives.Range;
3434
import accord.primitives.Ranges;
35-
import accord.topology.TopologyManager.EpochsSnapshot;
36-
import accord.topology.TopologyManager.EpochsSnapshot.Epoch;
37-
import accord.topology.TopologyManager.EpochsSnapshot.EpochReady;
35+
import accord.topology.ActiveEpoch;
36+
import accord.topology.ActiveEpochs;
37+
import accord.topology.EpochReady;
3838
import org.apache.cassandra.config.DatabaseDescriptor;
3939
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
4040
import org.apache.cassandra.db.marshal.LongType;
@@ -46,8 +46,6 @@
4646
import org.apache.cassandra.service.accord.AccordService;
4747
import org.apache.cassandra.service.accord.TokenRange;
4848

49-
import static accord.topology.TopologyManager.EpochsSnapshot.ResultStatus.SUCCESS;
50-
5149
public class AccordVirtualTables
5250
{
5351
public static final String EPOCHS = "accord_epochs";
@@ -79,7 +77,7 @@ public EpochReadyTable(String keyspace)
7977
{
8078
super(parse(keyspace, "CREATE TABLE " + EPOCHS + " (\n" +
8179
" epoch bigint PRIMARY KEY,\n" +
82-
" ready_metadata text,\n" +
80+
" ready_active text,\n" +
8381
" ready_coordinate text,\n" +
8482
" ready_data text,\n" +
8583
" ready_reads text,\n" +
@@ -94,16 +92,16 @@ public EpochReadyTable(String keyspace)
9492
public DataSet data()
9593
{
9694
SimpleDataSet ds = new SimpleDataSet(metadata());
97-
EpochsSnapshot snapshot = epochsSnapshot();
98-
for (Epoch epoch : snapshot)
95+
ActiveEpochs snapshot = AccordService.instance().topology().active();
96+
for (ActiveEpoch epoch : snapshot)
9997
{
100-
ds.row(epoch.epoch);
101-
EpochReady ready = epoch.ready;
102-
ds.column("ready_metadata", ready.metadata.value);
103-
ds.column("ready_coordinate", ready.coordinate.value);
104-
ds.column("ready_data", ready.data.value);
105-
ds.column("ready_reads", ready.reads.value);
106-
ds.column("ready", ready.reads == SUCCESS);
98+
ds.row(epoch.epoch());
99+
EpochReady ready = epoch.epochReady();
100+
ds.column("ready_active", ready.active.toString());
101+
ds.column("ready_coordinate", ready.coordinate.toString());
102+
ds.column("ready_data", ready.data.toString());
103+
ds.column("ready_reads", ready.reads.toString());
104+
ds.column("ready", ready.active.isDone() && ready.coordinate.isDone() && ready.data.isDone() && ready.reads.isDone());
107105
}
108106
return ds;
109107
}
@@ -133,21 +131,21 @@ protected EpochSyncRanges(String keyspace)
133131
public DataSet data()
134132
{
135133
SimpleDataSet ds = new SimpleDataSet(metadata());
136-
EpochsSnapshot snapshot = epochsSnapshot();
137-
for (Epoch state : snapshot)
134+
ActiveEpochs snapshot = AccordService.instance().topology().active();
135+
for (ActiveEpoch state : snapshot)
138136
{
139137
Map<TableId, List<TokenRange>> addedRanges = groupByTable(state.addedRanges);
140138
Map<TableId, List<TokenRange>> removedRanges = groupByTable(state.removedRanges);
141-
Map<TableId, List<TokenRange>> synced = groupByTable(state.synced);
142-
Map<TableId, List<TokenRange>> closed = groupByTable(state.closed);
143-
Map<TableId, List<TokenRange>> retired = groupByTable(state.retired);
139+
Map<TableId, List<TokenRange>> synced = groupByTable(state.quorumReady());
140+
Map<TableId, List<TokenRange>> closed = groupByTable(state.closed());
141+
Map<TableId, List<TokenRange>> retired = groupByTable(state.retired());
144142

145143
Set<TableId> allTables = union(addedRanges.keySet(), removedRanges.keySet(), synced.keySet(), closed.keySet(), retired.keySet());
146144
for (TableId table : allTables)
147145
{
148146
TableMetadata metadata = Schema.instance.getTableMetadata(table);
149147
if (metadata == null) continue; // table dropped, ignore
150-
ds.row(state.epoch, metadata.keyspace, metadata.name);
148+
ds.row(state.epoch(), metadata.keyspace, metadata.name);
151149

152150
ds.column("added", format(addedRanges.get(table)));
153151
ds.column("removed", format(removedRanges.get(table)));
@@ -179,11 +177,6 @@ private static List<String> format(@Nullable List<TokenRange> list)
179177
}
180178
}
181179

182-
private static EpochsSnapshot epochsSnapshot()
183-
{
184-
return AccordService.instance().topology().epochsSnapshot();
185-
}
186-
187180
private static String toStringNoTable(TokenRange tr)
188181
{
189182
// TokenRange extends Range.EndInclusive

src/java/org/apache/cassandra/service/Rebuild.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.slf4j.Logger;
3434
import org.slf4j.LoggerFactory;
3535

36-
import accord.api.ConfigurationService.EpochReady;
36+
import accord.topology.EpochReady;
3737
import org.apache.cassandra.config.DatabaseDescriptor;
3838
import org.apache.cassandra.db.Keyspace;
3939
import org.apache.cassandra.dht.Range;

0 commit comments

Comments
 (0)