22mod integration {
33 use anyhow:: Result ;
44 use dotenv:: dotenv;
5+ use rand:: Rng ;
56 use scopeguard;
67 use serial_test:: serial;
7- use std:: sync:: Arc ;
8+ use std:: sync:: { Arc , Mutex } ;
89 use std:: time:: { Duration , Instant } ;
910 use timefusion:: database:: Database ;
1011 use tokio:: { sync:: Notify , time:: sleep} ;
1112 use tokio_postgres:: { Client , NoTls } ;
1213 use tokio_util:: sync:: CancellationToken ;
1314 use uuid:: Uuid ;
15+ use std:: collections:: HashSet ;
1416
15- async fn connect_with_retry ( timeout : Duration ) -> Result < ( Client , tokio:: task:: JoinHandle < ( ) > ) , tokio_postgres:: Error > {
17+ async fn connect_with_retry ( port : u16 , timeout : Duration ) -> Result < ( Client , tokio:: task:: JoinHandle < ( ) > ) , tokio_postgres:: Error > {
1618 let start = Instant :: now ( ) ;
17- let conn_string = "host=localhost port=5433 user=postgres password=postgres" ;
19+ let conn_string = format ! ( "host=localhost port={port} user=postgres password=postgres" ) ;
1820
1921 while start. elapsed ( ) < timeout {
20- match tokio_postgres:: connect ( conn_string, NoTls ) . await {
22+ match tokio_postgres:: connect ( & conn_string, NoTls ) . await {
2123 Ok ( ( client, connection) ) => {
2224 let handle = tokio:: spawn ( async move {
2325 if let Err ( e) = connection. await {
@@ -31,7 +33,7 @@ mod integration {
3133 }
3234
3335 // Final attempt
34- let ( client, connection) = tokio_postgres:: connect ( conn_string, NoTls ) . await ?;
36+ let ( client, connection) = tokio_postgres:: connect ( & conn_string, NoTls ) . await ?;
3537 let handle = tokio:: spawn ( async move {
3638 if let Err ( e) = connection. await {
3739 eprintln ! ( "Connection error: {}" , e) ;
@@ -41,13 +43,17 @@ mod integration {
4143 Ok ( ( client, handle) )
4244 }
4345
44- async fn start_test_server ( ) -> Result < ( Arc < Notify > , String ) > {
46+ async fn start_test_server ( ) -> Result < ( Arc < Notify > , String , u16 ) > {
4547 let test_id = Uuid :: new_v4 ( ) . to_string ( ) ;
4648 let _ = env_logger:: builder ( ) . is_test ( true ) . try_init ( ) ;
4749 dotenv ( ) . ok ( ) ;
4850
51+ // Use a different port for each test to avoid conflicts
52+ let mut rng = rand:: thread_rng ( ) ;
53+ let port = 5433 + ( rng. gen_range ( 1 ..100 ) as u16 ) ;
54+
4955 unsafe {
50- std:: env:: set_var ( "PGWIRE_PORT" , "5433" ) ;
56+ std:: env:: set_var ( "PGWIRE_PORT" , & port . to_string ( ) ) ;
5157 std:: env:: set_var ( "TIMEFUSION_TABLE_PREFIX" , format ! ( "test-{}" , test_id) ) ;
5258 }
5359
@@ -60,9 +66,14 @@ mod integration {
6066 let session_context = db. create_session_context ( ) ;
6167 db. setup_session_context ( & session_context) . expect ( "Failed to setup session context" ) ;
6268
69+ let port = std:: env:: var ( "PGWIRE_PORT" )
70+ . expect ( "PGWIRE_PORT not set" )
71+ . parse :: < u16 > ( )
72+ . expect ( "Invalid PGWIRE_PORT" ) ;
73+
6374 let shutdown_token = CancellationToken :: new ( ) ;
6475 let pg_server = db
65- . start_pgwire_server ( session_context, 5433 , shutdown_token. clone ( ) )
76+ . start_pgwire_server ( session_context, port , shutdown_token. clone ( ) )
6677 . await
6778 . expect ( "Failed to start PGWire server" ) ;
6879
@@ -72,23 +83,29 @@ mod integration {
7283 let _ = pg_server. await ;
7384 } ) ;
7485
86+ // Get the port number we set
87+ let port = std:: env:: var ( "PGWIRE_PORT" )
88+ . expect ( "PGWIRE_PORT not set" )
89+ . parse :: < u16 > ( )
90+ . expect ( "Invalid PGWIRE_PORT" ) ;
91+
7592 // Wait for server to be ready
76- let _ = connect_with_retry ( Duration :: from_secs ( 5 ) ) . await ?;
93+ let _ = connect_with_retry ( port , Duration :: from_secs ( 5 ) ) . await ?;
7794
78- Ok ( ( shutdown_signal, test_id) )
95+ Ok ( ( shutdown_signal, test_id, port ) )
7996 }
8097
8198 #[ tokio:: test]
8299 #[ serial]
83100 async fn test_postgres_integration ( ) -> Result < ( ) > {
84- let ( shutdown_signal, test_id) = start_test_server ( ) . await ?;
101+ let ( shutdown_signal, test_id, port ) = start_test_server ( ) . await ?;
85102 let shutdown = || { shutdown_signal. notify_one ( ) ; } ;
86103
87104 // Use a guard to ensure we notify of shutdown even if the test panics
88105 let shutdown_guard = scopeguard:: guard ( ( ) , |_| shutdown ( ) ) ;
89106
90107 // Connect to database
91- let ( client, _) = connect_with_retry ( Duration :: from_secs ( 3 ) ) . await
108+ let ( client, _) = connect_with_retry ( port , Duration :: from_secs ( 3 ) ) . await
92109 . map_err ( |e| anyhow:: anyhow!( "Failed to connect to PostgreSQL: {}" , e) ) ?;
93110
94111 // Insert test data
@@ -159,4 +176,218 @@ mod integration {
159176 // Map postgres errors to anyhow
160177 result. map_err ( |e| anyhow:: anyhow!( "Test failed: {}" , e) )
161178 }
179+
180+ #[ tokio:: test]
181+ #[ serial]
182+ async fn test_concurrent_postgres_requests ( ) -> Result < ( ) > {
183+ // Start test server
184+ let ( shutdown_signal, test_id, port) = start_test_server ( ) . await ?;
185+ let shutdown = || { shutdown_signal. notify_one ( ) ; } ;
186+
187+ // Use a guard to ensure we notify of shutdown even if the test panics
188+ let shutdown_guard = scopeguard:: guard ( ( ) , |_| shutdown ( ) ) ;
189+
190+ // Number of concurrent clients
191+ let num_clients = 5 ;
192+ // Number of operations per client
193+ let ops_per_client = 10 ;
194+
195+ println ! ( "Creating {} client connections" , num_clients) ;
196+
197+ // Shared set to track all inserted IDs
198+ let inserted_ids = Arc :: new ( Mutex :: new ( HashSet :: new ( ) ) ) ;
199+
200+ // Create timestamp for the insert query
201+ let timestamp_str = format ! ( "'{}'" , chrono:: Utc :: now( ) . format( "%Y-%m-%d %H:%M:%S" ) ) ;
202+ let insert_query = format ! (
203+ "INSERT INTO otel_logs_and_spans (project_id, timestamp, id, name, status_code, status_message, level)
204+ VALUES ($1, {}, $2, $3, $4, $5, $6)" ,
205+ timestamp_str
206+ ) ;
207+
208+ // Spawn tasks for each client to execute operations concurrently
209+ let mut handles = Vec :: with_capacity ( num_clients) ;
210+
211+ for i in 0 ..num_clients {
212+ // Create a new client connection for each task
213+ let ( client, _) = connect_with_retry ( port, Duration :: from_secs ( 3 ) ) . await
214+ . map_err ( |e| anyhow:: anyhow!( "Failed to connect to PostgreSQL: {}" , e) ) ?;
215+
216+ let insert_query = insert_query. clone ( ) ;
217+ let inserted_ids_clone = Arc :: clone ( & inserted_ids) ;
218+ let test_id_prefix = format ! ( "{}-client-{}" , test_id, i) ;
219+
220+ // Create a task for each client
221+ let handle = tokio:: spawn ( async move {
222+ let mut client_ids = HashSet :: new ( ) ;
223+
224+ // Perform multiple operations per client
225+ for j in 0 ..ops_per_client {
226+ // Generate a unique ID for this operation
227+ let span_id = format ! ( "{}-op-{}" , test_id_prefix, j) ;
228+
229+ // Insert a record
230+ println ! ( "Client {} executing operation {}" , i, j) ;
231+ let start = Instant :: now ( ) ;
232+ client. execute (
233+ & insert_query,
234+ & [
235+ & "test_project" ,
236+ & span_id,
237+ & format ! ( "concurrent_span_client_{}_op_{}" , i, j) ,
238+ & "OK" ,
239+ & format ! ( "Concurrent test client {} op {}" , i, j) ,
240+ & "INFO"
241+ ] ,
242+ ) . await . expect ( "Insert should succeed" ) ;
243+ println ! ( "Client {} operation {} completed in {:?}" , i, j, start. elapsed( ) ) ;
244+
245+ // Add the ID to the client's set
246+ client_ids. insert ( span_id) ;
247+
248+ // Randomly perform queries to simulate mixed workload
249+ if j % 3 == 0 {
250+ let _query_result = client. query (
251+ "SELECT COUNT(*) FROM otel_logs_and_spans WHERE project_id = $1" ,
252+ & [ & "test_project" ]
253+ ) . await . expect ( "Query should succeed" ) ;
254+ }
255+
256+ if j % 5 == 0 {
257+ // Use explicit concatenation for LIKE patterns since some PG implementations
258+ // don't handle parameter binding with % correctly
259+ let _detail_rows = client. query (
260+ & format ! ( "SELECT name, status_code FROM otel_logs_and_spans WHERE id LIKE '{test_id_prefix}%'" ) ,
261+ & [ ]
262+ ) . await . expect ( "Query should succeed" ) ;
263+ }
264+ }
265+
266+ // Rather than returning IDs, add them to shared collection
267+ let mut ids = inserted_ids_clone. lock ( ) . unwrap ( ) ;
268+ ids. extend ( client_ids) ;
269+ // Return nothing specific
270+ ( )
271+ } ) ;
272+
273+ handles. push ( handle) ;
274+ }
275+
276+ // Wait for all tasks to complete
277+ for handle in handles {
278+ let _ = handle. await . expect ( "Task should complete successfully" ) ;
279+ }
280+
281+ // Verify all records were inserted correctly
282+ let ( client, _) = connect_with_retry ( port, Duration :: from_secs ( 3 ) ) . await
283+ . map_err ( |e| anyhow:: anyhow!( "Failed to connect to PostgreSQL: {}" , e) ) ?;
284+
285+ // Get total count of inserted records
286+ let count_rows = client. query (
287+ & format ! ( "SELECT COUNT(*) FROM otel_logs_and_spans WHERE id LIKE '{test_id}%'" ) ,
288+ & [ ]
289+ ) . await . map_err ( |e| anyhow:: anyhow!( "Query failed: {}" , e) ) ?;
290+
291+ let count = count_rows[ 0 ] . get :: < _ , i64 > ( 0 ) ;
292+ let expected_count = ( num_clients * ops_per_client) as i64 ;
293+
294+ println ! ( "Total records found: {} (expected {})" , count, expected_count) ;
295+ assert_eq ! ( count, expected_count, "Should have inserted the expected number of records" ) ;
296+
297+ // Get and verify inserted IDs
298+ let id_rows = client. query (
299+ & format ! ( "SELECT id FROM otel_logs_and_spans WHERE id LIKE '{test_id}%'" ) ,
300+ & [ ]
301+ ) . await . map_err ( |e| anyhow:: anyhow!( "Query failed: {}" , e) ) ?;
302+
303+ let mut db_ids = HashSet :: new ( ) ;
304+ for row in id_rows {
305+ db_ids. insert ( row. get :: < _ , String > ( 0 ) ) ;
306+ }
307+
308+ // Verify all expected IDs were found
309+ let ids = inserted_ids. lock ( ) . unwrap ( ) ;
310+ let missing_ids: Vec < _ > = ids. difference ( & db_ids) . collect ( ) ;
311+ let unexpected_ids: Vec < _ > = db_ids. difference ( & ids) . collect ( ) ;
312+
313+ assert ! ( missing_ids. is_empty( ) , "Expected all IDs to be found, missing: {:?}" , missing_ids) ;
314+ assert ! ( unexpected_ids. is_empty( ) , "Found unexpected IDs: {:?}" , unexpected_ids) ;
315+
316+ // Measure read performance with concurrent queries
317+ let num_query_clients = 3 ;
318+ let queries_per_client = 5 ;
319+
320+ let mut query_handles = Vec :: with_capacity ( num_query_clients) ;
321+ let query_times = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
322+
323+ for _i in 0 ..num_query_clients {
324+ let ( client, _) = connect_with_retry ( port, Duration :: from_secs ( 3 ) ) . await
325+ . map_err ( |e| anyhow:: anyhow!( "Failed to connect to PostgreSQL: {}" , e) ) ?;
326+
327+ let test_id = test_id. clone ( ) ;
328+ let query_times = Arc :: clone ( & query_times) ;
329+
330+ let handle = tokio:: spawn ( async move {
331+ let start = Instant :: now ( ) ;
332+
333+ for j in 0 ..queries_per_client {
334+ // Mix different query types
335+ match j % 3 {
336+ 0 => {
337+ // Count query
338+ let _ = client. query (
339+ "SELECT COUNT(*) FROM otel_logs_and_spans WHERE project_id = $1" ,
340+ & [ & "test_project" ]
341+ ) . await . expect ( "Query should succeed" ) ;
342+ } ,
343+ 1 => {
344+ // Filter query
345+ let _ = client. query (
346+ & format ! ( "SELECT name, status_code FROM otel_logs_and_spans WHERE id LIKE '{test_id}%' LIMIT 10" ) ,
347+ & [ ]
348+ ) . await . expect ( "Query should succeed" ) ;
349+ } ,
350+ _ => {
351+ // Aggregate query
352+ let _ = client. query (
353+ "SELECT status_code, COUNT(*) FROM otel_logs_and_spans GROUP BY status_code" ,
354+ & [ ]
355+ ) . await . expect ( "Query should succeed" ) ;
356+ }
357+ }
358+ }
359+
360+ // Store elapsed time in shared collection
361+ let elapsed = start. elapsed ( ) ;
362+ let mut times = query_times. lock ( ) . unwrap ( ) ;
363+ times. push ( elapsed) ;
364+
365+ // Return nothing
366+ ( )
367+ } ) ;
368+
369+ query_handles. push ( handle) ;
370+ }
371+
372+ // Wait for all query tasks to complete
373+ for handle in query_handles {
374+ let _ = handle. await . expect ( "Task should complete successfully" ) ;
375+ }
376+
377+ // Calculate average query time
378+ let times = query_times. lock ( ) . unwrap ( ) ;
379+ let total_time: Duration = times. iter ( ) . sum ( ) ;
380+ let avg_time = if times. is_empty ( ) {
381+ Duration :: new ( 0 , 0 )
382+ } else {
383+ total_time / times. len ( ) as u32
384+ } ;
385+ println ! ( "Average query execution time per client: {:?}" , avg_time) ;
386+
387+ // Clean up
388+ std:: mem:: drop ( shutdown_guard) ;
389+ shutdown ( ) ;
390+
391+ Ok ( ( ) )
392+ }
162393}
0 commit comments