Skip to content

Commit fe69146

Browse files
author
Bulat Shakirzyanov
committed
add support for client timestamps and serial consistency in batch statements
1 parent b238c4f commit fe69146

File tree

9 files changed

+90
-34
lines changed

9 files changed

+90
-34
lines changed

lib/cassandra.rb

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,12 @@ module Cassandra
122122
# address resolver to use. Must be one of `:none` or
123123
# `:ec2_multi_region`.
124124
#
125+
# @option options [Boolean] :client_timestamps (false) whether the driver
126+
# should send timestamps for each execited statement. Enabling this setting
127+
# allows mitigating Cassandra cluster clock skew because the timestamp of
128+
# the client machine will be used. This does not help mitigate application
129+
# cluster clock skew.
130+
#
125131
# @option options [Boolean] :synchronize_schema (true) whether the driver
126132
# should automatically keep schema metadata synchronized. When enabled, the
127133
# driver updates schema metadata after receiving schema change
@@ -217,7 +223,7 @@ def self.cluster_async(options = {})
217223
:connect_timeout, :futures_factory, :datacenter, :address_resolution,
218224
:address_resolution_policy, :idle_timeout, :heartbeat_interval, :timeout,
219225
:synchronize_schema, :schema_refresh_delay, :schema_refresh_timeout,
220-
:shuffle_replicas
226+
:shuffle_replicas, :client_timestamps
221227
].include?(key)
222228
end
223229

@@ -476,6 +482,10 @@ def self.cluster_async(options = {})
476482
options[:synchronize_schema] = !!options[:synchronize_schema]
477483
end
478484

485+
if options.has_key?(:client_timestamps)
486+
options[:client_timestamps] = !!options[:client_timestamps]
487+
end
488+
479489
hosts = []
480490

481491
Array(options.fetch(:hosts, '127.0.0.1')).each do |host|

lib/cassandra/cluster/client.rb

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,11 @@ def keyspace_dropped(keyspace)
207207
def query(statement, options)
208208
return @futures.error(Errors::ClientError.new("Positional arguments are not supported by the current version of Apache Cassandra")) if !statement.params.empty? && @connection_options.protocol_version == 1
209209

210-
request = Protocol::QueryRequest.new(statement.cql, statement.params, statement.params_types, options.consistency, options.serial_consistency, options.page_size, options.paging_state, options.trace?, statement.params_names)
211-
timeout = options.timeout
212-
promise = @futures.promise
210+
timestamp = nil
211+
timestamp = Time.now if @connection_options.client_timestamps? && @connection_options.protocol_version > 2
212+
request = Protocol::QueryRequest.new(statement.cql, statement.params, statement.params_types, options.consistency, options.serial_consistency, options.page_size, options.paging_state, options.trace?, statement.params_names, timestamp)
213+
timeout = options.timeout
214+
promise = @futures.promise
213215

214216
keyspace = @keyspace
215217
plan = @load_balancing_policy.plan(keyspace, statement, options)
@@ -234,9 +236,11 @@ def prepare(cql, options)
234236
end
235237

236238
def execute(statement, options)
239+
timestamp = nil
240+
timestamp = Time.now if @connection_options.client_timestamps? && @connection_options.protocol_version > 2
237241
timeout = options.timeout
238242
result_metadata = statement.result_metadata
239-
request = Protocol::ExecuteRequest.new(nil, statement.params_types, statement.params, result_metadata.nil?, options.consistency, options.serial_consistency, options.page_size, options.paging_state, options.trace?)
243+
request = Protocol::ExecuteRequest.new(nil, statement.params_types, statement.params, result_metadata.nil?, options.consistency, options.serial_consistency, options.page_size, options.paging_state, options.trace?, timestamp)
240244
promise = @futures.promise
241245

242246
keyspace = @keyspace
@@ -250,12 +254,15 @@ def execute(statement, options)
250254
def batch(statement, options)
251255
return @futures.error(Errors::ClientError.new("Batch statements are not supported by the current version of Apache Cassandra")) if @connection_options.protocol_version < 2
252256

253-
timeout = options.timeout
254-
keyspace = @keyspace
255-
plan = @load_balancing_policy.plan(keyspace, statement, options)
256-
promise = @futures.promise
257+
timestamp = nil
258+
timestamp = Time.now if @connection_options.client_timestamps? && @connection_options.protocol_version > 2
259+
timeout = options.timeout
260+
request = Protocol::BatchRequest.new(BATCH_TYPES[statement.type], options.consistency, options.trace?, options.serial_consistency, timestamp)
261+
keyspace = @keyspace
262+
plan = @load_balancing_policy.plan(keyspace, statement, options)
263+
promise = @futures.promise
257264

258-
batch_by_plan(promise, keyspace, statement, options, plan, timeout)
265+
batch_by_plan(promise, keyspace, statement, options, request, plan, timeout)
259266

260267
promise.future
261268
end
@@ -509,7 +516,7 @@ def prepare_and_send_request_by_plan(host, connection, promise, keyspace, statem
509516
end
510517
end
511518

512-
def batch_by_plan(promise, keyspace, statement, options, plan, timeout, errors = nil, hosts = [])
519+
def batch_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors = nil, hosts = [])
513520
unless plan.has_next?
514521
promise.break(Errors::NoHostsAvailable.new(errors))
515522
return
@@ -522,7 +529,7 @@ def batch_by_plan(promise, keyspace, statement, options, plan, timeout, errors =
522529
unless pool
523530
errors ||= {}
524531
errors[host] = NOT_CONNECTED
525-
return batch_by_plan(promise, keyspace, statement, options, plan, timeout, errors, hosts)
532+
return batch_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors, hosts)
526533
end
527534

528535
connection = pool.random_connection
@@ -531,31 +538,31 @@ def batch_by_plan(promise, keyspace, statement, options, plan, timeout, errors =
531538
switch = switch_keyspace(connection, keyspace, timeout)
532539
switch.on_complete do |s|
533540
if s.resolved?
534-
batch_and_send_request_by_plan(host, connection, promise, keyspace, statement, options, plan, timeout, errors, hosts)
541+
batch_and_send_request_by_plan(host, connection, promise, keyspace, statement, request, options, plan, timeout, errors, hosts)
535542
else
536543
s.on_failure do |e|
537544
case e
538545
when Errors::HostError
539546
errors ||= {}
540547
errors[host] = e
541-
batch_by_plan(promise, keyspace, statement, options, plan, timeout, errors, hosts)
548+
batch_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors, hosts)
542549
else
543550
promise.break(e)
544551
end
545552
end
546553
end
547554
end
548555
else
549-
batch_and_send_request_by_plan(host, connection, promise, keyspace, statement, options, plan, timeout, errors, hosts)
556+
batch_and_send_request_by_plan(host, connection, promise, keyspace, statement, request, options, plan, timeout, errors, hosts)
550557
end
551558
rescue => e
552559
errors ||= {}
553560
errors[host] = e
554-
batch_by_plan(promise, keyspace, statement, options, plan, timeout, errors, hosts)
561+
batch_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors, hosts)
555562
end
556563

557-
def batch_and_send_request_by_plan(host, connection, promise, keyspace, statement, options, plan, timeout, errors, hosts)
558-
request = Protocol::BatchRequest.new(BATCH_TYPES[statement.type], options.consistency, options.trace?)
564+
def batch_and_send_request_by_plan(host, connection, promise, keyspace, statement, request, options, plan, timeout, errors, hosts)
565+
request.clear
559566
unprepared = Hash.new {|hash, cql| hash[cql] = []}
560567

561568
statement.statements.each do |statement|
@@ -598,7 +605,7 @@ def batch_and_send_request_by_plan(host, connection, promise, keyspace, statemen
598605
when Errors::HostError
599606
errors ||= {}
600607
errors[host] = e
601-
batch_by_plan(promise, keyspace, statement, options, plan, timeout, errors, hosts)
608+
batch_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors, hosts)
602609
else
603610
promise.break(e)
604611
end
@@ -711,7 +718,7 @@ def do_send_request_by_plan(host, connection, promise, keyspace, statement, opti
711718
when Protocol::ExecuteRequest
712719
execute_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors, hosts)
713720
when Protocol::BatchRequest
714-
batch_by_plan(promise, keyspace, statement, options, plan, timeout, errors, hosts)
721+
batch_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors, hosts)
715722
end
716723
else
717724
promise.break(error)
@@ -774,7 +781,7 @@ def do_send_request_by_plan(host, connection, promise, keyspace, statement, opti
774781
when Protocol::ExecuteRequest
775782
execute_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors, hosts)
776783
when Protocol::BatchRequest
777-
batch_by_plan(promise, keyspace, statement, options, plan, timeout, errors, hosts)
784+
batch_by_plan(promise, keyspace, statement, options, request, plan, timeout, errors, hosts)
778785
else
779786
promise.break(e)
780787
end

lib/cassandra/cluster/options.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class Options
2525
:schema_refresh_delay, :schema_refresh_timeout
2626
attr_accessor :protocol_version
2727

28-
def initialize(protocol_version, credentials, auth_provider, compressor, port, connect_timeout, ssl, connections_per_local_node, connections_per_remote_node, heartbeat_interval, idle_timeout, synchronize_schema, schema_refresh_delay, schema_refresh_timeout)
28+
def initialize(protocol_version, credentials, auth_provider, compressor, port, connect_timeout, ssl, connections_per_local_node, connections_per_remote_node, heartbeat_interval, idle_timeout, synchronize_schema, schema_refresh_delay, schema_refresh_timeout, client_timestamps)
2929
@protocol_version = protocol_version
3030
@credentials = credentials
3131
@auth_provider = auth_provider
@@ -38,6 +38,7 @@ def initialize(protocol_version, credentials, auth_provider, compressor, port, c
3838
@synchronize_schema = synchronize_schema
3939
@schema_refresh_delay = schema_refresh_delay
4040
@schema_refresh_timeout = schema_refresh_timeout
41+
@client_timestamps = client_timestamps
4142

4243
@connections_per_local_node = connections_per_local_node
4344
@connections_per_remote_node = connections_per_remote_node
@@ -47,6 +48,10 @@ def synchronize_schema?
4748
@synchronize_schema
4849
end
4950

51+
def client_timestamps?
52+
@client_timestamps
53+
end
54+
5055
def compression
5156
@compressor && @compressor.algorithm
5257
end

lib/cassandra/driver.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ def self.let(name, &block)
8686
idle_timeout,
8787
synchronize_schema,
8888
schema_refresh_delay,
89-
schema_refresh_timeout
89+
schema_refresh_timeout,
90+
client_timestamps
9091
)
9192
end
9293

@@ -114,6 +115,7 @@ def self.let(name, &block)
114115
let(:schema_refresh_timeout) { 10 }
115116
let(:thread_pool_size) { 4 }
116117
let(:shuffle_replicas) { true }
118+
let(:client_timestamps) { false }
117119

118120
let(:connections_per_local_node) { 2 }
119121
let(:connections_per_remote_node) { 1 }

lib/cassandra/protocol/cql_byte_buffer.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,10 @@ def append_string_map(map)
258258
self
259259
end
260260

261+
def append_timestamp(timestamp)
262+
append_long(timestamp.tv_sec * 1000000 + timestamp.tv_usec)
263+
end
264+
261265
def append_long(n)
262266
top = n >> 32
263267
bottom = n & 0xffffffff

lib/cassandra/protocol/requests/batch_request.rb

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,21 @@ class BatchRequest < Request
2323
UNLOGGED_TYPE = 1
2424
COUNTER_TYPE = 2
2525

26-
attr_reader :type
26+
attr_reader :type, :timestamp
2727
attr_accessor :consistency, :retries
2828

29-
def initialize(type, consistency, trace=false)
29+
def initialize(type, consistency, trace=false, serial_consistency = nil, timestamp = nil)
3030
super(0x0D, trace)
3131
@type = type
3232
@parts = []
3333
@consistency = consistency
34+
@serial_consistency = serial_consistency
35+
@timestamp = timestamp
36+
end
37+
38+
def clear
39+
@parts.clear
40+
nil
3441
end
3542

3643
def add_query(cql, values, types)
@@ -54,9 +61,13 @@ def write(buffer, protocol_version, encoder)
5461
buffer.append_consistency(@consistency)
5562

5663
if protocol_version > 2
57-
flags = 0
64+
flags = 0
65+
flags |= 0x10 if @serial_consistency
66+
flags |= 0x20 if @timestamp
5867

5968
buffer.append(flags.chr)
69+
buffer.append_consistency(@serial_consistency) if @serial_consistency
70+
buffer.append_timestamp(@timestamp) if @timestamp
6071
end
6172

6273
buffer

lib/cassandra/protocol/requests/execute_request.rb

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
module Cassandra
2020
module Protocol
2121
class ExecuteRequest < Request
22-
attr_reader :metadata, :values, :request_metadata, :serial_consistency, :page_size, :paging_state
22+
attr_reader :metadata, :values, :request_metadata, :serial_consistency, :page_size, :paging_state, :timestamp
2323
attr_accessor :consistency, :retries, :id
2424

25-
def initialize(id, metadata, values, request_metadata, consistency, serial_consistency=nil, page_size=nil, paging_state=nil, trace=false)
25+
def initialize(id, metadata, values, request_metadata, consistency, serial_consistency=nil, page_size=nil, paging_state=nil, trace=false, timestamp = nil)
2626
raise ArgumentError, "Metadata for #{metadata.size} columns, but #{values.size} values given" if metadata.size != values.size
2727
raise ArgumentError, %(No such consistency: #{consistency.inspect}) if consistency.nil? || !CONSISTENCIES.include?(consistency)
2828
raise ArgumentError, %(No such consistency: #{serial_consistency.inspect}) unless serial_consistency.nil? || CONSISTENCIES.include?(serial_consistency)
@@ -36,6 +36,7 @@ def initialize(id, metadata, values, request_metadata, consistency, serial_consi
3636
@serial_consistency = serial_consistency
3737
@page_size = page_size
3838
@paging_state = paging_state
39+
@timestamp = timestamp
3940
end
4041

4142
def write(buffer, protocol_version, encoder)
@@ -48,13 +49,19 @@ def write(buffer, protocol_version, encoder)
4849
flags |= 0x04 if @page_size
4950
flags |= 0x08 if @paging_state
5051
flags |= 0x10 if @serial_consistency
52+
if protocol_version > 2
53+
flags |= 0x20 if @timestamp
54+
end
5155
buffer.append(flags.chr)
5256
if @values.size > 0
5357
encoder.write_parameters(buffer, @values, @metadata)
5458
end
5559
buffer.append_int(@page_size) if @page_size
5660
buffer.append_bytes(@paging_state) if @paging_state
5761
buffer.append_consistency(@serial_consistency) if @serial_consistency
62+
if protocol_version > 2
63+
buffer.append_timestamp(@timestamp) if @timestamp
64+
end
5865
else
5966
encoder.write_parameters(buffer, @values, @metadata)
6067
buffer.append_consistency(@consistency)

lib/cassandra/protocol/requests/query_request.rb

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
module Cassandra
2020
module Protocol
2121
class QueryRequest < Request
22-
attr_reader :cql, :values, :type_hints, :serial_consistency, :page_size, :paging_state
22+
attr_reader :cql, :values, :type_hints, :serial_consistency, :page_size, :paging_state, :timestamp
2323
attr_accessor :consistency, :retries
2424

25-
def initialize(cql, values, type_hints, consistency, serial_consistency = nil, page_size = nil, paging_state = nil, trace = false, names = EMPTY_LIST)
25+
def initialize(cql, values, type_hints, consistency, serial_consistency = nil, page_size = nil, paging_state = nil, trace = false, names = EMPTY_LIST, timestamp = nil)
2626
super(7, trace)
2727
@cql = cql
2828
@values = values
@@ -32,6 +32,7 @@ def initialize(cql, values, type_hints, consistency, serial_consistency = nil, p
3232
@page_size = page_size
3333
@paging_state = paging_state
3434
@names = names
35+
@timestamp = timestamp
3536
end
3637

3738
def write(buffer, protocol_version, encoder)
@@ -44,7 +45,10 @@ def write(buffer, protocol_version, encoder)
4445
flags |= 0x10 if @serial_consistency
4546
if @values && @values.size > 0
4647
flags |= 0x01
47-
flags |= 0x40 unless @names.empty?
48+
if protocol_version > 2
49+
flags |= 0x20 if @timestamp
50+
flags |= 0x40 unless @names.empty?
51+
end
4852
buffer.append(flags.chr)
4953
encoder.write_parameters(buffer, @values, @type_hints, @names)
5054
else
@@ -53,6 +57,9 @@ def write(buffer, protocol_version, encoder)
5357
buffer.append_int(@page_size) if @page_size
5458
buffer.append_bytes(@paging_state) if @paging_state
5559
buffer.append_consistency(@serial_consistency) if @serial_consistency
60+
if protocol_version > 2
61+
buffer.append_timestamp(@timestamp) if @timestamp
62+
end
5663
end
5764
buffer
5865
end

spec/cassandra/cluster/client_spec.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,8 @@ class Cluster
770770

771771
client.connect.value
772772

773-
expect(Cassandra::Protocol::BatchRequest).to receive(:new).once.with(0, :one, false).and_return(batch_request)
773+
expect(Cassandra::Protocol::BatchRequest).to receive(:new).once.with(0, :one, false, nil, nil).and_return(batch_request)
774+
allow(batch_request).to receive(:clear)
774775
expect(batch_request).to receive(:add_query).once.with('INSERT INTO songs (id, title, album, artist, tags) VALUES (?, ?, ?, ?, ?)', [1, 2, 3, 4, 5], [Cassandra::Types.bigint, Cassandra::Types.bigint, Cassandra::Types.bigint, Cassandra::Types.bigint, Cassandra::Types.bigint])
775776
expect(batch_request).to receive(:retries=).once.with(0)
776777
client.batch(batch, Execution::Options.new(:consistency => :one, :trace => false)).get
@@ -810,7 +811,8 @@ class Cluster
810811

811812
batch.add(statement, [Cassandra::Uuid.new(1), 'some title', 'some album', 'some artist', Set['cool', 'stuff']])
812813

813-
expect(Cassandra::Protocol::BatchRequest).to receive(:new).once.with(0, :one, false).and_return(batch_request)
814+
expect(Cassandra::Protocol::BatchRequest).to receive(:new).once.with(0, :one, false, nil, nil).and_return(batch_request)
815+
allow(batch_request).to receive(:clear)
814816
expect(batch_request).to receive(:add_prepared).once.with(123, [Cassandra::Uuid.new(1), 'some title', 'some album', 'some artist', Set['cool', 'stuff']], params_metadata.map(&:last))
815817
expect(batch_request).to receive(:retries=).once.with(0)
816818
client.batch(batch, Execution::Options.new(:consistency => :one, :trace => false)).get
@@ -852,7 +854,8 @@ class Cluster
852854

853855
batch.add(statement, [Cassandra::Uuid.new(1), 'some title', 'some album', 'some artist', Set['cool', 'stuff']])
854856

855-
expect(Cassandra::Protocol::BatchRequest).to receive(:new).once.with(0, :one, false).and_return(batch_request)
857+
expect(Cassandra::Protocol::BatchRequest).to receive(:new).once.with(0, :one, false, nil, nil).and_return(batch_request)
858+
allow(batch_request).to receive(:clear)
856859
expect(batch_request).to receive(:add_prepared).once.with(123, [Cassandra::Uuid.new(1), 'some title', 'some album', 'some artist', Set['cool', 'stuff']], params_metadata.map(&:last))
857860
expect(batch_request).to receive(:retries=).once.with(0)
858861

0 commit comments

Comments
 (0)