Skip to content

Commit 8289bb4

Browse files
committed
Accord: Topology serializer has a lot of repeated data, can dedup to shrink the cost
patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-20715
1 parent f6c1002 commit 8289bb4

File tree

21 files changed

+794
-54
lines changed

21 files changed

+794
-54
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Accord: Topology serializer has a lot of repeated data, can dedup to shrink the cost (CASSANDRA-20715)
23
* Stream individual files in their own transactions and hand over ownership to a parent transaction on completion (CASSANDRA-20728)
34
* Limit the number of held heap dumps to not consume disk space excessively (CASSANDRA-20457)
45
* Accord: BEGIN TRANSACTIONs IF condition logic does not properly support meaningless emptiness and null values (CASSANDRA-20667)

src/java/org/apache/cassandra/schema/TableId.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.function.LongUnaryOperator;
2626

2727
import javax.annotation.Nullable;
28+
29+
import org.apache.cassandra.io.UnversionedSerializer;
2830
import org.apache.cassandra.tcm.ClusterMetadata;
2931
import org.apache.commons.lang3.ArrayUtils;
3032

@@ -52,6 +54,10 @@
5254
public final class TableId implements Comparable<TableId>
5355
{
5456
public static final long MAGIC = 1956074401491665062L;
57+
/**
58+
* Represents a placeholder for cases where a table id is defined, but has no meaning.
59+
*/
60+
public static final TableId UNDEFINED = TableId.fromRaw(Long.MIN_VALUE, Long.MIN_VALUE);
5561
public static final long EMPTY_SIZE = ObjectSizes.measureDeep(new UUID(0, 0));
5662
private static final int MAGIC_BYTE = (int) ((flipSign(MAGIC) >>> 56) & 0xf0);
5763

@@ -402,6 +408,27 @@ public long serializedSize(TableId t, int version)
402408
}
403409
};
404410

411+
public static final UnversionedSerializer<TableId> compactComparableSerializer = new UnversionedSerializer<TableId>()
412+
{
413+
@Override
414+
public void serialize(TableId t, DataOutputPlus out) throws IOException
415+
{
416+
t.serializeCompactComparable(out);
417+
}
418+
419+
@Override
420+
public TableId deserialize(DataInputPlus in) throws IOException
421+
{
422+
return TableId.deserializeCompactComparable(in);
423+
}
424+
425+
@Override
426+
public long serializedSize(TableId t)
427+
{
428+
return t.serializedCompactComparableSize();
429+
}
430+
};
431+
405432
public static final MetadataSerializer<TableId> metadataSerializer = new MetadataSerializer<TableId>()
406433
{
407434
@Override

src/java/org/apache/cassandra/service/accord/FetchTopologies.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.ArrayList;
2323
import java.util.Collection;
2424
import java.util.List;
25+
import java.util.Objects;
2526

2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
@@ -90,6 +91,20 @@ public FetchTopologies(long minEpoch, long maxEpoch)
9091
this.maxEpoch = maxEpoch;
9192
}
9293

94+
@Override
95+
public boolean equals(Object o)
96+
{
97+
if (o == null || getClass() != o.getClass()) return false;
98+
FetchTopologies that = (FetchTopologies) o;
99+
return minEpoch == that.minEpoch && maxEpoch == that.maxEpoch;
100+
}
101+
102+
@Override
103+
public int hashCode()
104+
{
105+
return Objects.hash(minEpoch, maxEpoch);
106+
}
107+
93108
public static final UnversionedSerializer<TopologyRange> responseSerializer = new UnversionedSerializer<>()
94109
{
95110
@Override
@@ -101,7 +116,7 @@ public void serialize(TopologyRange t, DataOutputPlus out) throws IOException
101116
out.writeUnsignedVInt32(t.topologies.size());
102117

103118
for (Topology topology : t.topologies)
104-
TopologySerializers.topology.serialize(topology, out);
119+
TopologySerializers.compactTopology.serialize(topology, out);
105120
}
106121

107122
@Override
@@ -113,7 +128,7 @@ public TopologyRange deserialize(DataInputPlus in) throws IOException
113128
int count = in.readUnsignedVInt32();
114129
List<Topology> topologies = new ArrayList<>(count);
115130
for (int i = 0; i < count; ++i)
116-
topologies.add(TopologySerializers.topology.deserialize(in));
131+
topologies.add(TopologySerializers.compactTopology.deserialize(in));
117132
return new TopologyRange(min, current, firstNonEmpty, topologies);
118133
}
119134

@@ -125,7 +140,7 @@ public long serializedSize(TopologyRange t)
125140
size += TypeSizes.sizeofUnsignedVInt(t.firstNonEmpty);
126141
size += TypeSizes.sizeofUnsignedVInt(t.topologies.size());
127142
for (Topology topology : t.topologies)
128-
size += TopologySerializers.topology.serializedSize(topology);
143+
size += TopologySerializers.compactTopology.serializedSize(topology);
129144
return size;
130145
}
131146
};

src/java/org/apache/cassandra/service/accord/TokenRange.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public boolean isFullRange()
8787
}
8888

8989
@VisibleForTesting
90-
public Range withTable(TableId table)
90+
public TokenRange withTable(TableId table)
9191
{
9292
return new TokenRange(start().withTable(table), end().withTable(table));
9393
}
@@ -148,5 +148,29 @@ public long serializedSize(TokenRange range)
148148
return TokenKey.serializer.serializedSize(range.start())
149149
+ TokenKey.serializer.serializedSize(range.end());
150150
}
151+
}
152+
153+
public static final UnversionedSerializer<TokenRange> noTableSerializer = new UnversionedSerializer<TokenRange>()
154+
{
155+
@Override
156+
public void serialize(TokenRange t, DataOutputPlus out) throws IOException
157+
{
158+
TokenKey.noTableSerializer.serialize(t.start(), out);
159+
TokenKey.noTableSerializer.serialize(t.end(), out);
160+
}
161+
162+
@Override
163+
public TokenRange deserialize(DataInputPlus in) throws IOException
164+
{
165+
return TokenRange.create(TokenKey.noTableSerializer.deserialize(TableId.UNDEFINED, in),
166+
TokenKey.noTableSerializer.deserialize(TableId.UNDEFINED, in));
167+
}
168+
169+
@Override
170+
public long serializedSize(TokenRange t)
171+
{
172+
return TokenKey.noTableSerializer.serializedSize(t.start())
173+
+ TokenKey.noTableSerializer.serializedSize(t.end());
174+
}
151175
};
152176
}

src/java/org/apache/cassandra/service/accord/api/TokenKey.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.cassandra.db.marshal.ValueAccessor;
4141
import org.apache.cassandra.dht.IPartitioner;
4242
import org.apache.cassandra.dht.Token;
43+
import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
4344
import org.apache.cassandra.io.util.DataInputPlus;
4445
import org.apache.cassandra.io.util.DataOutputPlus;
4546
import org.apache.cassandra.schema.TableId;
@@ -230,6 +231,55 @@ public static TokenKey before(TableId table, Token token)
230231
return new TokenKey(table, BEFORE_TOKEN_SENTINEL, token);
231232
}
232233

234+
public static final NoTableSerializer noTableSerializer = new NoTableSerializer();
235+
236+
237+
public static class NoTableSerializer implements ParameterisedUnversionedSerializer<TokenKey, TableId>
238+
{
239+
@Override
240+
public void serialize(TokenKey key, TableId tableId, DataOutputPlus out) throws IOException
241+
{
242+
IPartitioner partitioner = key.token.getPartitioner();
243+
int fixedLength = partitioner.accordFixedLength();
244+
if (fixedLength < 0)
245+
{
246+
int len = partitioner.accordSerializedSize(key.token);
247+
out.writeUnsignedVInt32(len);
248+
}
249+
serializer.serializeWithoutPrefixOrLength(key, out);
250+
}
251+
252+
public void serialize(TokenKey key, DataOutputPlus out) throws IOException
253+
{
254+
serialize(key, key.table, out);
255+
}
256+
257+
@Override
258+
public long serializedSize(TokenKey key, TableId tableId)
259+
{
260+
IPartitioner partitioner = key.token.getPartitioner();
261+
int tokenSize = partitioner.accordFixedLength();
262+
if (tokenSize >= 0)
263+
return 2 + tokenSize;
264+
tokenSize = partitioner.accordSerializedSize(key.token);
265+
return 2 + tokenSize + VIntCoding.sizeOfUnsignedVInt(tokenSize);
266+
}
267+
268+
public long serializedSize(TokenKey key)
269+
{
270+
return serializedSize(key, key.table);
271+
}
272+
273+
@Override
274+
public TokenKey deserialize(TableId tableId, DataInputPlus in) throws IOException
275+
{
276+
IPartitioner partitioner = getPartitioner();
277+
int len = partitioner.accordFixedLength();
278+
if (len < 0) len = in.readUnsignedVInt32();
279+
return serializer.deserializeWithPrefix(tableId, len + 2, in, partitioner);
280+
}
281+
}
282+
233283
public static final class Serializer implements AccordSearchableKeySerializer<TokenKey>
234284
{
235285
private Serializer() {}

src/java/org/apache/cassandra/service/accord/journal/AccordTopologyUpdate.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ public void serialize(Journal.TopologyUpdate from, DataOutputPlus out) throws IO
114114
}
115115
//TODO (desired): local to what? Rather than serializing local we can serialize the node its relative too? that why when we deserialize we do globa.forNode(node)
116116
// this also decreases the size as we don't have redundent shards
117-
TopologySerializers.topology.serialize(from.local, out);
118-
TopologySerializers.topology.serialize(from.global, out);
117+
TopologySerializers.compactTopology.serialize(from.local, out);
118+
TopologySerializers.compactTopology.serialize(from.global, out);
119119
}
120120

121121
@Override
@@ -129,8 +129,8 @@ public Journal.TopologyUpdate deserialize(DataInputPlus in) throws IOException
129129
CommandStores.RangesForEpoch rangesForEpoch = RangesForEpochSerializer.instance.deserialize(in);
130130
commandStores.put(commandStoreId, rangesForEpoch);
131131
}
132-
Topology local = TopologySerializers.topology.deserialize(in);
133-
Topology global = TopologySerializers.topology.deserialize(in);
132+
Topology local = TopologySerializers.compactTopology.deserialize(in);
133+
Topology global = TopologySerializers.compactTopology.deserialize(in);
134134
return new Journal.TopologyUpdate(commandStores, local, global);
135135
}
136136

@@ -144,8 +144,8 @@ public long serializedSize(Journal.TopologyUpdate from)
144144
size += RangesForEpochSerializer.instance.serializedSize(e.getValue());
145145
}
146146

147-
size += TopologySerializers.topology.serializedSize(from.local);
148-
size += TopologySerializers.topology.serializedSize(from.global);
147+
size += TopologySerializers.compactTopology.serializedSize(from.local);
148+
size += TopologySerializers.compactTopology.serializedSize(from.global);
149149
return size;
150150
}
151151
}

0 commit comments

Comments
 (0)