Skip to content

Commit ac448a9

Browse files
committed
review
1 parent d13f656 commit ac448a9

14 files changed

+121
-105
lines changed

src/java/org/apache/cassandra/db/compression/CompressionDictionary.java

Lines changed: 83 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,17 @@
2323
import java.io.EOFException;
2424
import java.io.IOException;
2525
import java.util.Objects;
26+
import java.util.Set;
2627
import javax.annotation.Nullable;
2728

29+
import com.google.common.collect.ImmutableSet;
2830
import com.google.common.hash.Hasher;
2931
import com.google.common.hash.Hashing;
3032

3133
import org.apache.cassandra.cql3.UntypedResultSet;
34+
import org.apache.cassandra.io.compress.ICompressor;
35+
import org.apache.cassandra.io.compress.IDictionaryCompressor;
36+
import org.apache.cassandra.io.compress.ZstdDictionaryCompressor;
3237

3338
public interface CompressionDictionary extends AutoCloseable
3439
{
@@ -56,6 +61,11 @@ default Kind kind()
5661
return identifier().kind;
5762
}
5863

64+
default IDictionaryCompressor<? extends CompressionDictionary> getCompressor()
65+
{
66+
return kind().getCompressor(this);
67+
}
68+
5969
/**
6070
* Write compression dictionary to file
6171
*
@@ -120,41 +130,31 @@ static CompressionDictionary deserialize(DataInput input, @Nullable CompressionD
120130
int checksum = input.readInt();
121131
int calculatedChecksum = calculateChecksum((byte) kindOrdinal, id, dict);
122132
if (checksum != calculatedChecksum)
123-
{
124133
throw new IOException("Compression dictionary checksum does not match");
125-
}
126134

127-
CompressionDictionary dictionary = null;
128-
if (kind == Kind.ZSTD)
129-
{
130-
dictionary = new ZstdCompressionDictionary(dictId, dict);
131-
}
132-
133-
if (dictionary == null)
134-
{
135-
throw new IOException(kind + " compression dictionary is not created");
136-
}
135+
CompressionDictionary dictionary = kind.getDictionary(dictId, dict);
137136

138137
// update the dictionary manager if it exists
139138
if (manager != null)
140-
{
141139
manager.add(dictionary);
142-
}
140+
143141
return dictionary;
144142
}
145143

146144
static CompressionDictionary createFromRow(UntypedResultSet.Row row)
147145
{
148146
String kindStr = row.getString("kind");
149-
long id = row.getLong("dict_id");
150-
byte[] dict = row.getByteArray("dict");
151-
CompressionDictionary.DictId dictId = new CompressionDictionary.DictId(CompressionDictionary.Kind.valueOf(kindStr), id);
152-
if (dictId.kind == CompressionDictionary.Kind.ZSTD)
147+
long dictId = row.getLong("dict_id");
148+
149+
try
153150
{
154-
return new ZstdCompressionDictionary(dictId, dict);
151+
Kind kind = CompressionDictionary.Kind.valueOf(kindStr);
152+
return kind.getDictionary(new DictId(kind, dictId), row.getByteArray("dict"));
153+
}
154+
catch (IllegalArgumentException ex)
155+
{
156+
throw new IllegalStateException(kindStr + " compression dictionary is not created for dict id " + dictId);
155157
}
156-
157-
throw new IllegalStateException(kindStr + " compression dictionary is not created");
158158
}
159159

160160
@SuppressWarnings("UnstableApiUsage")
@@ -170,14 +170,75 @@ static int calculateChecksum(byte kindOrdinal, long dictId, byte[] dict)
170170
enum Kind
171171
{
172172
// Order matters: the enum ordinal is serialized
173-
ZSTD;
173+
ZSTD
174+
{
175+
public CompressionDictionary getDictionary(DictId dictId, byte[] dict)
176+
{
177+
return new ZstdCompressionDictionary(dictId, dict);
178+
}
179+
180+
@Override
181+
public IDictionaryCompressor<? extends CompressionDictionary> getCompressor(CompressionDictionary dictionary)
182+
{
183+
assert dictionary instanceof ZstdCompressionDictionary;
184+
return ZstdDictionaryCompressor.create((ZstdCompressionDictionary) dictionary);
185+
}
186+
187+
@Override
188+
public ICompressionDictionaryTrainer getTrainer(String keyspaceName, String tableName, CompressionDictionaryTrainingConfig config, ICompressor compressor)
189+
{
190+
assert compressor instanceof ZstdDictionaryCompressor;
191+
return new ZstdDictionaryTrainer(keyspaceName, tableName, config, ((ZstdDictionaryCompressor) compressor).compressionLevel());
192+
}
193+
};
194+
195+
public static final Set<Kind> ACCEPTABLE_DICTIONARY_KINDS = ImmutableSet.of(Kind.ZSTD);
196+
197+
public abstract CompressionDictionary getDictionary(CompressionDictionary.DictId dictId, byte[] dict);
198+
199+
public abstract IDictionaryCompressor<? extends CompressionDictionary> getCompressor(CompressionDictionary dictionary);
200+
201+
public abstract ICompressionDictionaryTrainer getTrainer(String keyspaceName, String tableName, CompressionDictionaryTrainingConfig config, ICompressor compressor);
174202
}
175203

176204
final class DictId
177205
{
178206
public final Kind kind;
179207
public final long id; // A value of negative or 0 means no dictionary
180208

209+
/**
210+
* Creates a monotonically increasing dictionary ID by combining timestamp and dictionary ID.
211+
* <p>
212+
* The resulting dictionary ID has the following structure:
213+
* - Upper 32 bits: timestamp in minutes (signed int)
214+
* - Lower 32 bits: Zstd dictionary ID (unsigned int, passed as long due to Java limitations)
215+
* <p>
216+
* This ensures dictionary IDs are monotonically increasing over time, which helps to identify
217+
* the latest dictionary.
218+
* <p>
219+
* The implementation assumes that dictionary training frequency is significantly larger than
220+
* every minute, which a healthy system should do. In the scenario when multiple dictionaries
221+
* are trained in the same minute (only possible using manual training), there should not be
222+
* correctness concerns since the dictionary is attached to the SSTables, but leads to performance
223+
* hit from having too many dictionary. Therefore, such scenario should be avoided at the best.
224+
*
225+
* @param currentTimeMillis the current time in milliseconds
226+
* @param dictId dictionary ID (unsigned 32-bit value represented as long)
227+
* @return combined dictionary ID that is monotonically increasing over time
228+
*/
229+
static long makeDictId(long currentTimeMillis, long dictId)
230+
{
231+
// timestamp in minutes since Unix epoch. Good until year 6053
232+
long timestampMinutes = currentTimeMillis / 1000 / 60;
233+
// Convert timestamp to long and shift to upper 32 bits
234+
long combined = timestampMinutes << 32;
235+
236+
// Add the unsigned int (already as long) to lower 32 bits
237+
combined |= (dictId & 0xFFFFFFFFL);
238+
239+
return combined;
240+
}
241+
181242
public DictId(Kind kind, long id)
182243
{
183244
this.kind = kind;

src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ public CompressionDictionaryEventHandler(ColumnFamilyStore cfs, ICompressionDict
5959
}
6060

6161
@Override
62-
public void onNewDictionaryTrained(long dictionaryId)
62+
public void onNewDictionaryTrained(CompressionDictionary.DictId dictionaryId)
6363
{
64-
logger.info("Notifying cluster about dictionary update for {}.{} with dictionaryId {}",
64+
logger.info("Notifying cluster about dictionary update for {}.{} with {}",
6565
keyspaceName, tableName, dictionaryId);
6666

6767
CompressionDictionaryUpdateMessage message = new CompressionDictionaryUpdateMessage(cfs.metadata().id, dictionaryId);
@@ -76,7 +76,7 @@ public void onNewDictionaryTrained(long dictionaryId)
7676
}
7777

7878
@Override
79-
public void onNewDictionaryAvailable(long dictId)
79+
public void onNewDictionaryAvailable(CompressionDictionary.DictId dictionaryId)
8080
{
8181
// Best effort to retrieve the dictionary; otherwise, the periodic task should retrieve the dictionary later
8282
CompletableFuture.runAsync(() -> {
@@ -87,13 +87,13 @@ public void onNewDictionaryAvailable(long dictId)
8787
return;
8888
}
8989

90-
CompressionDictionary dictionary = SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName, tableName, dictId);
90+
CompressionDictionary dictionary = SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName, tableName, dictionaryId);
9191
cache.setCurrentIfNewer(dictionary);
9292
}
9393
catch (Exception e)
9494
{
95-
logger.warn("Failed to retrieve compression dictionary for {}.{}. dictionaryId={}",
96-
keyspaceName, tableName, dictId, e);
95+
logger.warn("Failed to retrieve compression dictionary for {}.{}. {}",
96+
keyspaceName, tableName, dictionaryId, e);
9797
}
9898
}, ScheduledExecutors.nonPeriodicTasks);
9999
}
@@ -102,7 +102,7 @@ public void onNewDictionaryAvailable(long dictId)
102102
// If the request fails, each peer has periodic task scheduled to pull.
103103
private void sendNotification(InetAddressAndPort target, CompressionDictionaryUpdateMessage message)
104104
{
105-
logger.debug("Sending dictionary update notification to {}", target);
105+
logger.debug("Sending dictionary update notification for {} to {}", message.dictionaryId, target);
106106

107107
Message<CompressionDictionaryUpdateMessage> msg = Message.out(Verb.DICTIONARY_UPDATE_REQ, message);
108108
MessagingService.instance()

src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,13 +192,13 @@ public void setCurrentIfNewer(@Nullable CompressionDictionary dictionary)
192192
}
193193

194194
@Override
195-
public void onNewDictionaryTrained(long dictionaryId)
195+
public void onNewDictionaryTrained(CompressionDictionary.DictId dictionaryId)
196196
{
197197
eventHandler.onNewDictionaryTrained(dictionaryId);
198198
}
199199

200200
@Override
201-
public void onNewDictionaryAvailable(long dictionaryId)
201+
public void onNewDictionaryAvailable(CompressionDictionary.DictId dictionaryId)
202202
{
203203
eventHandler.onNewDictionaryAvailable(dictionaryId);
204204
}
@@ -266,7 +266,7 @@ private void handleNewDictionary(CompressionDictionary dictionary)
266266
{
267267
// sequence meatters; persist the new dictionary before broadcasting to others.
268268
storeDictionary(dictionary);
269-
onNewDictionaryTrained(dictionary.identifier().id);
269+
onNewDictionaryTrained(dictionary.identifier());
270270
}
271271

272272
private CompressionDictionaryTrainingConfig createTrainingConfig()

src/java/org/apache/cassandra/db/compression/CompressionDictionaryUpdateMessage.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.IOException;
2222

23+
import org.apache.cassandra.db.compression.CompressionDictionary.DictId;
2324
import org.apache.cassandra.io.IVersionedSerializer;
2425
import org.apache.cassandra.io.util.DataInputPlus;
2526
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -30,9 +31,9 @@ public class CompressionDictionaryUpdateMessage
3031
public static final IVersionedSerializer<CompressionDictionaryUpdateMessage> serializer = new DictionaryUpdateMessageSerializer();
3132

3233
public final TableId tableId;
33-
public final long dictionaryId;
34+
public final DictId dictionaryId;
3435

35-
public CompressionDictionaryUpdateMessage(TableId tableId, long dictionaryId)
36+
public CompressionDictionaryUpdateMessage(TableId tableId, DictId dictionaryId)
3637
{
3738
this.tableId = tableId;
3839
this.dictionaryId = dictionaryId;
@@ -44,21 +45,25 @@ public static class DictionaryUpdateMessageSerializer implements IVersionedSeria
4445
public void serialize(CompressionDictionaryUpdateMessage message, DataOutputPlus out, int version) throws IOException
4546
{
4647
TableId.serializer.serialize(message.tableId, out, version);
47-
out.writeLong(message.dictionaryId);
48+
out.writeByte(message.dictionaryId.kind.ordinal());
49+
out.writeLong(message.dictionaryId.id);
4850
}
4951

5052
@Override
5153
public CompressionDictionaryUpdateMessage deserialize(DataInputPlus in, int version) throws IOException
5254
{
5355
TableId tableId = TableId.serializer.deserialize(in, version);
56+
int kindOrdinal = in.readByte();
5457
long dictionaryId = in.readLong();
55-
return new CompressionDictionaryUpdateMessage(tableId, dictionaryId);
58+
DictId dictId = new DictId(CompressionDictionary.Kind.values()[kindOrdinal], dictionaryId);
59+
return new CompressionDictionaryUpdateMessage(tableId, dictId);
5660
}
5761

5862
@Override
5963
public long serializedSize(CompressionDictionaryUpdateMessage message, int version)
6064
{
61-
return TableId.serializer.serializedSize(message.tableId, version) +
65+
return TableId.serializer.serializedSize(message.tableId, version) +
66+
1 + // byte for kind ordinal
6267
8; // long for dictionaryId
6368
}
6469
}

src/java/org/apache/cassandra/db/compression/ICompressionDictionaryEventHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ public interface ICompressionDictionaryEventHandler
2424
* Invoked when a new dictionary is trained
2525
* @param dictionaryId dictionary id
2626
*/
27-
void onNewDictionaryTrained(long dictionaryId);
27+
void onNewDictionaryTrained(CompressionDictionary.DictId dictionaryId);
2828

2929
/**
3030
* Invoked when {@link CompressionDictionaryUpdateMessage} is received indicating
3131
* a dictionary is trained and local node should retrieve the specified dictionary
3232
* @param dictionaryId dictionary id
3333
*/
34-
void onNewDictionaryAvailable(long dictionaryId);
34+
void onNewDictionaryAvailable(CompressionDictionary.DictId dictionaryId);
3535
}

src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.cassandra.concurrent.ScheduledExecutors;
2626
import org.apache.cassandra.io.compress.ICompressor;
2727
import org.apache.cassandra.io.compress.IDictionaryCompressor;
28-
import org.apache.cassandra.io.compress.ZstdDictionaryCompressor;
2928
import org.apache.cassandra.schema.CompressionParams;
3029

3130
/**
@@ -143,16 +142,11 @@ static ICompressionDictionaryTrainer create(String keyspaceName,
143142
{
144143
ICompressor compressor = params.getSstableCompressor();
145144
if (!(compressor instanceof IDictionaryCompressor))
146-
{
147145
throw new IllegalArgumentException("Compressor does not support dictionary training: " + params.getSstableCompressor());
148-
}
149146

150147
IDictionaryCompressor dictionaryCompressor = (IDictionaryCompressor) compressor;
151-
if (dictionaryCompressor.acceptableDictionaryKind() == CompressionDictionary.Kind.ZSTD)
152-
{
153-
ZstdDictionaryCompressor zstdDictionaryCompressor = (ZstdDictionaryCompressor) compressor;
154-
return new ZstdDictionaryTrainer(keyspaceName, tableName, config, zstdDictionaryCompressor.compressionLevel());
155-
}
148+
if (CompressionDictionary.Kind.ACCEPTABLE_DICTIONARY_KINDS.contains(dictionaryCompressor.acceptableDictionaryKind()))
149+
return dictionaryCompressor.acceptableDictionaryKind().getTrainer(keyspaceName, tableName, config, compressor);
156150

157151
throw new IllegalArgumentException("No dictionary trainer available for: " + params.getSstableCompressor());
158152
}

src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public CompressionDictionary trainDictionary(boolean force)
125125
currentSampleCount, totalSampleSize.get(), isReady);
126126
byte[] dictBytes = zstdTrainer.trainSamples();
127127
long zstdDictId = Zstd.getDictIdFromDict(dictBytes);
128-
DictId dictId = new DictId(Kind.ZSTD, makeDictionaryId(System.currentTimeMillis(), zstdDictId));
128+
DictId dictId = new DictId(Kind.ZSTD, DictId.makeDictId(System.currentTimeMillis(), zstdDictId));
129129
currentTrainingStatus = TrainingStatus.COMPLETED;
130130
logger.debug("New dictionary is trained with {}", dictId);
131131
CompressionDictionary dictionary = new ZstdCompressionDictionary(dictId, dictBytes);
@@ -287,39 +287,6 @@ public void close()
287287
logger.info("Permanently closed dictionary trainer for {}.{}", keyspaceName, tableName);
288288
}
289289

290-
/**
291-
* Creates a monotonically increasing dictionary ID by combining timestamp and Zstd dictionary ID.
292-
*
293-
* The resulting dictionary ID has the following structure:
294-
* - Upper 32 bits: timestamp in minutes (signed int)
295-
* - Lower 32 bits: Zstd dictionary ID (unsigned int, passed as long due to Java limitations)
296-
*
297-
* This ensures dictionary IDs are monotonically increasing over time, which helps to identify
298-
* the latest dictionary.
299-
*
300-
* The implementation assumes that dictionary training frequency is significantly larger than
301-
* every minute, which a healthy system should do. In the scenario when multiple dictionaries
302-
* are trained in the same minute (only possible using manual training), there should not be
303-
* correctness concerns since the dictionary is attached to the SSTables, but leads to performance
304-
* hit from having too many dictionary. Therefore, such scenario should be avoided at the best.
305-
*
306-
* @param currentTimeMillis the current time in milliseconds
307-
* @param zstdDictId Zstd dictionary ID (unsigned 32-bit value represented as long)
308-
* @return combined dictionary ID that is monotonically increasing over time
309-
*/
310-
static long makeDictionaryId(long currentTimeMillis, long zstdDictId)
311-
{
312-
// timestamp in minutes since Unix epoch. Good until year 6053
313-
long timestampMinutes = currentTimeMillis / 1000 / 60;
314-
// Convert timestamp to long and shift to upper 32 bits
315-
long combined = timestampMinutes << 32;
316-
317-
// Add the unsigned int (already as long) to lower 32 bits
318-
combined |= (zstdDictId & 0xFFFFFFFFL);
319-
320-
return combined;
321-
}
322-
323290
@VisibleForTesting
324291
long getSampleCount()
325292
{

0 commit comments

Comments
 (0)