@@ -854,7 +854,8 @@ mod tests {
854
854
cass_statement_set_execution_profile_n,
855
855
} ,
856
856
future:: {
857
- cass_future_error_code, cass_future_error_message, cass_future_free, cass_future_wait,
857
+ cass_future_error_code, cass_future_error_message, cass_future_free,
858
+ cass_future_set_callback, cass_future_wait,
858
859
} ,
859
860
retry_policy:: {
860
861
CassRetryPolicy , cass_retry_policy_default_new, cass_retry_policy_fallthrough_new,
@@ -866,8 +867,10 @@ mod tests {
866
867
use std:: {
867
868
collections:: HashSet ,
868
869
convert:: { TryFrom , TryInto } ,
870
+ ffi:: { CStr , c_void} ,
869
871
iter,
870
872
net:: SocketAddr ,
873
+ sync:: atomic:: { AtomicUsize , Ordering } ,
871
874
} ;
872
875
873
876
// This is for convenient logs from failing tests. Just call it at the beginning of a test.
@@ -1849,4 +1852,130 @@ mod tests {
1849
1852
cass_cluster_free ( cluster_raw)
1850
1853
}
1851
1854
}
1855
+
1856
+ #[ tokio:: test]
1857
+ #[ ntest:: timeout( 5000 ) ]
1858
+ async fn session_free_waits_for_requests_to_complete ( ) {
1859
+ init_logger ( ) ;
1860
+ test_with_one_proxy_one (
1861
+ session_free_waits_for_requests_to_complete_do,
1862
+ handshake_rules ( )
1863
+ . into_iter ( )
1864
+ . chain ( std:: iter:: once ( RequestRule (
1865
+ Condition :: RequestOpcode ( RequestOpcode :: Query )
1866
+ . or ( Condition :: RequestOpcode ( RequestOpcode :: Prepare ) )
1867
+ . or ( Condition :: RequestOpcode ( RequestOpcode :: Batch ) ) ,
1868
+ // We won't respond to any queries (including metadata fetch),
1869
+ // but the driver will manage to continue with dummy metadata.
1870
+ RequestReaction :: forge ( ) . server_error ( ) ,
1871
+ ) ) ) ,
1872
+ )
1873
+ . with_current_subscriber ( )
1874
+ . await ;
1875
+ }
1876
+
1877
+ fn session_free_waits_for_requests_to_complete_do (
1878
+ node_addr : SocketAddr ,
1879
+ proxy : RunningProxy ,
1880
+ ) -> RunningProxy {
1881
+ unsafe {
1882
+ let mut cluster_raw = cass_cluster_new ( ) ;
1883
+ let ip = node_addr. ip ( ) . to_string ( ) ;
1884
+ let ( c_ip, c_ip_len) = str_to_c_str_n ( ip. as_str ( ) ) ;
1885
+
1886
+ assert_cass_error_eq ! (
1887
+ cass_cluster_set_contact_points_n( cluster_raw. borrow_mut( ) , c_ip, c_ip_len) ,
1888
+ CassError :: CASS_OK
1889
+ ) ;
1890
+ let session_raw = cass_session_new ( ) ;
1891
+ cass_future_wait_check_and_free ( cass_session_connect (
1892
+ session_raw. borrow ( ) ,
1893
+ cluster_raw. borrow ( ) . into_c_const ( ) ,
1894
+ ) ) ;
1895
+
1896
+ tracing:: debug!( "Session connected, starting to execute requests..." ) ;
1897
+
1898
+ let statement = c"SELECT host_id FROM system.local WHERE key='local'" as * const CStr
1899
+ as * const c_char ;
1900
+ let statement_raw = cass_statement_new ( statement, 0 ) ;
1901
+
1902
+ let mut batch_raw = cass_batch_new ( CassBatchType :: CASS_BATCH_TYPE_LOGGED ) ;
1903
+ // This batch is obviously invalid, because it contains a SELECT statement. This is OK for us,
1904
+ // because we anyway expect the batch to fail. The goal is to have the future set, no matter if it's
1905
+ // set with a success or an error.
1906
+ cass_batch_add_statement ( batch_raw. borrow_mut ( ) , statement_raw. borrow ( ) ) ;
1907
+
1908
+ let finished_executions = AtomicUsize :: new ( 0 ) ;
1909
+ unsafe extern "C" fn finished_execution_callback (
1910
+ _future_raw : CassBorrowedSharedPtr < CassFuture , CMut > ,
1911
+ data : * mut c_void ,
1912
+ ) {
1913
+ let finished_executions = unsafe { & * ( data as * const AtomicUsize ) } ;
1914
+ finished_executions. fetch_add ( 1 , Ordering :: SeqCst ) ;
1915
+ }
1916
+
1917
+ const ITERATIONS : usize = 1 ;
1918
+ const EXECUTIONS : usize = 3 * ITERATIONS ; // One prepare, one statement and one batch per iteration.
1919
+
1920
+ let futures = ( 0 ..ITERATIONS )
1921
+ . flat_map ( |_| {
1922
+ // Prepare a statement
1923
+ let prepare_fut = cass_session_prepare ( session_raw. borrow ( ) , statement) ;
1924
+
1925
+ // Execute a statement
1926
+ let statement_fut = cass_session_execute (
1927
+ session_raw. borrow ( ) ,
1928
+ statement_raw. borrow ( ) . into_c_const ( ) ,
1929
+ ) ;
1930
+
1931
+ // Execute a batch
1932
+ let batch_fut = cass_session_execute_batch (
1933
+ session_raw. borrow ( ) ,
1934
+ batch_raw. borrow ( ) . into_c_const ( ) ,
1935
+ ) ;
1936
+ for fut in [
1937
+ prepare_fut. borrow ( ) ,
1938
+ statement_fut. borrow ( ) ,
1939
+ batch_fut. borrow ( ) ,
1940
+ ] {
1941
+ cass_future_set_callback (
1942
+ fut,
1943
+ Some ( finished_execution_callback) ,
1944
+ std:: ptr:: addr_of!( finished_executions) as _ ,
1945
+ ) ;
1946
+ }
1947
+
1948
+ [ prepare_fut, statement_fut, batch_fut]
1949
+ } )
1950
+ . collect :: < Vec < _ > > ( ) ;
1951
+
1952
+ tracing:: debug!( "Started all requests. Now, freeing statements and session..." ) ;
1953
+
1954
+ // Free the statement
1955
+ cass_statement_free ( statement_raw) ;
1956
+ // Free the batch
1957
+ cass_batch_free ( batch_raw) ;
1958
+
1959
+ // Session is freed, but the requests may still be in-flight.
1960
+ cass_session_free ( session_raw) ;
1961
+
1962
+ tracing:: debug!( "Session freed." ) ;
1963
+
1964
+ // Assert that the session awaited completion of all requests.
1965
+ let actually_finished_executions = finished_executions. load ( Ordering :: SeqCst ) ;
1966
+ assert_eq ! (
1967
+ actually_finished_executions, EXECUTIONS ,
1968
+ "Expected {} requests to complete before the session was freed, but only {} did." ,
1969
+ EXECUTIONS , actually_finished_executions
1970
+ ) ;
1971
+
1972
+ futures. into_iter ( ) . for_each ( |fut| {
1973
+ // As per cassandra.h, "a future can be freed anytime".
1974
+ cass_future_free ( fut) ;
1975
+ } ) ;
1976
+
1977
+ cass_cluster_free ( cluster_raw) ;
1978
+ }
1979
+ proxy
1980
+ }
1852
1981
}
0 commit comments