Skip to content

Commit 674b8d5

Browse files
committed
Deduplicate ranges in WatermarkCollector
Also: collect ranges before submitting to TopologyManager to avoid quadratic complexity. Patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20757.
1 parent 28edf72 commit 674b8d5

File tree

2 files changed

+85
-74
lines changed

2 files changed

+85
-74
lines changed

src/java/org/apache/cassandra/service/accord/WatermarkCollector.java

Lines changed: 77 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,29 @@
1919
package org.apache.cassandra.service.accord;
2020

2121
import java.io.IOException;
22-
import java.util.HashMap;
22+
import java.util.ArrayList;
23+
import java.util.Comparator;
2324
import java.util.HashSet;
25+
import java.util.List;
2426
import java.util.Map;
2527
import java.util.Objects;
2628
import java.util.Set;
29+
import java.util.function.BiConsumer;
2730

2831
import com.google.common.annotations.VisibleForTesting;
32+
import com.google.common.base.Predicates;
2933
import com.google.common.collect.Iterators;
30-
import org.slf4j.Logger;
31-
import org.slf4j.LoggerFactory;
34+
import com.google.common.primitives.Ints;
3235

3336
import accord.api.ConfigurationService;
3437
import accord.local.Node;
3538
import accord.primitives.Range;
3639
import accord.primitives.Ranges;
3740
import accord.topology.Topology;
3841
import accord.utils.Invariants;
42+
import accord.utils.ReducingRangeMap;
3943
import accord.utils.async.AsyncResult;
40-
import org.agrona.collections.Int2ObjectHashMap;
44+
import org.agrona.collections.Long2LongHashMap;
4145
import org.apache.cassandra.db.TypeSizes;
4246
import org.apache.cassandra.io.UnversionedSerializer;
4347
import org.apache.cassandra.io.util.DataInputPlus;
@@ -60,17 +64,21 @@
6064
*/
6165
public class WatermarkCollector implements ConfigurationService.Listener
6266
{
63-
private static final Logger logger = LoggerFactory.getLogger(WatermarkCollector.class);
67+
private static final Comparator<Map.Entry<Range, Long>> sortByEpochThenRange = (a, b) -> {
68+
int c = Long.compareUnsigned(a.getValue(), b.getValue());
69+
if (c == 0) c = a.getKey().compare(b.getKey());
70+
return c;
71+
};
6472

65-
final Map<Range, Long> closed;
66-
final Map<Range, Long> retired;
67-
final Int2ObjectHashMap<Long> synced;
73+
ReducingRangeMap<Long> closed;
74+
ReducingRangeMap<Long> retired;
75+
final Long2LongHashMap synced;
6876

6977
WatermarkCollector()
7078
{
71-
closed = new HashMap<>();
72-
retired = new HashMap<>();
73-
synced = new Int2ObjectHashMap<>();
79+
closed = new ReducingRangeMap<>();
80+
retired = new ReducingRangeMap<>();
81+
synced = new Long2LongHashMap(-1);
7482
}
7583

7684
@Override public AsyncResult<Void> onTopologyUpdate(Topology topology, boolean isLoad, boolean startSync)
@@ -79,40 +87,36 @@ public class WatermarkCollector implements ConfigurationService.Listener
7987
}
8088

8189
@Override
82-
public void onRemoteSyncComplete(Node.Id node, long epoch)
90+
public synchronized void onRemoteSyncComplete(Node.Id node, long epoch)
8391
{
84-
synced.compute(node.id, (k, prev) -> prev == null ? epoch : Long.max(prev, epoch));
92+
synced.compute(node.id, (k, prev) -> prev == -1 ? epoch : Long.max(prev, epoch));
8593
}
8694

8795
@Override
88-
public void onEpochClosed(Ranges ranges, long epoch)
96+
public synchronized void onEpochClosed(Ranges ranges, long epoch)
8997
{
90-
synchronized (this)
91-
{
92-
for (Range range : ranges)
93-
this.closed.compute(range, (k, prev) -> prev == null ? epoch : Long.max(prev, epoch));
94-
}
98+
closed = ReducingRangeMap.merge(closed, ReducingRangeMap.create(ranges, epoch), Long::max);
9599
}
96100

97101
@Override
98-
public void onEpochRetired(Ranges ranges, long epoch)
102+
public synchronized void onEpochRetired(Ranges ranges, long epoch)
99103
{
100-
synchronized (this)
101-
{
102-
for (Range range : ranges)
103-
this.retired.compute(range, (k, prev) -> prev == null ? epoch : Long.max(prev, epoch));
104-
}
104+
retired = ReducingRangeMap.merge(retired, ReducingRangeMap.create(ranges, epoch), Long::max);
105105
}
106106

107107
public final IVerbHandler<Void> handler = new IVerbHandler<Void>()
108108
{
109-
public void doVerb(Message<Void> message) throws IOException
109+
public void doVerb(Message<Void> message)
110110
{
111111
Invariants.require(AccordService.started());
112112
Snapshot snapshot;
113113
synchronized (WatermarkCollector.this)
114114
{
115-
snapshot = new Snapshot(new HashMap<>(closed), new HashMap<>(retired), new Int2ObjectHashMap<>(synced));
115+
List<Map.Entry<Range, Long>> closedSnapshot = closed.foldlWithBounds((epoch, list, start, end) -> { list.add(Map.entry(start.rangeFactory().newRange(start, end), epoch)); return list; }, new ArrayList<>(), Predicates.alwaysFalse());
116+
List<Map.Entry<Range, Long>> retiredSnapshot = retired.foldlWithBounds((epoch, list, start, end) -> { list.add(Map.entry(start.rangeFactory().newRange(start, end), epoch)); return list; }, new ArrayList<>(), Predicates.alwaysFalse());
117+
Long2LongHashMap syncedSnapshot = new Long2LongHashMap(synced.size(), 0.6f, -1);
118+
syncedSnapshot.putAll(synced);
119+
snapshot = new Snapshot(closedSnapshot, retiredSnapshot, syncedSnapshot);
116120
}
117121
MessagingService.instance().respond(snapshot, message);
118122
}
@@ -139,32 +143,47 @@ static void fetchAndReportWatermarksAsync(AccordConfigurationService configServi
139143

140144
Snapshot snapshot = m.payload;
141145
long minEpoch = configService.minEpoch();
142-
for (Map.Entry<Range, Long> e : snapshot.closed.entrySet())
143-
{
144-
Ranges r = Ranges.of(e.getKey());
145-
configService.receiveClosed(r, e.getValue());
146-
}
147-
for (Map.Entry<Range, Long> e : snapshot.retired.entrySet())
148-
{
149-
Ranges r = Ranges.of(e.getKey());
150-
configService.receiveRetired(r, e.getValue());
151-
}
152-
for (Map.Entry<Integer, Long> e : snapshot.synced.entrySet())
146+
forEachEpoch(configService::receiveClosed, snapshot.closed);
147+
forEachEpoch(configService::receiveRetired, snapshot.retired);
148+
for (Map.Entry<Long, Long> e : snapshot.synced.entrySet())
153149
{
154-
Node.Id node = new Node.Id(e.getKey());
150+
Node.Id node = new Node.Id(Ints.saturatedCast(e.getKey()));
155151
for (long epoch = minEpoch; epoch <= e.getValue(); epoch++)
156152
configService.receiveRemoteSyncComplete(node, epoch);
157153
}
158154
});
159155
}
160156

157+
private static void forEachEpoch(BiConsumer<Ranges, Long> forEachEpoch, List<Map.Entry<Range, Long>> rangesAndEpochs)
158+
{
159+
if (rangesAndEpochs.isEmpty())
160+
return;
161+
162+
rangesAndEpochs.sort(sortByEpochThenRange);
163+
long collectingEpoch = rangesAndEpochs.get(0).getValue();
164+
List<Range> ranges = new ArrayList<>();
165+
for (Map.Entry<Range, Long> e : rangesAndEpochs)
166+
{
167+
Range range = e.getKey();
168+
long epoch = e.getValue();
169+
if (epoch != collectingEpoch)
170+
{
171+
forEachEpoch.accept(Ranges.of(ranges.toArray(Range[]::new)), collectingEpoch);
172+
collectingEpoch = epoch;
173+
ranges.clear();
174+
}
175+
ranges.add(range);
176+
}
177+
forEachEpoch.accept(Ranges.of(ranges.toArray(Range[]::new)), collectingEpoch);
178+
}
179+
161180
public static class Snapshot
162181
{
163-
public final Map<Range, Long> closed;
164-
public final Map<Range, Long> retired;
165-
public final Int2ObjectHashMap<Long> synced;
182+
public final List<Map.Entry<Range, Long>> closed;
183+
public final List<Map.Entry<Range, Long>> retired;
184+
public final Long2LongHashMap synced;
166185

167-
public Snapshot(Map<Range, Long> closed, Map<Range, Long> retired, Int2ObjectHashMap<Long> synced)
186+
public Snapshot(List<Map.Entry<Range, Long>> closed, List<Map.Entry<Range, Long>> retired, Long2LongHashMap synced)
168187
{
169188
this.closed = closed;
170189
this.retired = retired;
@@ -193,21 +212,21 @@ public int hashCode()
193212
public void serialize(Snapshot t, DataOutputPlus out) throws IOException
194213
{
195214
out.writeUnsignedVInt32(t.closed.size());
196-
for (Map.Entry<Range, Long> e : t.closed.entrySet())
215+
for (Map.Entry<Range, Long> e : t.closed)
197216
{
198217
TokenRange.serializer.serialize((TokenRange) e.getKey(), out);
199218
out.writeUnsignedVInt(e.getValue());
200219
}
201220
out.writeUnsignedVInt32(t.retired.size());
202-
for (Map.Entry<Range, Long> e : t.retired.entrySet())
221+
for (Map.Entry<Range, Long> e : t.retired)
203222
{
204223
TokenRange.serializer.serialize((TokenRange) e.getKey(), out);
205224
out.writeUnsignedVInt(e.getValue());
206225
}
207226
out.writeUnsignedVInt32(t.synced.size());
208-
for (Map.Entry<Integer, Long> e : t.synced.entrySet())
227+
for (Map.Entry<Long, Long> e : t.synced.entrySet())
209228
{
210-
out.writeUnsignedVInt32(e.getKey());
229+
out.writeUnsignedVInt(e.getKey());
211230
out.writeUnsignedVInt(e.getValue());
212231
}
213232
}
@@ -217,25 +236,20 @@ public void serialize(Snapshot t, DataOutputPlus out) throws IOException
217236
public Snapshot deserialize(DataInputPlus in) throws IOException
218237
{
219238
int closedSize = in.readUnsignedVInt32();
220-
Map<Range, Long> closed = new HashMap<>();
239+
List<Map.Entry<Range, Long>> closed = new ArrayList<>();
221240
for (int i = 0; i < closedSize; i++)
222-
{
223-
closed.put(TokenRange.serializer.deserialize(in),
224-
in.readUnsignedVInt());
225-
}
241+
closed.add(Map.entry(TokenRange.serializer.deserialize(in), in.readUnsignedVInt()));
242+
226243
int retiredSize = in.readUnsignedVInt32();
227-
Map<Range, Long> retired = new HashMap<>();
244+
List<Map.Entry<Range, Long>> retired = new ArrayList<>();
228245
for (int i = 0; i < retiredSize; i++)
229-
{
230-
retired.put(TokenRange.serializer.deserialize(in),
231-
in.readUnsignedVInt());
232-
}
246+
retired.add(Map.entry(TokenRange.serializer.deserialize(in), in.readUnsignedVInt()));
247+
233248
int syncedSize = in.readUnsignedVInt32();
234-
Int2ObjectHashMap<Long> synced = new Int2ObjectHashMap<>();
249+
Long2LongHashMap synced = new Long2LongHashMap(-1);
235250
for (int i = 0; i < syncedSize; i++)
236251
{
237-
synced.put(in.readUnsignedVInt32(),
238-
(Long) in.readUnsignedVInt());
252+
synced.put(in.readUnsignedVInt(), in.readUnsignedVInt());
239253
}
240254
return new Snapshot(closed, retired, synced);
241255
}
@@ -245,19 +259,19 @@ public long serializedSize(Snapshot t)
245259
{
246260
int size = 0;
247261
size += TypeSizes.sizeofUnsignedVInt(t.closed.size());
248-
for (Map.Entry<Range, Long> e : t.closed.entrySet())
262+
for (Map.Entry<Range, Long> e : t.closed)
249263
{
250264
size += TokenRange.serializer.serializedSize((TokenRange) e.getKey());
251265
size += TypeSizes.sizeofUnsignedVInt(e.getValue());
252266
}
253267
size += TypeSizes.sizeofUnsignedVInt(t.retired.size());
254-
for (Map.Entry<Range, Long> e : t.retired.entrySet())
268+
for (Map.Entry<Range, Long> e : t.retired)
255269
{
256270
size += TokenRange.serializer.serializedSize((TokenRange) e.getKey());
257271
size += TypeSizes.sizeofUnsignedVInt(e.getValue());
258272
}
259273
size += TypeSizes.sizeofUnsignedVInt(t.synced.size());
260-
for (Map.Entry<Integer, Long> e : t.synced.entrySet())
274+
for (Map.Entry<Long, Long> e : t.synced.entrySet())
261275
{
262276
size += TypeSizes.sizeofUnsignedVInt(e.getKey());
263277
size += TypeSizes.sizeofUnsignedVInt(e.getValue());

test/unit/org/apache/cassandra/service/accord/WatermarkCollectorTest.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818

1919
package org.apache.cassandra.service.accord;
2020

21+
import java.util.ArrayList;
2122
import java.util.HashMap;
2223
import java.util.Map;
2324

24-
import com.google.common.collect.Sets;
2525
import org.junit.Test;
2626

2727
import accord.local.Node;
@@ -31,6 +31,7 @@
3131
import accord.utils.Gens;
3232
import accord.utils.Invariants;
3333
import org.agrona.collections.Int2ObjectHashMap;
34+
import org.agrona.collections.Long2LongHashMap;
3435
import org.apache.cassandra.config.DatabaseDescriptor;
3536
import org.apache.cassandra.dht.IPartitioner;
3637
import org.apache.cassandra.io.Serializers;
@@ -58,34 +59,30 @@ public void snapshotSerializer()
5859

5960
private static void maybeUpdatePartitioner(WatermarkCollector.Snapshot snapshot)
6061
{
61-
for (Range range : Sets.union(snapshot.closed.keySet(), snapshot.retired.keySet()))
62-
{
63-
TokenRange tr = (TokenRange) range;
64-
DatabaseDescriptor.setPartitionerUnsafe(tr.start().token().getPartitioner());
65-
break;
66-
}
62+
if (!snapshot.closed.isEmpty()) DatabaseDescriptor.setPartitionerUnsafe(((TokenRange)snapshot.closed.get(0).getKey()).start().token().getPartitioner());
63+
else if (!snapshot.retired.isEmpty()) DatabaseDescriptor.setPartitionerUnsafe(((TokenRange)snapshot.retired.get(0).getKey()).start().token().getPartitioner());
6764
}
6865

6966
private Gen<WatermarkCollector.Snapshot> snapshotGen()
7067
{
7168
Gen<IPartitioner> partitionerGen = AccordGenerators.partitioner();
7269
Gen.LongGen epochGen = AccordGens.epochs();
73-
Gen<Int2ObjectHashMap<Long>> syncedGen = syncedGen();
70+
Gen<Long2LongHashMap> syncedGen = syncedGen();
7471
return rs -> {
7572
IPartitioner partitioner = partitionerGen.next(rs);
7673
Gen<Range> rangeGen = AccordGenerators.range(partitioner);
7774
Gen<Map<Range, Long>> mapGen = mapGen(Gens.ints().between(0, 10), rangeGen, epochGen);
78-
return new WatermarkCollector.Snapshot(mapGen.next(rs), mapGen.next(rs), syncedGen.next(rs));
75+
return new WatermarkCollector.Snapshot(new ArrayList<>(mapGen.next(rs).entrySet()), new ArrayList<>(mapGen.next(rs).entrySet()), syncedGen.next(rs));
7976
};
8077
}
8178

82-
private static Gen<Int2ObjectHashMap<Long>> syncedGen()
79+
private static Gen<Long2LongHashMap> syncedGen()
8380
{
8481
Gen.IntGen sizeGen = Gens.ints().between(0, 10);
8582
Gen<Node.Id> idGen = AccordGens.nodes();
8683
Gen.LongGen epochGen = AccordGens.epochs();
8784
return rs -> {
88-
Int2ObjectHashMap<Long> map = new Int2ObjectHashMap<>();
85+
Long2LongHashMap map = new Long2LongHashMap(-1);
8986
Gen<Node.Id> uniqueIdGen = idGen.filter(id -> !map.containsKey(id.id));
9087
for (int i = 0, size = sizeGen.nextInt(rs); i < size; i++)
9188
map.put(uniqueIdGen.next(rs).id, epochGen.next(rs));

0 commit comments

Comments
 (0)