Skip to content

Commit 810cf80

Browse files
Merge pull request #199 from datastax/RUBY-254
RUBY-254 - beta protocol support
2 parents 7a4f56e + ed78bb0 commit 810cf80

File tree

15 files changed

+275
-128
lines changed

15 files changed

+275
-128
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ Features:
44
* Update Keyspace metadata to include collection of indexes defined in the keyspace.
55
* Update Table metadata to include trigger-collection and view-collection metadata.
66
* Added execution profiles to encapsulate a group of request execution options.
7+
* Added support for v5 beta protocol.
78

89
Bug Fixes:
910
* [RUBY-255](https://datastax-oss.atlassian.net/browse/RUBY-255) ControlConnection.peer_ip ignores peers that are missing critical information in system.peers.

integration/client_error_test.rb

Lines changed: 81 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -69,32 +69,43 @@ def set_failing_nodes(failing_nodes, keyspace)
6969
# @test_category error_codes
7070
#
7171
def test_raise_error_on_write_failure
72-
begin
73-
skip("Client failure errors are only available in C* after 2.2") if CCM.cassandra_version < '2.2.0'
74-
75-
cluster = Retry.with_attempts(5, Cassandra::Errors::NoHostsAvailable) { Cassandra.cluster }
76-
session = cluster.connect
72+
skip("Client failure errors are only available in C* after 2.2") if CCM.cassandra_version < '2.2.0'
7773

78-
session.execute("CREATE KEYSPACE testwritefail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}",
79-
consistency: :all) rescue nil
80-
session.execute("CREATE TABLE testwritefail.test (k int PRIMARY KEY, v int)", consistency: :all)
74+
# Create a cluster object that tries to use the beta protocol. If it succeeds, the write failure response
75+
# will have a map of <node-ip, failure-code> instead of num-failures. When v5 is officially released, we
76+
# can remove the allow_beta_protocol arg in this test.
77+
cluster = Retry.with_attempts(5, Cassandra::Errors::NoHostsAvailable) {
78+
Cassandra.cluster(allow_beta_protocol: true)
79+
}
80+
session = cluster.connect
8181

82-
# Disable one node
83-
set_failing_nodes(["node1"], "testwritefail")
82+
session.execute("CREATE KEYSPACE testwritefail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}",
83+
consistency: :all) rescue nil
84+
session.execute("CREATE TABLE IF NOT EXISTS testwritefail.test (k int PRIMARY KEY, v int)", consistency: :all)
8485

85-
# One node disabled should trigger a WriteFailure
86-
assert_raises(Cassandra::Errors::WriteError) do
87-
session.execute("INSERT INTO testwritefail.test (k, v) VALUES (1, 0)", consistency: :all)
88-
end
86+
# Disable one node
87+
set_failing_nodes(["node1"], "testwritefail")
8988

90-
# Quorum should still work with two nodes
91-
session.execute("INSERT INTO testwritefail.test (k, v) VALUES (1, 0)", consistency: :quorum)
89+
# One node disabled should trigger a WriteFailure
90+
ex = assert_raises(Cassandra::Errors::WriteError) do
91+
session.execute("INSERT INTO testwritefail.test (k, v) VALUES (1, 0)", consistency: :all)
92+
end
9293

93-
# Restart the node to clear jvm settings
94-
set_failing_nodes([], "testwritefail")
95-
ensure
96-
cluster && cluster.close
94+
# If we're speaking protocol version > 4, verify that we have a failure map in the exception.
95+
connection_options = cluster.instance_variable_get(:@connection_options)
96+
assert_operator(ex.failed, :>=, 1)
97+
if connection_options.protocol_version > 4
98+
assert_equal(ex.failed, ex.failures_by_node.size)
99+
assert_equal(0, ex.failures_by_node.values.first)
97100
end
101+
102+
# Quorum should still work with two nodes
103+
session.execute("INSERT INTO testwritefail.test (k, v) VALUES (1, 0)", consistency: :quorum)
104+
105+
# Restart the node to clear jvm settings
106+
set_failing_nodes([], "testwritefail")
107+
ensure
108+
cluster && cluster.close
98109
end
99110

100111
# Test for validating ReadError
@@ -117,33 +128,45 @@ def test_raise_error_on_write_failure
117128
def test_raise_error_on_read_failure
118129
skip("Client failure errors are only available in C* after 2.2") if CCM.cassandra_version < '2.2.0'
119130

120-
begin
121-
cluster = Retry.with_attempts(5, Cassandra::Errors::NoHostsAvailable) { Cassandra.cluster }
122-
session = cluster.connect
123-
124-
session.execute("CREATE KEYSPACE testreadfail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}",
125-
consistency: :all) rescue nil
126-
session.execute("CREATE TABLE testreadfail.test2 (k int, v0 int, v1 int, PRIMARY KEY (k, v0))", consistency: :all)
131+
# Create a cluster object that tries to use the beta protocol. If it succeeds, the write failure response
132+
# will have a map of <node-ip, failure-code> instead of num-failures. When v5 is officially released, we
133+
# can remove the allow_beta_protocol arg in this test.
134+
cluster = Retry.with_attempts(5, Cassandra::Errors::NoHostsAvailable) {
135+
Cassandra.cluster(allow_beta_protocol: true)
136+
}
137+
session = cluster.connect
138+
139+
session.execute("CREATE KEYSPACE testreadfail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}",
140+
consistency: :all) rescue nil
141+
session.execute("CREATE TABLE IF NOT EXISTS testreadfail.test2 (k int, v0 int, v1 int, PRIMARY KEY (k, v0))",
142+
consistency: :all)
143+
144+
# Insert wide rows
145+
insert = session.prepare("INSERT INTO testreadfail.test2 (k, v0, v1) VALUES (1, ?, 1)")
146+
(0..3000).each do |num|
147+
session.execute(insert, arguments: [num])
148+
end
127149

128-
# Insert wide rows
129-
insert = session.prepare("INSERT INTO testreadfail.test2 (k, v0, v1) VALUES (1, ?, 1)")
130-
(0..3000).each do |num|
131-
session.execute(insert, arguments: [num])
132-
end
150+
# Delete wide rows
151+
delete = session.prepare("DELETE v1 FROM testreadfail.test2 WHERE k = 1 AND v0 =?")
152+
(0..2001).each do |num|
153+
session.execute(delete, arguments: [num])
154+
end
133155

134-
# Delete wide rows
135-
delete = session.prepare("DELETE v1 FROM testreadfail.test2 WHERE k = 1 AND v0 =?")
136-
(0..2001).each do |num|
137-
session.execute(delete, arguments: [num])
138-
end
156+
# Tombstones should trigger ReadFailure
157+
ex = assert_raises(Cassandra::Errors::ReadError) do
158+
session.execute("SELECT * FROM testreadfail.test2 WHERE k = 1")
159+
end
139160

140-
# Tombstones should trigger ReadFailure
141-
assert_raises(Cassandra::Errors::ReadError) do
142-
session.execute("SELECT * FROM testreadfail.test2 WHERE k = 1")
143-
end
144-
ensure
145-
cluster && cluster.close
161+
# If we're speaking protocol version > 4, verify that we have a failure map in the exception.
162+
connection_options = cluster.instance_variable_get(:@connection_options)
163+
assert_operator(ex.failed, :>=, 1)
164+
if connection_options.protocol_version > 4
165+
assert_equal(ex.failed, ex.failures_by_node.size)
166+
assert_equal(1, ex.failures_by_node.values.first)
146167
end
168+
ensure
169+
cluster && cluster.close
147170
end
148171

149172
# Test for validating FunctionCallError
@@ -165,30 +188,28 @@ def test_raise_error_on_read_failure
165188
def test_raise_error_on_function_failure
166189
skip("Client failure errors are only available in C* after 2.2") if CCM.cassandra_version < '2.2.0'
167190

168-
begin
169-
cluster = Retry.with_attempts(5, Cassandra::Errors::NoHostsAvailable) { Cassandra.cluster }
170-
session = cluster.connect
191+
cluster = Retry.with_attempts(5, Cassandra::Errors::NoHostsAvailable) { Cassandra.cluster }
192+
session = cluster.connect
171193

172-
session.execute("CREATE KEYSPACE testfunctionfail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}",
173-
consistency: :all) rescue nil
174-
session.execute("CREATE TABLE testfunctionfail.d (k int PRIMARY KEY , d double)", consistency: :all)
194+
session.execute("CREATE KEYSPACE testfunctionfail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}",
195+
consistency: :all) rescue nil
196+
session.execute("CREATE TABLE IF NOT EXISTS testfunctionfail.d (k int PRIMARY KEY , d double)", consistency: :all)
175197

176-
# Create a UDF that throws an exception
177-
session.execute("CREATE FUNCTION testfunctionfail.test_failure(d double)
198+
# Create a UDF that throws an exception
199+
session.execute("CREATE FUNCTION IF NOT EXISTS testfunctionfail.test_failure(d double)
178200
RETURNS NULL ON NULL INPUT
179201
RETURNS double
180202
LANGUAGE java AS 'throw new RuntimeException(\"failure\");'",
181-
consistency: :all)
203+
consistency: :all)
182204

183-
# Insert value to use for function
184-
session.execute("INSERT INTO testfunctionfail.d (k, d) VALUES (0, 5.12)")
205+
# Insert value to use for function
206+
session.execute("INSERT INTO testfunctionfail.d (k, d) VALUES (0, 5.12)")
185207

186-
# FunctionFailure should be triggered
187-
assert_raises(Cassandra::Errors::FunctionCallError) do
188-
session.execute("SELECT test_failure(d) FROM testfunctionfail.d WHERE k = 0")
189-
end
190-
ensure
191-
cluster && cluster.close
208+
# FunctionFailure should be triggered
209+
assert_raises(Cassandra::Errors::FunctionCallError) do
210+
session.execute("SELECT test_failure(d) FROM testfunctionfail.d WHERE k = 0")
192211
end
212+
ensure
213+
cluster && cluster.close
193214
end
194215
end

lib/cassandra.rb

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ module Cassandra
5656
CLUSTER_OPTIONS = [
5757
:address_resolution,
5858
:address_resolution_policy,
59+
:allow_beta_protocol,
5960
:auth_provider,
6061
:client_cert,
6162
:client_timestamps,
@@ -196,6 +197,9 @@ module Cassandra
196197
# nodes. By default, this is auto-negotiated to the highest common protocol version
197198
# that all nodes in `:hosts` speak.
198199
#
200+
# @option options [Boolean] :allow_beta_protocol (false) whether the driver should attempt to speak to nodes
201+
# with a beta version of the newest protocol (which is still under development). USE WITH CAUTION!
202+
#
199203
# @option options [Boolean, Cassandra::TimestampGenerator] :client_timestamps (false) whether the driver
200204
# should send timestamps for each executed statement and possibly which timestamp generator to use. Enabling this
201205
# setting helps mitigate Cassandra cluster clock skew because the timestamp of the client machine will be used.
@@ -658,6 +662,7 @@ def self.validate_and_massage_options(options)
658662
options[:nodelay] = !!options[:nodelay] if options.key?(:nodelay)
659663
options[:trace] = !!options[:trace] if options.key?(:trace)
660664
options[:shuffle_replicas] = !!options[:shuffle_replicas] if options.key?(:shuffle_replicas)
665+
options[:allow_beta_protocol] = !!options[:allow_beta_protocol] if options.key?(:allow_beta_protocol)
661666

662667
if options.key?(:page_size)
663668
page_size = options[:page_size]
@@ -675,12 +680,16 @@ def self.validate_and_massage_options(options)
675680
protocol_version = options[:protocol_version]
676681
unless protocol_version.nil?
677682
Util.assert_instance_of(::Integer, protocol_version)
678-
Util.assert_one_of(1..4, protocol_version) do
679-
":protocol_version must be a positive integer, #{protocol_version.inspect} given"
680-
end
683+
Util.assert_one_of(1..Cassandra::Protocol::Versions::MAX_SUPPORTED_VERSION, protocol_version,
684+
':protocol_version must be a positive integer between 1 and ' \
685+
"#{Cassandra::Protocol::Versions::MAX_SUPPORTED_VERSION}, #{protocol_version.inspect} given"
686+
)
681687
end
682688
end
683689

690+
Util.assert(!(options[:allow_beta_protocol] && options[:protocol_version]),
691+
'only one of :allow_beta_protocol and :protocol_version may be specified, both given')
692+
684693
if options.key?(:futures_factory)
685694
futures_factory = options[:futures_factory]
686695
methods = [:error, :value, :promise, :all]

lib/cassandra/cluster/connector.rb

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,8 @@ def do_connect(host)
122122
ssl: @connection_options.ssl) do |connection|
123123
raise Errors::ClientError, 'Not connected, reactor stopped' unless connection
124124

125-
if @connection_options.nodelay?
126-
connection.to_io.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
127-
else
128-
connection.to_io.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 0)
129-
end
125+
connection.to_io.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY,
126+
@connection_options.nodelay? ? 1 : 0)
130127

131128
Protocol::CqlProtocolHandler.new(connection,
132129
@reactor,

lib/cassandra/cluster/options.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class Options
2525
attr_reader :auth_provider, :compressor, :connect_timeout, :credentials,
2626
:heartbeat_interval, :idle_timeout, :port, :schema_refresh_delay,
2727
:schema_refresh_timeout, :ssl, :custom_type_handlers
28-
attr_boolean :protocol_negotiable, :synchronize_schema, :nodelay
28+
attr_boolean :protocol_negotiable, :synchronize_schema, :nodelay, :allow_beta_protocol
2929

3030
attr_accessor :protocol_version
3131

@@ -46,7 +46,8 @@ def initialize(logger,
4646
schema_refresh_timeout,
4747
nodelay,
4848
requests_per_connection,
49-
custom_types)
49+
custom_types,
50+
allow_beta_protocol)
5051
@logger = logger
5152
@protocol_version = protocol_version
5253
@credentials = credentials
@@ -61,6 +62,7 @@ def initialize(logger,
6162
@schema_refresh_delay = schema_refresh_delay
6263
@schema_refresh_timeout = schema_refresh_timeout
6364
@nodelay = nodelay
65+
@allow_beta_protocol = allow_beta_protocol
6466
@custom_type_handlers = {}
6567
custom_types.each do |type_klass|
6668
@custom_type_handlers[type_klass.type] = type_klass
@@ -71,12 +73,14 @@ def initialize(logger,
7173
@requests_per_connection = requests_per_connection
7274

7375
# If @protocol_version is nil, it means we want the driver to negotiate the
74-
# protocol starting with our known max (4). If @protocol_version is not nil,
76+
# protocol starting with our known max. If @protocol_version is not nil,
7577
# it means the user wants us to use a particular version, so we should not
7678
# support negotiation.
7779

7880
@protocol_negotiable = @protocol_version.nil?
79-
@protocol_version ||= 4
81+
@protocol_version ||= allow_beta_protocol ?
82+
Cassandra::Protocol::Versions::BETA_VERSION :
83+
Cassandra::Protocol::Versions::MAX_SUPPORTED_VERSION
8084
end
8185

8286
def compression

lib/cassandra/driver.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,15 @@ def self.let(name, &block)
141141
schema_refresh_timeout,
142142
nodelay,
143143
requests_per_connection,
144-
custom_types
144+
custom_types,
145+
allow_beta_protocol
145146
)
146147
end
147148

148149
let(:custom_types) { [] }
149150
let(:port) { 9042 }
150151
let(:protocol_version) { nil }
152+
let(:allow_beta_protocol) { false }
151153
let(:connect_timeout) { 10 }
152154
let(:ssl) { false }
153155
let(:logger) { NullLogger.new }

lib/cassandra/errors.rb

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,9 @@ class WriteError < ::StandardError
348348
attr_reader :received
349349
# @return [Integer] the number of writes failed
350350
attr_reader :failed
351+
# @return [Hash<IPAddr, Integer>] map of <ip, error-code>. This is new in v5 and is nil in previous versions
352+
# of the Casssandra protocol.
353+
attr_reader :failures_by_node
351354

352355
# @private
353356
def initialize(message,
@@ -363,7 +366,8 @@ def initialize(message,
363366
consistency,
364367
required,
365368
failed,
366-
received)
369+
received,
370+
failures_by_node)
367371
super(message,
368372
payload,
369373
warnings,
@@ -378,6 +382,7 @@ def initialize(message,
378382
@required = required
379383
@failed = failed
380384
@received = received
385+
@failures_by_node = failures_by_node
381386
end
382387
end
383388

@@ -400,6 +405,9 @@ class ReadError < ::StandardError
400405
attr_reader :received
401406
# @return [Integer] the number of reads failed
402407
attr_reader :failed
408+
# @return [Hash<IPaddr, Integer>] map of <ip, error-code>. This is new in v5 and is nil in previous versions
409+
# of the Casssandra protocol.
410+
attr_reader :failures_by_node
403411

404412
# @private
405413
def initialize(message,
@@ -415,7 +423,8 @@ def initialize(message,
415423
consistency,
416424
required,
417425
failed,
418-
received)
426+
received,
427+
failures_by_node)
419428
super(message,
420429
payload,
421430
warnings,
@@ -430,6 +439,7 @@ def initialize(message,
430439
@required = required
431440
@failed = failed
432441
@received = received
442+
@failures_by_node = failures_by_node
433443
end
434444

435445
def retrieved?

lib/cassandra/protocol.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ module Constants
4949
SCHEMA_CHANGE_TARGET_FUNCTION = 'FUNCTION'.freeze
5050
SCHEMA_CHANGE_TARGET_AGGREGATE = 'AGGREGATE'.freeze
5151
end
52+
53+
module Versions
54+
BETA_VERSION = 5
55+
MAX_SUPPORTED_VERSION = 4
56+
end
5257
end
5358
end
5459

0 commit comments

Comments
 (0)