Skip to content

Commit 7a4f56e

Browse files
Merge pull request #197 from datastax/RUBY-256
RUBY-256: Execution profiles
2 parents a4cd54c + c923308 commit 7a4f56e

File tree

24 files changed

+979
-184
lines changed

24 files changed

+979
-184
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ Features:
33
* Do not mark a host as down if there are active connections.
44
* Update Keyspace metadata to include collection of indexes defined in the keyspace.
55
* Update Table metadata to include trigger-collection and view-collection metadata.
6+
* Added execution profiles to encapsulate a group of request execution options.
67

78
Bug Fixes:
89
* [RUBY-255](https://datastax-oss.atlassian.net/browse/RUBY-255) ControlConnection.peer_ip ignores peers that are missing critical information in system.peers.
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
Feature: Execution profiles
2+
3+
Execution profiles allow a user to group various execution options into a 'profile'. A user can then execute
4+
statements with different profiles by specifying the profile name.
5+
6+
Background:
7+
Given a running cassandra cluster
8+
9+
Scenario: Configure different load balancing policies with profiles.
10+
Given the following example:
11+
"""ruby
12+
require 'cassandra'
13+
14+
include Cassandra::LoadBalancing::Policies
15+
profiles = {
16+
p1: Cassandra::Execution::Profile.new(load_balancing_policy: WhiteList.new(['127.0.0.1'], RoundRobin.new)),
17+
p2: Cassandra::Execution::Profile.new(load_balancing_policy: WhiteList.new(['127.0.0.2'], RoundRobin.new))
18+
}
19+
20+
cluster = Cassandra.cluster(execution_profiles: profiles)
21+
session = cluster.connect
22+
23+
puts "Running with default profile"
24+
25+
# By default, the driver uses a dc-aware, token-aware round-robin load balancing policy that
26+
# is notified of which nodes are available in random order. To make this test's output
27+
# deterministic, we sort the results by ip address.
28+
ip_list = []
29+
3.times do
30+
rs = session.execute('select rpc_address from system.local')
31+
ip_list << rs.first['rpc_address'].to_s
32+
end
33+
puts ip_list.sort.join("\n")
34+
35+
# p2 and p3 set up load-balancing policies that will match only one node, so there's no
36+
# issue of hitting nodes in random order.
37+
puts "Running with profile p1"
38+
3.times do
39+
rs = session.execute('select rpc_address from system.local', execution_profile: :p1)
40+
puts rs.first['rpc_address']
41+
end
42+
43+
puts "Running with profile p2"
44+
3.times do
45+
rs = session.execute('select rpc_address from system.local', execution_profile: :p2)
46+
puts rs.first['rpc_address']
47+
end
48+
"""
49+
When it is executed
50+
Then its output should contain:
51+
"""
52+
Running with default profile
53+
127.0.0.1
54+
127.0.0.2
55+
127.0.0.3
56+
Running with profile p1
57+
127.0.0.1
58+
127.0.0.1
59+
127.0.0.1
60+
Running with profile p2
61+
127.0.0.2
62+
127.0.0.2
63+
127.0.0.2
64+
"""
65+

lib/cassandra.rb

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ module Cassandra
6868
:credentials,
6969
:custom_types,
7070
:datacenter,
71+
:execution_profiles,
7172
:futures_factory,
7273
:heartbeat_interval,
7374
:hosts,
@@ -117,10 +118,14 @@ module Cassandra
117118
# found by the default Token-Aware Load Balancing Policy should be
118119
# shuffled. See {Cassandra::LoadBalancing::Policies::TokenAware#initialize Token-Aware Load Balancing Policy}.
119120
#
121+
# @option options [Hash<String|Symbol, ExecutionProfile>] :execution_profiles (nil)
122+
# Hash of {Cassandra::Execution::Profile}s that are available for client use (e.g.
123+
# {Session#execute}, {Session#execute_async}, {Session#prepare}, and {Session#prepare_async}).
124+
#
120125
# @option options [Numeric] :connect_timeout (10) connection timeout in
121126
# seconds. Setting value to `nil` will reset it to 5 seconds.
122127
#
123-
# @option options [Numeric] :timeout (10) request execution timeout in
128+
# @option options [Numeric] :timeout (12) request execution timeout in
124129
# seconds. Setting value to `nil` will remove request timeout.
125130
#
126131
# @option options [Numeric] :heartbeat_interval (30) how often should a
@@ -503,14 +508,28 @@ def self.validate_and_massage_options(options)
503508
end
504509
end
505510

511+
if options.key?(:execution_profiles)
512+
Util.assert_instance_of(::Hash, options[:execution_profiles],
513+
':execution_profiles must be a hash of <name,ExecutionProfile> entries.')
514+
end
515+
506516
if options.key?(:timeout)
507517
timeout = options[:timeout]
508518

509519
unless timeout.nil?
510-
Util.assert_instance_of(::Numeric, timeout) do
511-
":timeout must be a number of seconds, #{timeout.inspect} given"
520+
Util.assert_instance_of(::Numeric, timeout, ":timeout must be a number of seconds, #{timeout.inspect} given")
521+
Util.assert(timeout > 0, ":timeout must be greater than 0, #{timeout} given")
522+
end
523+
end
524+
if options.key?(:execution_profiles)
525+
options[:execution_profiles].each do |name, profile|
526+
timeout = profile.timeout
527+
unless timeout.nil?
528+
Util.assert_instance_of(::Numeric, timeout,
529+
":timeout of execution profile #{name} must be a number of seconds, " \
530+
"#{timeout.inspect} given")
531+
Util.assert(timeout > 0, ":timeout of execution profile #{name} must be greater than 0, #{timeout} given")
512532
end
513-
Util.assert(timeout > 0) { ":timeout must be greater than 0, #{timeout} given" }
514533
end
515534
end
516535

@@ -566,12 +585,24 @@ def self.validate_and_massage_options(options)
566585
load_balancing_policy = options[:load_balancing_policy]
567586
methods = [:host_up, :host_down, :host_found, :host_lost, :setup, :teardown,
568587
:distance, :plan]
569-
570588
Util.assert_responds_to_all(methods, load_balancing_policy) do
571589
":load_balancing_policy #{load_balancing_policy.inspect} must respond " \
572590
"to #{methods.inspect}, but doesn't"
573591
end
574592
end
593+
if options.key?(:execution_profiles)
594+
methods = [:host_up, :host_down, :host_found, :host_lost, :setup, :teardown,
595+
:distance, :plan]
596+
options[:execution_profiles].each do |name, profile|
597+
load_balancing_policy = profile.load_balancing_policy
598+
unless load_balancing_policy.nil?
599+
Util.assert_responds_to_all(methods, load_balancing_policy,
600+
":load_balancing_policy in execution profile #{name} #{load_balancing_policy.inspect} must respond " \
601+
"to #{methods.inspect}, but doesn't"
602+
)
603+
end
604+
end
605+
end
575606

576607
if options.key?(:reconnection_policy)
577608
reconnection_policy = options[:reconnection_policy]
@@ -585,21 +616,42 @@ def self.validate_and_massage_options(options)
585616
if options.key?(:retry_policy)
586617
retry_policy = options[:retry_policy]
587618
methods = [:read_timeout, :write_timeout, :unavailable]
588-
589619
Util.assert_responds_to_all(methods, retry_policy) do
590620
":retry_policy #{retry_policy.inspect} must respond to #{methods.inspect}, " \
591621
"but doesn't"
592622
end
593623
end
624+
if options.key?(:execution_profiles)
625+
methods = [:read_timeout, :write_timeout, :unavailable]
626+
options[:execution_profiles].each do |name, profile|
627+
retry_policy = profile.retry_policy
628+
unless retry_policy.nil?
629+
Util.assert_responds_to_all(methods, retry_policy,
630+
":retry_policy in execution profile #{name} #{retry_policy.inspect} must " \
631+
"respond to #{methods.inspect}, but doesn't"
632+
)
633+
end
634+
end
635+
end
594636

595637
options[:listeners] = Array(options[:listeners]) if options.key?(:listeners)
596638

597639
if options.key?(:consistency)
598640
consistency = options[:consistency]
599-
600-
Util.assert_one_of(CONSISTENCIES, consistency) do
641+
Util.assert_one_of(CONSISTENCIES, consistency,
601642
":consistency must be one of #{CONSISTENCIES.inspect}, " \
602643
"#{consistency.inspect} given"
644+
)
645+
end
646+
if options.key?(:execution_profiles)
647+
options[:execution_profiles].each do |name, profile|
648+
consistency = profile.consistency
649+
unless consistency.nil?
650+
Util.assert_one_of(CONSISTENCIES, consistency,
651+
":consistency in execution profile #{name} must be one of #{CONSISTENCIES.inspect}, " \
652+
"#{consistency.inspect} given"
653+
)
654+
end
603655
end
604656
end
605657

@@ -824,6 +876,8 @@ def self.validate_and_massage_options(options)
824876

825877
require 'cassandra/execution/info'
826878
require 'cassandra/execution/options'
879+
require 'cassandra/execution/profile'
880+
require 'cassandra/execution/profile_manager'
827881
require 'cassandra/execution/trace'
828882

829883
require 'cassandra/load_balancing'
@@ -846,7 +900,9 @@ module Cassandra
846900
# @private
847901
VOID_STATEMENT = Statements::Void.new
848902
# @private
849-
VOID_OPTIONS = Execution::Options.new(consistency: :one)
903+
VOID_OPTIONS = Execution::Options.new(consistency: :one,
904+
load_balancing_policy: LoadBalancing::Policies::RoundRobin.new,
905+
retry_policy: Retry::Policies::Default.new)
850906
# @private
851907
NO_HOSTS = Errors::NoHostsAvailable.new
852908
end

lib/cassandra/cluster.rb

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,8 @@ def initialize(logger,
3636
cluster_metadata,
3737
execution_options,
3838
connection_options,
39-
load_balancing_policy,
39+
profile_manager,
4040
reconnection_policy,
41-
retry_policy,
4241
address_resolution_policy,
4342
connector,
4443
futures_factory,
@@ -52,17 +51,16 @@ def initialize(logger,
5251
@metadata = cluster_metadata
5352
@execution_options = execution_options
5453
@connection_options = connection_options
55-
@load_balancing_policy = load_balancing_policy
54+
@profile_manager = profile_manager
5655
@reconnection_policy = reconnection_policy
57-
@retry_policy = retry_policy
5856
@address_resolver = address_resolution_policy
5957
@connector = connector
6058
@futures = futures_factory
6159
@timestamp_generator = timestamp_generator
6260

6361
@control_connection.on_close do |_cause|
6462
begin
65-
@load_balancing_policy.teardown(self)
63+
@profile_manager.teardown(self)
6664
rescue
6765
nil
6866
end
@@ -201,14 +199,13 @@ def connect_async(keyspace = nil)
201199
@schema,
202200
@io_reactor,
203201
@connector,
204-
@load_balancing_policy,
202+
@profile_manager,
205203
@reconnection_policy,
206-
@retry_policy,
207204
@address_resolver,
208205
@connection_options,
209206
@futures,
210207
@timestamp_generator)
211-
session = Session.new(client, @execution_options, @futures)
208+
session = Session.new(client, @execution_options, @futures, @profile_manager)
212209
promise = @futures.promise
213210

214211
client.connect.on_complete do |f|
@@ -282,9 +279,7 @@ def inspect
282279
"name=#{name.inspect}, " \
283280
"port=#{@connection_options.port}, " \
284281
"protocol_version=#{@connection_options.protocol_version}, " \
285-
"load_balancing_policy=#{@load_balancing_policy.inspect}, " \
286-
"consistency=#{@execution_options.consistency.inspect}, " \
287-
"timeout=#{@execution_options.timeout.inspect}, " \
282+
"profile_manager=#{@profile_manager.inspect}, " \
288283
"hosts=#{hosts.inspect}, " \
289284
"keyspaces=#{keyspaces.inspect}>"
290285
end

lib/cassandra/cluster/client.rb

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,8 @@ def initialize(logger,
2929
cluster_schema,
3030
io_reactor,
3131
connector,
32-
load_balancing_policy,
32+
profile_manager,
3333
reconnection_policy,
34-
retry_policy,
3534
address_resolution_policy,
3635
connection_options,
3736
futures_factory,
@@ -41,9 +40,8 @@ def initialize(logger,
4140
@schema = cluster_schema
4241
@reactor = io_reactor
4342
@connector = connector
44-
@load_balancing_policy = load_balancing_policy
43+
@profile_manager = profile_manager
4544
@reconnection_policy = reconnection_policy
46-
@retry_policy = retry_policy
4745
@address_resolver = address_resolution_policy
4846
@connection_options = connection_options
4947
@futures = futures_factory
@@ -67,7 +65,7 @@ def connect
6765

6866
@state = :connecting
6967
@registry.each_host do |host|
70-
distance = @load_balancing_policy.distance(host)
68+
distance = @profile_manager.distance(host)
7169

7270
case distance
7371
when :ignore
@@ -168,7 +166,7 @@ def host_up(host)
168166
pool_size = 0
169167

170168
synchronize do
171-
distance = @load_balancing_policy.distance(host)
169+
distance = @profile_manager.distance(host)
172170
case distance
173171
when :ignore
174172
return Ione::Future.resolved
@@ -248,7 +246,7 @@ def query(statement, options)
248246
promise = @futures.promise
249247

250248
keyspace = @keyspace
251-
plan = @load_balancing_policy.plan(keyspace, statement, options)
249+
plan = options.load_balancing_policy.plan(keyspace, statement, options)
252250

253251
send_request_by_plan(promise,
254252
keyspace,
@@ -270,7 +268,7 @@ def prepare(cql, options)
270268

271269
keyspace = @keyspace
272270
statement = VOID_STATEMENT
273-
plan = @load_balancing_policy.plan(keyspace, statement, options)
271+
plan = options.load_balancing_policy.plan(keyspace, statement, options)
274272

275273
send_request_by_plan(promise,
276274
keyspace,
@@ -303,7 +301,7 @@ def execute(statement, options)
303301
promise = @futures.promise
304302

305303
keyspace = @keyspace
306-
plan = @load_balancing_policy.plan(keyspace, statement, options)
304+
plan = options.load_balancing_policy.plan(keyspace, statement, options)
307305

308306
execute_by_plan(promise, keyspace, statement, options, request, plan, timeout)
309307

@@ -329,7 +327,7 @@ def batch(statement, options)
329327
timestamp,
330328
payload)
331329
keyspace = @keyspace
332-
plan = @load_balancing_policy.plan(keyspace, statement, options)
330+
plan = options.load_balancing_policy.plan(keyspace, statement, options)
333331
promise = @futures.promise
334332

335333
batch_by_plan(promise, keyspace, statement, options, request, plan, timeout)
@@ -1075,20 +1073,20 @@ def handle_response(response_future,
10751073

10761074
case r
10771075
when Protocol::UnavailableErrorResponse
1078-
decision = @retry_policy.unavailable(statement,
1076+
decision = options.retry_policy.unavailable(statement,
10791077
r.consistency,
10801078
r.required,
10811079
r.alive,
10821080
retries)
10831081
when Protocol::WriteTimeoutErrorResponse
1084-
decision = @retry_policy.write_timeout(statement,
1082+
decision = options.retry_policy.write_timeout(statement,
10851083
r.consistency,
10861084
r.write_type,
10871085
r.blockfor,
10881086
r.received,
10891087
retries)
10901088
when Protocol::ReadTimeoutErrorResponse
1091-
decision = @retry_policy.read_timeout(statement,
1089+
decision = options.retry_policy.read_timeout(statement,
10921090
r.consistency,
10931091
r.blockfor,
10941092
r.received,
@@ -1445,15 +1443,15 @@ def wait_for_schema_agreement(connection, schedule)
14451443
unless local.empty?
14461444
host = @registry.host(connection.host)
14471445

1448-
if host && @load_balancing_policy.distance(host) != :ignore
1446+
if host && @profile_manager.distance(host) != :ignore
14491447
versions << version = local.first['schema_version']
14501448
@logger.debug("Host #{host.ip} schema version is #{version}")
14511449
end
14521450
end
14531451

14541452
peers.each do |row|
14551453
host = @registry.host(peer_ip(row))
1456-
next unless host && @load_balancing_policy.distance(host) != :ignore
1454+
next unless host && @profile_manager.distance(host) != :ignore
14571455

14581456
versions << version = row['schema_version']
14591457
@logger.debug("Host #{host.ip} schema version is #{version}")

0 commit comments

Comments
 (0)