@@ -854,7 +854,8 @@ mod tests {
854
854
cass_statement_set_execution_profile_n,
855
855
} ,
856
856
future:: {
857
- cass_future_error_code, cass_future_error_message, cass_future_free, cass_future_wait,
857
+ cass_future_error_code, cass_future_error_message, cass_future_free,
858
+ cass_future_set_callback, cass_future_wait,
858
859
} ,
859
860
retry_policy:: {
860
861
CassRetryPolicy , cass_retry_policy_default_new, cass_retry_policy_fallthrough_new,
@@ -866,8 +867,10 @@ mod tests {
866
867
use std:: {
867
868
collections:: HashSet ,
868
869
convert:: { TryFrom , TryInto } ,
870
+ ffi:: { CStr , c_void} ,
869
871
iter,
870
872
net:: SocketAddr ,
873
+ sync:: atomic:: { AtomicUsize , Ordering } ,
871
874
} ;
872
875
873
876
// This is for convenient logs from failing tests. Just call it at the beginning of a test.
@@ -1885,4 +1888,121 @@ mod tests {
1885
1888
. with_current_subscriber ( )
1886
1889
. await ;
1887
1890
}
1891
+
1892
+ #[ tokio:: test]
1893
+ #[ ntest:: timeout( 5000 ) ]
1894
+ async fn session_free_waits_for_requests_to_complete ( ) {
1895
+ init_logger ( ) ;
1896
+ test_with_one_proxy (
1897
+ session_free_waits_for_requests_to_complete_do,
1898
+ mock_init_rules ( ) ,
1899
+ )
1900
+ . with_current_subscriber ( )
1901
+ . await ;
1902
+ }
1903
+
1904
+ fn session_free_waits_for_requests_to_complete_do (
1905
+ node_addr : SocketAddr ,
1906
+ proxy : RunningProxy ,
1907
+ ) -> RunningProxy {
1908
+ unsafe {
1909
+ let mut cluster_raw = cass_cluster_new ( ) ;
1910
+ let ip = node_addr. ip ( ) . to_string ( ) ;
1911
+ let ( c_ip, c_ip_len) = str_to_c_str_n ( ip. as_str ( ) ) ;
1912
+
1913
+ assert_cass_error_eq ! (
1914
+ cass_cluster_set_contact_points_n( cluster_raw. borrow_mut( ) , c_ip, c_ip_len) ,
1915
+ CassError :: CASS_OK
1916
+ ) ;
1917
+ let session_raw = cass_session_new ( ) ;
1918
+ cass_future_wait_check_and_free ( cass_session_connect (
1919
+ session_raw. borrow ( ) ,
1920
+ cluster_raw. borrow ( ) . into_c_const ( ) ,
1921
+ ) ) ;
1922
+
1923
+ tracing:: debug!( "Session connected, starting to execute requests..." ) ;
1924
+
1925
+ let statement = c"SELECT host_id FROM system.local WHERE key='local'" as * const CStr
1926
+ as * const c_char ;
1927
+ let statement_raw = cass_statement_new ( statement, 0 ) ;
1928
+
1929
+ let mut batch_raw = cass_batch_new ( CassBatchType :: CASS_BATCH_TYPE_LOGGED ) ;
1930
+ // This batch is obviously invalid, because it contains a SELECT statement. This is OK for us,
1931
+ // because we anyway expect the batch to fail. The goal is to have the future set, no matter if it's
1932
+ // set with a success or an error.
1933
+ cass_batch_add_statement ( batch_raw. borrow_mut ( ) , statement_raw. borrow ( ) ) ;
1934
+
1935
+ let finished_executions = AtomicUsize :: new ( 0 ) ;
1936
+ unsafe extern "C" fn finished_execution_callback (
1937
+ _future_raw : CassBorrowedSharedPtr < CassFuture , CMut > ,
1938
+ data : * mut c_void ,
1939
+ ) {
1940
+ let finished_executions = unsafe { & * ( data as * const AtomicUsize ) } ;
1941
+ finished_executions. fetch_add ( 1 , Ordering :: SeqCst ) ;
1942
+ }
1943
+
1944
+ const ITERATIONS : usize = 1 ;
1945
+ const EXECUTIONS : usize = 3 * ITERATIONS ; // One prepare, one statement and one batch per iteration.
1946
+
1947
+ let futures = ( 0 ..ITERATIONS )
1948
+ . flat_map ( |_| {
1949
+ // Prepare a statement
1950
+ let prepare_fut = cass_session_prepare ( session_raw. borrow ( ) , statement) ;
1951
+
1952
+ // Execute a statement
1953
+ let statement_fut = cass_session_execute (
1954
+ session_raw. borrow ( ) ,
1955
+ statement_raw. borrow ( ) . into_c_const ( ) ,
1956
+ ) ;
1957
+
1958
+ // Execute a batch
1959
+ let batch_fut = cass_session_execute_batch (
1960
+ session_raw. borrow ( ) ,
1961
+ batch_raw. borrow ( ) . into_c_const ( ) ,
1962
+ ) ;
1963
+ for fut in [
1964
+ prepare_fut. borrow ( ) ,
1965
+ statement_fut. borrow ( ) ,
1966
+ batch_fut. borrow ( ) ,
1967
+ ] {
1968
+ cass_future_set_callback (
1969
+ fut,
1970
+ Some ( finished_execution_callback) ,
1971
+ std:: ptr:: addr_of!( finished_executions) as _ ,
1972
+ ) ;
1973
+ }
1974
+
1975
+ [ prepare_fut, statement_fut, batch_fut]
1976
+ } )
1977
+ . collect :: < Vec < _ > > ( ) ;
1978
+
1979
+ tracing:: debug!( "Started all requests. Now, freeing statements and session..." ) ;
1980
+
1981
+ // Free the statement
1982
+ cass_statement_free ( statement_raw) ;
1983
+ // Free the batch
1984
+ cass_batch_free ( batch_raw) ;
1985
+
1986
+ // Session is freed, but the requests may still be in-flight.
1987
+ cass_session_free ( session_raw) ;
1988
+
1989
+ tracing:: debug!( "Session freed." ) ;
1990
+
1991
+ // Assert that the session awaited completion of all requests.
1992
+ let actually_finished_executions = finished_executions. load ( Ordering :: SeqCst ) ;
1993
+ assert_eq ! (
1994
+ actually_finished_executions, EXECUTIONS ,
1995
+ "Expected {} requests to complete before the session was freed, but only {} did." ,
1996
+ EXECUTIONS , actually_finished_executions
1997
+ ) ;
1998
+
1999
+ futures. into_iter ( ) . for_each ( |fut| {
2000
+ // As per cassandra.h, "a future can be freed anytime".
2001
+ cass_future_free ( fut) ;
2002
+ } ) ;
2003
+
2004
+ cass_cluster_free ( cluster_raw) ;
2005
+ }
2006
+ proxy
2007
+ }
1888
2008
}
0 commit comments