Skip to content

Commit e7037f1

Browse files
committed
Add configs to cassandra.yaml and refactoring
1 parent 7bf7e30 commit e7037f1

11 files changed

+280
-53
lines changed

conf/cassandra.yaml

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,54 @@ counter_cache_save_period: 7200s
617617
# Disabled by default, meaning all keys are going to be saved
618618
# counter_cache_keys_to_save: 100
619619

620+
# Dictionary compression settings for ZSTD dictionary-based compression
621+
# These settings control the automatic training and caching of compression dictionaries
622+
# for tables that use ZSTD dictionary compression.
623+
624+
# How often to refresh compression dictionaries across the cluster.
625+
# During refresh, nodes will check for newer dictionary versions and update their caches.
626+
# Min unit: s
627+
compression_dictionary_refresh_interval: 3600s
628+
629+
# Initial delay before starting the first dictionary refresh cycle after node startup.
630+
# This prevents all nodes from refreshing simultaneously when the cluster starts.
631+
# Min unit: s
632+
compression_dictionary_refresh_initial_delay: 10s
633+
634+
# Maximum number of compression dictionaries to cache per table.
635+
# Each table using dictionary compression can have multiple dictionaries cached
636+
# (current version plus recently used versions for reading older SSTables).
637+
compression_dictionary_cache_size: 10
638+
639+
# How long to keep compression dictionaries in the cache before they expire.
640+
# Expired dictionaries will be removed from memory but can be reloaded if needed.
641+
# Min unit: s
642+
compression_dictionary_cache_expire: 3600s
643+
644+
# Dictionary training configuration (advanced settings)
645+
# These settings control how compression dictionaries are trained from sample data.
646+
647+
# Maximum size of a trained compression dictionary in bytes.
648+
# Larger dictionaries may provide better compression but use more memory.
649+
# Specified as a plain integer number of bytes (no unit suffix).
650+
compression_dictionary_training_max_dictionary_size: 65536
651+
652+
# Maximum total size of sample data to collect for dictionary training.
653+
# More sample data generally produces better dictionaries but takes longer to train.
654+
# The recommended sample size is 100x the dictionary size.
655+
# Specified as a plain integer number of bytes (no unit suffix).
656+
compression_dictionary_training_max_total_sample_size: 10485760
657+
658+
# Enable automatic dictionary training based on sampling of write operations.
659+
# When enabled, the system will automatically collect samples and train new dictionaries.
660+
# Manual training via nodetool is always available regardless of this setting.
661+
compression_dictionary_training_auto_train_enabled: false
662+
663+
# Sampling rate for automatic dictionary training (1-10000).
664+
# Value of 100 means 1% of writes are sampled. Lower values reduce overhead but may
665+
# result in less representative sample data for dictionary training.
666+
compression_dictionary_training_sampling_rate: 100
667+
620668
# saved caches
621669
# If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
622670
# saved_caches_directory: /var/lib/cassandra/saved_caches

src/java/org/apache/cassandra/config/Config.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -519,12 +519,10 @@ public static class SSTableConfig
519519
public volatile int compression_dictionary_cache_size = 10; // max dictionaries per table
520520
public volatile DurationSpec.IntSecondsBound compression_dictionary_cache_expire = new DurationSpec.IntSecondsBound("3600s"); // 1 hour default
521521

522-
// TODO: add to cassandra.yaml
523522
// Dictionary training settings
524523
public volatile int compression_dictionary_training_max_dictionary_size = 65536; // 64KB
525524
public volatile int compression_dictionary_training_max_total_sample_size = 10485760; // 10MB total
526525
public volatile boolean compression_dictionary_training_auto_train_enabled = false;
527-
public volatile DurationSpec.IntSecondsBound compression_dictionary_training_manual_sampling_duration = new DurationSpec.IntSecondsBound("600s"); // 10 minutes default
528526
public volatile int compression_dictionary_training_sampling_rate = 100; // samples 1%; using int since random.nextInt is generally faster than random.nextDouble
529527

530528
public DataStorageSpec.LongMebibytesBound paxos_cache_size = null;

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4396,10 +4396,6 @@ public static boolean getCompressionDictionaryTrainingAutoTrainEnabled()
43964396
return conf.compression_dictionary_training_auto_train_enabled;
43974397
}
43984398

4399-
public static int getCompressionDictionaryTrainingManualSamplingDurationSeconds()
4400-
{
4401-
return conf.compression_dictionary_training_manual_sampling_duration.toSeconds();
4402-
}
44034399

44044400
public static int getCompressionDictionaryTrainingSamplingRate()
44054401
{

src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,11 @@ public synchronized void train(Map<String, String> options)
212212
throw new IllegalStateException("Dictionary trainer is not available for table " + keyspaceName + '.' + tableName);
213213
}
214214

215+
// Parse and validate training options
216+
ManualTrainingOptions trainingOptions = ManualTrainingOptions.fromStringMap(options);
217+
215218
trainer.start(true);
216-
scheduler.scheduleManualTraining(options, trainer);
219+
scheduler.scheduleManualTraining(trainingOptions, trainer);
217220
}
218221

219222
@Override

src/java/org/apache/cassandra/db/compression/CompressionDictionaryScheduler.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.cassandra.db.compression;
2020

21-
import java.util.Map;
2221
import java.util.concurrent.ScheduledFuture;
2322
import java.util.concurrent.TimeUnit;
2423

@@ -79,28 +78,14 @@ public void scheduleRefreshTask()
7978
}
8079

8180
@Override
82-
public void scheduleManualTraining(Map<String, String> options, ICompressionDictionaryTrainer trainer)
81+
public void scheduleManualTraining(ManualTrainingOptions options, ICompressionDictionaryTrainer trainer)
8382
{
8483
if (scheduledManualTrainingTask != null)
8584
{
8685
throw new IllegalStateException("Training already in progress for table " + keyspaceName + '.' + tableName);
8786
}
8887

89-
// Parse max sampling duration from options (default from configuration)
90-
int maxSamplingDurationSeconds = DatabaseDescriptor.getCompressionDictionaryTrainingManualSamplingDurationSeconds();
91-
if (options.containsKey("maxSamplingDurationSeconds"))
92-
{
93-
String durationStr = options.get("maxSamplingDurationSeconds");
94-
try
95-
{
96-
maxSamplingDurationSeconds = Integer.parseInt(durationStr);
97-
}
98-
catch (NumberFormatException e)
99-
{
100-
logger.warn("Invalid maxSamplingDurationSeconds value: {}, using default: {}",
101-
durationStr, maxSamplingDurationSeconds);
102-
}
103-
}
88+
int maxSamplingDurationSeconds = options.getMaxSamplingDurationSeconds();
10489

10590
logger.info("Starting manual dictionary training for {}.{} with max sampling duration: {} seconds",
10691
keyspaceName, tableName, maxSamplingDurationSeconds);

src/java/org/apache/cassandra/db/compression/ICompressionDictionaryScheduler.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
package org.apache.cassandra.db.compression;
2020

21-
import java.util.Map;
22-
2321
/**
2422
* Interface for managing scheduled tasks for compression dictionary operations.
2523
* <p>
@@ -38,11 +36,11 @@ public interface ICompressionDictionaryScheduler extends AutoCloseable
3836
/**
3937
* Schedules manual training with the specified options.
4038
*
41-
* @param options training options including maxSamplingDurationSeconds
39+
* @param options parsed and validated training options
4240
* @param trainer the trainer to use
4341
* @throws IllegalStateException if training is already in progress
4442
*/
45-
void scheduleManualTraining(Map<String, String> options, ICompressionDictionaryTrainer trainer);
43+
void scheduleManualTraining(ManualTrainingOptions options, ICompressionDictionaryTrainer trainer);
4644

4745
/**
4846
* Cancel the in-progress manual training
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.db.compression;
20+
21+
import java.util.Map;
22+
23+
/**
 * Immutable options for a manually triggered compression dictionary training run.
 * <p>
 * An instance can only be obtained in a valid state: the constructor rejects
 * non-positive durations and {@link #fromStringMap(Map)} rejects missing or
 * unparseable values, so consumers do not need to re-validate.
 */
public class ManualTrainingOptions
{
    /** Key under which the maximum sampling duration, in seconds, is supplied in the options map. */
    public static final String MAX_SAMPLING_DURATION_SECONDS_KEY = "maxSamplingDurationSeconds";

    private final int maxSamplingDurationSeconds;

    /**
     * Creates training options with the given sampling time limit.
     *
     * @param maxSamplingDurationSeconds maximum sampling duration in seconds; must be positive
     * @throws IllegalArgumentException if {@code maxSamplingDurationSeconds} is zero or negative
     */
    public ManualTrainingOptions(int maxSamplingDurationSeconds)
    {
        if (maxSamplingDurationSeconds <= 0)
        {
            throw new IllegalArgumentException(MAX_SAMPLING_DURATION_SECONDS_KEY + " must be positive, got: " + maxSamplingDurationSeconds);
        }
        this.maxSamplingDurationSeconds = maxSamplingDurationSeconds;
    }

    /**
     * Parse options from a string map, typically from JMX/MBean calls.
     * <p>
     * Leading and trailing whitespace around the duration value is ignored, since the
     * value is hand-typed by operators and passed through as a raw string.
     *
     * @param options the string map containing training options
     * @return parsed and validated ManualTrainingOptions
     * @throws IllegalArgumentException if the map is {@code null}, the
     *         {@code maxSamplingDurationSeconds} entry is missing, its value is not a
     *         valid integer, or the parsed value is not positive
     */
    public static ManualTrainingOptions fromStringMap(Map<String, String> options)
    {
        if (options == null || !options.containsKey(MAX_SAMPLING_DURATION_SECONDS_KEY))
        {
            throw new IllegalArgumentException(MAX_SAMPLING_DURATION_SECONDS_KEY + " parameter is required for manual dictionary training");
        }

        String durationStr = options.get(MAX_SAMPLING_DURATION_SECONDS_KEY);
        int parsedSeconds;
        try
        {
            // Integer.parseInt(null) throws NumberFormatException, so a key mapped to null
            // is reported through the same "Invalid ... value" path as any other bad value.
            parsedSeconds = Integer.parseInt(durationStr == null ? null : durationStr.trim());
        }
        catch (NumberFormatException e)
        {
            // Report the value exactly as supplied (untrimmed) and keep the cause for diagnosis.
            throw new IllegalArgumentException("Invalid " + MAX_SAMPLING_DURATION_SECONDS_KEY + " value: " + durationStr, e);
        }

        // The constructor enforces positivity.
        return new ManualTrainingOptions(parsedSeconds);
    }

    /**
     * @return the maximum sampling duration in seconds; always positive
     */
    public int getMaxSamplingDurationSeconds()
    {
        return maxSamplingDurationSeconds;
    }

    @Override
    public String toString()
    {
        return "ManualTrainingOptions{" +
               "maxSamplingDurationSeconds=" + maxSamplingDurationSeconds +
               '}';
    }
}

test/unit/org/apache/cassandra/db/compression/CompressionDictionaryIntegrationTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,13 @@ public void testEnableDisableDictionaryCompression()
112112

113113
assertThatNoException()
114114
.as("Should allow manual training")
115-
.isThrownBy(() -> manager.train(Map.of()));
115+
.isThrownBy(() -> manager.train(Map.of("maxSamplingDurationSeconds", "600")));
116116

117117
// Disable dictionary compression
118118
CompressionParams nonDictParams = CompressionParams.lz4();
119119
manager.maybeReloadFromSchema(nonDictParams);
120120

121-
assertThatThrownBy(() -> manager.train(Map.of()))
121+
assertThatThrownBy(() -> manager.train(Map.of("maxSamplingDurationSeconds", "600")))
122122
.as("Should disallow manual training when using lz4")
123123
.isInstanceOf(IllegalArgumentException.class)
124124
.hasMessageContaining("does not support dictionary compression");
@@ -130,7 +130,7 @@ public void testEnableDisableDictionaryCompression()
130130

131131
assertThatNoException()
132132
.as("Should allow manual training after switching back to dictionary compression")
133-
.isThrownBy(() -> manager.train(Map.of()));
133+
.isThrownBy(() -> manager.train(Map.of("maxSamplingDurationSeconds", "600")));
134134
}
135135

136136
@Test

test/unit/org/apache/cassandra/db/compression/CompressionDictionaryManagerTest.java

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public void testMaybeReloadFromSchemaEnableDictionaryCompression()
153153

154154
managerWithoutDict.maybeReloadFromSchema(dictParams);
155155

156-
managerWithoutDict.train(Map.of());
156+
managerWithoutDict.train(Map.of("maxSamplingDurationSeconds", "600"));
157157
// Should now have training capability
158158
String newStatus = managerWithoutDict.getTrainingStatus();
159159
assertThat(newStatus)
@@ -164,7 +164,7 @@ public void testMaybeReloadFromSchemaEnableDictionaryCompression()
164164
@Test
165165
public void testMaybeReloadFromSchemaDisableDictionaryCompression()
166166
{
167-
managerWithDict.train(Map.of());
167+
managerWithDict.train(Map.of("maxSamplingDurationSeconds", "600"));
168168
String status = managerWithDict.getTrainingStatus();
169169
assertThat(status)
170170
.as("Should be sampling")
@@ -184,7 +184,7 @@ public void testMaybeReloadFromSchemaDisableDictionaryCompression()
184184
@Test
185185
public void testTrainerCompatibilityCheck()
186186
{
187-
managerWithDict.train(Map.of());
187+
managerWithDict.train(Map.of("maxSamplingDurationSeconds", "600"));
188188
String initialStatus = managerWithDict.getTrainingStatus();
189189
assertThat(initialStatus)
190190
.as("Should be sampling")
@@ -221,11 +221,36 @@ public void testAddSample()
221221
@Test
222222
public void testTrainManualWithNonDictionaryTable()
223223
{
224-
assertThatThrownBy(() -> managerWithoutDict.train(Map.of()))
224+
assertThatThrownBy(() -> managerWithoutDict.train(Map.of("maxSamplingDurationSeconds", "600")))
225225
.isInstanceOf(IllegalArgumentException.class)
226226
.hasMessageContaining("does not support dictionary compression");
227227
}
228228

229+
@Test
230+
public void testTrainManualWithMissingParameters()
231+
{
232+
assertThatThrownBy(() -> managerWithDict.train(Map.of()))
233+
.isInstanceOf(IllegalArgumentException.class)
234+
.hasMessageContaining("maxSamplingDurationSeconds parameter is required");
235+
236+
assertThatThrownBy(() -> managerWithDict.train(null))
237+
.isInstanceOf(IllegalArgumentException.class)
238+
.hasMessageContaining("maxSamplingDurationSeconds parameter is required");
239+
}
240+
241+
@Test
242+
public void testTrainManualWithInvalidParameters()
243+
{
244+
assertThatThrownBy(() -> managerWithDict.train(Map.of("maxSamplingDurationSeconds", "invalid")))
245+
.isInstanceOf(IllegalArgumentException.class)
246+
.hasMessageContaining("Invalid maxSamplingDurationSeconds value: invalid")
247+
.hasCauseInstanceOf(NumberFormatException.class);
248+
249+
assertThatThrownBy(() -> managerWithDict.train(Map.of("maxSamplingDurationSeconds", "-1")))
250+
.isInstanceOf(IllegalArgumentException.class)
251+
.hasMessageContaining("maxSamplingDurationSeconds must be positive, got: -1");
252+
}
253+
229254
@Test
230255
public void testTrainManualWithOptions()
231256
{
@@ -249,7 +274,7 @@ public void testSchemaChangeWorkflow()
249274
CompressionParams dictParams = CompressionParams.zstd(CompressionParams.DEFAULT_CHUNK_LENGTH, true,
250275
Map.of("compression_level", "3"));
251276
managerWithoutDict.maybeReloadFromSchema(dictParams);
252-
managerWithoutDict.train(Map.of());
277+
managerWithoutDict.train(Map.of("maxSamplingDurationSeconds", "600"));
253278
// Should now support training
254279
String enabledStatus = managerWithoutDict.getTrainingStatus();
255280
assertThat(enabledStatus).isEqualTo(TrainingStatus.SAMPLING.toString());
@@ -262,7 +287,7 @@ public void testSchemaChangeWorkflow()
262287
// Should still support training with new parameters
263288
String updatedStatus = managerWithoutDict.getTrainingStatus();
264289
assertThat(updatedStatus).isEqualTo(TrainingStatus.NOT_STARTED.toString());
265-
managerWithoutDict.train(Map.of());
290+
managerWithoutDict.train(Map.of("maxSamplingDurationSeconds", "600"));
266291
assertThat(enabledStatus).isEqualTo(TrainingStatus.SAMPLING.toString());
267292

268293
// Disable dictionary compression
@@ -278,7 +303,7 @@ public void testSchemaChangeWorkflow()
278303
public void testUpdateSamplingRate()
279304
{
280305
// Test with enabled dictionary manager
281-
managerWithDict.train(Map.of());
306+
managerWithDict.train(Map.of("maxSamplingDurationSeconds", "600"));
282307

283308
// Should be able to update sampling rate
284309
assertThatNoException().isThrownBy(() -> managerWithDict.updateSamplingRate(5));
@@ -299,7 +324,7 @@ public void testUpdateSamplingRateWithoutTrainer()
299324
public void testUpdateSamplingRateValidation()
300325
{
301326
// Test with enabled dictionary manager
302-
managerWithDict.train(Map.of());
327+
managerWithDict.train(Map.of("maxSamplingDurationSeconds", "600"));
303328

304329
// Test invalid sampling rates are rejected by the trainer
305330
assertThatThrownBy(() -> managerWithDict.updateSamplingRate(0))

0 commit comments

Comments
 (0)