Skip to content

Commit 1f65343

Browse files
committed
Merge pull request #173 from datastax/3.0.1
Assorted 3.0.1 internal fixes.
2 parents cc29ca8 + 0b64ab0 commit 1f65343

File tree

17 files changed

+115
-76
lines changed

17 files changed

+115
-76
lines changed

Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
PATH
22
remote: .
33
specs:
4-
cassandra-driver (3.0.0)
4+
cassandra-driver (3.0.1.beta.1)
55
ione (~> 1.2)
66

77
GEM

lib/cassandra.rb

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -293,28 +293,7 @@ def self.cluster(options = {})
293293
# @return [Cassandra::Future<Cassandra::Cluster>] a future resolving to the
294294
# cluster instance.
295295
def self.cluster_async(options = {})
296-
options = validate_and_massage_options(options)
297-
hosts = []
298-
299-
Array(options.fetch(:hosts, '127.0.0.1')).each do |host|
300-
case host
301-
when ::IPAddr
302-
hosts << host
303-
when ::String # ip address or hostname
304-
Resolv.each_address(host) do |ip|
305-
hosts << ::IPAddr.new(ip)
306-
end
307-
else
308-
raise ::ArgumentError, ":hosts must be String or IPAddr, #{host.inspect} given"
309-
end
310-
end
311-
312-
if hosts.empty?
313-
raise ::ArgumentError,
314-
":hosts #{options[:hosts].inspect} could not be resolved to any ip address"
315-
end
316-
317-
hosts.shuffle!
296+
options, hosts = validate_and_massage_options(options)
318297
rescue => e
319298
futures = options.fetch(:futures_factory) { return Future::Error.new(e) }
320299
futures.error(e)
@@ -664,14 +643,14 @@ def self.validate_and_massage_options(options)
664643

665644
case address_resolution
666645
when :none
667-
# do nothing
646+
# do nothing
668647
when :ec2_multi_region
669648
options[:address_resolution_policy] =
670649
AddressResolution::Policies::EC2MultiRegion.new
671650
else
672651
raise ::ArgumentError,
673652
':address_resolution must be either :none or :ec2_multi_region, ' \
674-
"#{address_resolution.inspect} given"
653+
"#{address_resolution.inspect} given"
675654
end
676655
end
677656

@@ -757,7 +736,31 @@ def self.validate_and_massage_options(options)
757736
end
758737
end
759738
end
760-
options
739+
740+
# Get host addresses.
741+
hosts = []
742+
743+
Array(options.fetch(:hosts, '127.0.0.1')).each do |host|
744+
case host
745+
when ::IPAddr
746+
hosts << host
747+
when ::String # ip address or hostname
748+
Resolv.each_address(host) do |ip|
749+
hosts << ::IPAddr.new(ip)
750+
end
751+
else
752+
raise ::ArgumentError, ":hosts must be String or IPAddr, #{host.inspect} given"
753+
end
754+
end
755+
756+
if hosts.empty?
757+
raise ::ArgumentError,
758+
":hosts #{options[:hosts].inspect} could not be resolved to any ip address"
759+
end
760+
761+
hosts.shuffle!
762+
763+
[options, hosts]
761764
end
762765

763766
# @private

lib/cassandra/auth.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ module Auth
3434
#
3535
# @see Cassandra::Auth::Providers
3636
class Provider
37-
# @!method create_authenticator(authentication_class, protocol_version)
37+
# @!method create_authenticator(authentication_class, host)
3838
#
3939
# Create a new authenticator object. This method will be called once per
4040
# connection that requires authentication. The auth provider can create
@@ -45,6 +45,8 @@ class Provider
4545
#
4646
# @param authentication_class [String] the authentication class used by
4747
# the server.
48+
# @param host [Cassandra::Host] the node to whom we're authenticating.
49+
#
4850
# @return [Cassandra::Auth::Authenticator, nil] an object with an
4951
# interface matching {Cassandra::Auth::Authenticator} or `nil` if the
5052
# authentication class is not supported.

lib/cassandra/cluster/connector.rb

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ def do_connect(host)
154154
supported_cql_versions.first :
155155
'3.1.0'
156156

157-
startup_connection(connection, cql_version, compression)
157+
startup_connection(host, connection, cql_version, compression)
158158
end
159159
f.fallback do |error|
160160
case error
@@ -200,7 +200,7 @@ def do_connect(host)
200200
end
201201
end
202202

203-
def startup_connection(connection, cql_version, compression)
203+
def startup_connection(host, connection, cql_version, compression)
204204
connection.send_request(Protocol::StartupRequest.new(cql_version, compression),
205205
@execution_options.timeout).flat_map do |r|
206206
case r
@@ -213,12 +213,9 @@ def startup_connection(connection, cql_version, compression)
213213
Ione::Future.failed(cannot_authenticate_error)
214214
end
215215
else
216-
authenticator = @connection_options.create_authenticator(
217-
r.authentication_class)
216+
authenticator = @connection_options.create_authenticator(r.authentication_class, host)
218217
if authenticator
219-
challenge_response_cycle(connection,
220-
authenticator,
221-
authenticator.initial_response)
218+
challenge_response_cycle(connection, authenticator, authenticator.initial_response)
222219
else
223220
Ione::Future.failed(cannot_authenticate_error)
224221
end
@@ -283,7 +280,7 @@ def challenge_response_cycle(connection, authenticator, token)
283280
case r
284281
when Protocol::AuthChallengeResponse
285282
token = authenticator.challenge_response(r.token)
286-
challenge_response_cycle(pending_connection, authenticator, token)
283+
challenge_response_cycle(connection, authenticator, token)
287284
when Protocol::AuthSuccessResponse
288285
begin
289286
authenticator.authentication_successful(r.token)

lib/cassandra/cluster/options.rb

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,17 @@ def compression
7878
@compressor && @compressor.algorithm
7979
end
8080

81-
def create_authenticator(authentication_class)
82-
@auth_provider && @auth_provider.create_authenticator(authentication_class)
81+
def create_authenticator(authentication_class, host)
82+
if @auth_provider
83+
# Auth providers should take an auth-class and host, but they used to not, so for backward compatibility
84+
# we figure out if this provider does, and if so send both args, otherwise just send the auth-class.
85+
86+
if @auth_provider.method(:create_authenticator).arity == 1
87+
@auth_provider.create_authenticator(authentication_class)
88+
else
89+
@auth_provider.create_authenticator(authentication_class, host)
90+
end
91+
end
8392
end
8493

8594
def connections_per_local_node

lib/cassandra/driver.rb

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -92,23 +92,25 @@ def self.let(name, &block)
9292
schema_fetcher)
9393
end
9494

95+
let(:cluster_klass) { Cluster }
96+
9597
let(:cluster) do
96-
Cluster.new(logger,
97-
io_reactor,
98-
executor,
99-
control_connection,
100-
cluster_registry,
101-
cluster_schema,
102-
cluster_metadata,
103-
execution_options,
104-
connection_options,
105-
load_balancing_policy,
106-
reconnection_policy,
107-
retry_policy,
108-
address_resolution_policy,
109-
connector,
110-
futures_factory,
111-
timestamp_generator)
98+
cluster_klass.new(logger,
99+
io_reactor,
100+
executor,
101+
control_connection,
102+
cluster_registry,
103+
cluster_schema,
104+
cluster_metadata,
105+
execution_options,
106+
connection_options,
107+
load_balancing_policy,
108+
reconnection_policy,
109+
retry_policy,
110+
address_resolution_policy,
111+
connector,
112+
futures_factory,
113+
timestamp_generator)
112114
end
113115

114116
let(:execution_options) do

lib/cassandra/result.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ def next_page(options = nil)
7676
#
7777
# @note `:paging_state` option will be ignored.
7878
#
79-
# @return [Cassandra::Future<Cassandra::Result, nil>] `nil` if last
80-
# page
79+
# @return [Cassandra::Future<Cassandra::Result>] a future that resolves to a new Result if there is a new page,
80+
# `nil` otherwise.
8181
#
8282
# @see Cassandra::Session#execute
8383
def next_page_async(options = nil)

lib/cassandra/session.rb

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -89,20 +89,13 @@ def execute_async(statement, options = nil)
8989

9090
case statement
9191
when ::String
92-
@client.query(Statements::Simple.new(statement,
93-
options.arguments,
94-
options.type_hints,
95-
options.idempotent?),
96-
options)
97-
when Statements::Simple
98-
@client.query(statement, options)
99-
when Statements::Prepared
100-
@client.execute(statement.bind(options.arguments), options)
101-
when Statements::Bound
102-
@client.execute(statement, options)
103-
when Statements::Batch
104-
Util.assert_not_empty(statement.statements) { 'batch cannot be empty' }
105-
@client.batch(statement, options)
92+
Statements::Simple.new(statement,
93+
options.arguments,
94+
options.type_hints,
95+
options.idempotent?).accept(@client,
96+
options)
97+
when Statement
98+
statement.accept(@client, options)
10699
else
107100
@futures.error(::ArgumentError.new("unsupported statement #{statement.inspect}"))
108101
end

lib/cassandra/statement.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,10 @@ module Statement
2323
def idempotent?
2424
!!@idempotent
2525
end
26+
27+
# @private
28+
def accept(client, options)
29+
raise NotImplementedError, "#{self.class} must implement :accept method"
30+
end
2631
end
2732
end

lib/cassandra/statements/batch.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ def add(statement, options = nil)
113113
self
114114
end
115115

116+
# @private
117+
def accept(client, options)
118+
Util.assert_not_empty(statements) { 'batch cannot be empty' }
119+
client.batch(self, options)
120+
end
121+
116122
# Determines whether or not the statement is safe to retry on timeout
117123
# Batches are idempotent only when all statements in a batch are.
118124
# @return [Boolean] whether the statement is safe to retry on timeout

0 commit comments

Comments
 (0)