@@ -47,7 +47,7 @@ def initialize(logger,
4747 @futures = futures_factory
4848 @connections = ::Hash . new
4949 @prepared_statements = ::Hash . new
50- @preparing_statements = ::Hash . new
50+ @preparing_statements = ::Hash . new { | hash , host | hash [ host ] = { } }
5151 @pending_connections = ::Hash . new
5252 @keyspace = nil
5353 @state = :idle
@@ -83,7 +83,6 @@ def connect
8383
8484 connecting_hosts [ host ] = pool_size
8585 @pending_connections [ host ] = 0
86- @prepared_statements [ host ] = { }
8786 @preparing_statements [ host ] = { }
8887 @connections [ host ] = ConnectionPool . new
8988 end
@@ -182,7 +181,6 @@ def host_up(host)
182181 end
183182
184183 @pending_connections [ host ] ||= 0
185- @prepared_statements [ host ] = { }
186184 @preparing_statements [ host ] = { }
187185 @connections [ host ] = ConnectionPool . new
188186 end
@@ -197,7 +195,6 @@ def host_down(host)
197195 return Ione ::Future . resolved unless @connections . key? ( host )
198196
199197 @pending_connections . delete ( host ) unless @pending_connections [ host ] > 0
200- @prepared_statements . delete ( host )
201198 @preparing_statements . delete ( host )
202199 pool = @connections . delete ( host )
203200 end
@@ -632,15 +629,13 @@ def prepare_and_send_request_by_plan(host,
632629 errors ,
633630 hosts )
634631 cql = statement . cql
635- id = nil
636- host_is_up = true
637- synchronize do
638- if @prepared_statements [ host ] . nil?
639- host_is_up = false
640- else
641- id = @prepared_statements [ host ] [ cql ]
642- end
643- end
632+
633+ # Get the prepared statement id for this statement from our cache if possible. We are optimistic
634+ # that the statement has previously been prepared on all hosts, so the id will be valid. However, if
635+ # we're in the midst of preparing the statement on the given host, we know that executing with the id
636+ # will fail. So, act like we don't have the prepared-statement id in that case.
637+
638+ id = synchronize { @preparing_statements [ host ] [ cql ] ? nil : @prepared_statements [ cql ] }
644639
645640 if id
646641 request . id = id
@@ -655,19 +650,6 @@ def prepare_and_send_request_by_plan(host,
655650 timeout ,
656651 errors ,
657652 hosts )
658- elsif !host_is_up
659- # We've hit a race condition where the plan says we can query this host, but the host has gone
660- # down in the mean time. Just execute the plan again on the next host.
661- @logger . debug ( "#{ host } is down; executing plan on next host" )
662- execute_by_plan ( promise ,
663- keyspace ,
664- statement ,
665- options ,
666- request ,
667- plan ,
668- timeout ,
669- errors ,
670- hosts )
671653 else
672654 prepare = prepare_statement ( host , connection , cql , timeout )
673655 prepare . on_complete do |_ |
@@ -825,29 +807,15 @@ def batch_and_send_request_by_plan(host,
825807 cql = statement . cql
826808
827809 if statement . is_a? ( Statements ::Bound )
828- host_is_up = true
829- id = nil
830- synchronize do
831- if @prepared_statements [ host ] . nil?
832- host_is_up = false
833- else
834- id = @prepared_statements [ host ] [ cql ]
835- end
836- end
810+ # Get the prepared statement id for this statement from our cache if possible. We are optimistic
811+ # that the statement has previously been prepared on all hosts, so the id will be valid. However, if
812+ # we're in the midst of preparing the statement on the given host, we know that executing with the id
813+ # will fail. So, act like we don't have the prepared-statement id in that case.
814+
815+ id = synchronize { @preparing_statements [ host ] [ cql ] ? nil : @prepared_statements [ cql ] }
837816
838817 if id
839818 request . add_prepared ( id , statement . params , statement . params_types )
840- elsif !host_is_up
841- @logger . debug ( "#{ host } is down; executing on next host in plan" )
842- return batch_by_plan ( promise ,
843- keyspace ,
844- batch_statement ,
845- options ,
846- request ,
847- plan ,
848- timeout ,
849- errors ,
850- hosts )
851819 else
852820 unprepared [ cql ] << statement
853821 end
@@ -1093,17 +1061,23 @@ def handle_response(response_future,
10931061 r . data_present ,
10941062 retries )
10951063 when Protocol ::UnpreparedErrorResponse
1096- cql = statement . cql
1097-
1098- synchronize do
1099- @preparing_statements [ host ] . delete ( cql )
1100- @prepared_statements [ host ] . delete ( cql )
1064+ cql = nil
1065+ if statement . is_a? ( Cassandra ::Statements ::Batch )
1066+ # Find the prepared statement with the prepared-statement-id reported by the node.
1067+ unprepared_child = statement . statements . select do |s |
1068+ ( s . is_a? ( Cassandra ::Statements ::Prepared ) || s . is_a? ( Cassandra ::Statements ::Bound ) ) && s . id == r . id
1069+ end . first
1070+ cql = unprepared_child ? unprepared_child . cql : nil
1071+ else
1072+ # This is a normal statement, so we have everything we need.
1073+ cql = statement . cql
1074+ synchronize { @preparing_statements [ host ] . delete ( cql ) }
11011075 end
11021076
11031077 prepare = prepare_statement ( host , connection , cql , timeout )
11041078 prepare . on_complete do |_ |
11051079 if prepare . resolved?
1106- request . id = prepare . value
1080+ request . id = prepare . value unless request . is_a? ( Cassandra :: Protocol :: BatchRequest )
11071081 do_send_request_by_plan ( host ,
11081082 connection ,
11091083 promise ,
@@ -1198,7 +1172,7 @@ def handle_response(response_future,
11981172 when Protocol ::PreparedResultResponse
11991173 cql = request . cql
12001174 synchronize do
1201- @prepared_statements [ host ] [ cql ] = r . id
1175+ @prepared_statements [ cql ] = r . id
12021176 @preparing_statements [ host ] . delete ( cql )
12031177 end
12041178
@@ -1207,7 +1181,8 @@ def handle_response(response_future,
12071181 pk_idx ||= @schema . get_pk_idx ( metadata )
12081182
12091183 promise . fulfill (
1210- Statements ::Prepared . new ( r . custom_payload ,
1184+ Statements ::Prepared . new ( r . id ,
1185+ r . custom_payload ,
12111186 r . warnings ,
12121187 cql ,
12131188 metadata ,
@@ -1531,7 +1506,7 @@ def prepare_statement(host, connection, cql, timeout)
15311506 when Protocol ::PreparedResultResponse
15321507 id = r . id
15331508 synchronize do
1534- @prepared_statements [ host ] [ cql ] = id
1509+ @prepared_statements [ cql ] = id
15351510 @preparing_statements [ host ] . delete ( cql )
15361511 end
15371512 id
0 commit comments