Skip to content

Commit 504322d

Browse files
Gor027avelanarius
authored andcommitted
Add per partition rate limit table options
Added `maxReadsPerSecond` and `maxWritesPerSecond` options to `TableOptions` which will allow convenient creation of CREATE and ALTER statements with rate limit options for read and write operations. Both maxReadsPerSecond and maxWritesPerSecond are optional - omitting one of them means "no limit" for that type of operation. Fixes #166
1 parent 143ae5f commit 504322d

File tree

3 files changed

+83
-0
lines changed

3 files changed

+83
-0
lines changed

driver-core/src/main/java/com/datastax/driver/core/schemabuilder/TableOptions.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ public abstract class TableOptions<T extends TableOptions> extends SchemaStateme
6666

6767
private Optional<Boolean> replicateOnWrite = Optional.absent();
6868

69+
private Optional<Integer> maxReadsPerSecond = Optional.absent();
70+
71+
private Optional<Integer> maxWritesPerSecond = Optional.absent();
72+
6973
private Optional<SpeculativeRetryValue> speculativeRetry = Optional.absent();
7074

7175
private Optional<Boolean> cdc = Optional.absent();
@@ -348,6 +352,36 @@ public T replicateOnWrite(Boolean replicateOnWrite) {
348352
return self;
349353
}
350354

355+
/**
356+
* Sets rate limit for read operations in table option "per_partition_rate_limit". NOTE: Due to
357+
* ScyllaDB’s distributed nature, tracking per-partition request rates is not perfect and the
358+
* actual rate of accepted requests may be higher up to a factor of keyspace’s RF. This feature
359+
* should not be used to enforce precise limits but rather serve as an overload protection
360+
* feature.
361+
*
362+
* @param maxReadsPerSecond rate limit for read operations
363+
* @return this {@code TableOptions} object.
364+
*/
365+
public T maxReadsPerSecond(int maxReadsPerSecond) {
366+
this.maxReadsPerSecond = Optional.of(maxReadsPerSecond);
367+
return self;
368+
}
369+
370+
/**
371+
* Sets rate limit for write operations in table option "per_partition_rate_limit". NOTE: Due to
372+
* ScyllaDB’s distributed nature, tracking per-partition request rates is not perfect and the
373+
* actual rate of accepted requests may be higher up to a factor of keyspace’s RF. This feature
374+
* should not be used to enforce precise limits but rather serve as an overload protection
375+
* feature.
376+
*
377+
* @param maxWritesPerSecond rate limit for write operations
378+
* @return this {@code TableOptions} object.
379+
*/
380+
public T maxWritesPerSecond(int maxWritesPerSecond) {
381+
this.maxWritesPerSecond = Optional.of(maxWritesPerSecond);
382+
return self;
383+
}
384+
351385
/**
352386
* To override normal read timeout when read_repair_chance is not 1.0, sending another request to
353387
* read, choose one of these values and use the property to create or alter the table:
@@ -518,6 +552,25 @@ private List<String> buildCommonOptions() {
518552
options.add("replicate_on_write = " + replicateOnWrite.get());
519553
}
520554

555+
if (maxReadsPerSecond.isPresent() || maxWritesPerSecond.isPresent()) {
556+
StringBuilder sBuilder = new StringBuilder("per_partition_rate_limit = {");
557+
558+
if (maxReadsPerSecond.isPresent()) {
559+
sBuilder.append("'max_reads_per_second': ").append(maxReadsPerSecond.get());
560+
561+
if (maxWritesPerSecond.isPresent()) {
562+
sBuilder.append(", ");
563+
}
564+
}
565+
566+
if (maxWritesPerSecond.isPresent()) {
567+
sBuilder.append("'max_writes_per_second': ").append(maxWritesPerSecond.get());
568+
}
569+
570+
sBuilder.append("}");
571+
options.add(sBuilder.toString());
572+
}
573+
521574
if (speculativeRetry.isPresent()) {
522575
options.add("speculative_retry = " + speculativeRetry.get().value());
523576
}

driver-core/src/test/java/com/datastax/driver/core/schemabuilder/AlterTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ public void should_alter_table_options() throws Exception {
121121
.memtableFlushPeriodInMillis(12)
122122
.populateIOCacheOnFlush(true)
123123
.replicateOnWrite(true)
124+
.maxReadsPerSecond(123)
125+
.maxWritesPerSecond(456)
124126
.readRepairChance(0.42)
125127
.speculativeRetry(always())
126128
.cdc(true);
@@ -147,6 +149,7 @@ public void should_alter_table_options() throws Exception {
147149
+ "AND populate_io_cache_on_flush = true "
148150
+ "AND read_repair_chance = 0.42 "
149151
+ "AND replicate_on_write = true "
152+
+ "AND per_partition_rate_limit = {'max_reads_per_second': 123, 'max_writes_per_second': 456} "
150153
+ "AND speculative_retry = 'ALWAYS' "
151154
+ "AND cdc = true");
152155

driver-core/src/test/java/com/datastax/driver/core/schemabuilder/CreateTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,30 @@ public void should_create_table_with_compact_storage() throws Exception {
351351
+ "WITH COMPACT STORAGE");
352352
}
353353

354+
@Test(groups = "unit")
355+
public void should_omit_write_rate_limit_option() throws Exception {
356+
// When
357+
SchemaStatement statement =
358+
createTable("test")
359+
.addPartitionKey("id", DataType.bigint())
360+
.addClusteringColumn("col1", DataType.uuid())
361+
.addClusteringColumn("col2", DataType.uuid())
362+
.addColumn("name", DataType.text())
363+
.withOptions()
364+
.maxReadsPerSecond(123);
365+
366+
// Then
367+
assertThat(statement.getQueryString())
368+
.isEqualTo(
369+
"\n\tCREATE TABLE test(\n\t\t"
370+
+ "id bigint,\n\t\t"
371+
+ "col1 uuid,\n\t\t"
372+
+ "col2 uuid,\n\t\t"
373+
+ "name text,\n\t\t"
374+
+ "PRIMARY KEY(id, col1, col2))\n\t"
375+
+ "WITH per_partition_rate_limit = {'max_reads_per_second': 123}");
376+
}
377+
354378
@Test(groups = "unit")
355379
public void should_create_table_with_all_options() throws Exception {
356380
// When
@@ -378,6 +402,8 @@ public void should_create_table_with_all_options() throws Exception {
378402
.populateIOCacheOnFlush(true)
379403
.readRepairChance(0.05)
380404
.replicateOnWrite(true)
405+
.maxReadsPerSecond(123)
406+
.maxWritesPerSecond(456)
381407
.speculativeRetry(always())
382408
.cdc(true);
383409

@@ -403,6 +429,7 @@ public void should_create_table_with_all_options() throws Exception {
403429
+ "AND populate_io_cache_on_flush = true "
404430
+ "AND read_repair_chance = 0.05 "
405431
+ "AND replicate_on_write = true "
432+
+ "AND per_partition_rate_limit = {'max_reads_per_second': 123, 'max_writes_per_second': 456} "
406433
+ "AND speculative_retry = 'ALWAYS' "
407434
+ "AND cdc = true AND CLUSTERING ORDER BY(col1 ASC, col2 DESC) AND COMPACT STORAGE");
408435
}

0 commit comments

Comments
 (0)