11#[ cfg( test) ]
22mod integration {
3+ use anyhow:: Result ;
34 use dotenv:: dotenv;
5+ use scopeguard;
46 use serial_test:: serial;
5- use std:: time:: Duration ;
7+ use std:: sync:: Arc ;
8+ use std:: time:: { Duration , Instant } ;
69 use timefusion:: database:: Database ;
7- use tokio:: sync:: oneshot;
8- use tokio:: time:: sleep;
9- use tokio_postgres:: NoTls ;
10+ use tokio:: { sync:: Notify , time:: sleep} ;
11+ use tokio_postgres:: { Client , NoTls } ;
1012 use tokio_util:: sync:: CancellationToken ;
1113 use uuid:: Uuid ;
1214
13- async fn start_test_server ( ) -> anyhow:: Result < ( oneshot:: Sender < ( ) > , String ) > {
15+ async fn connect_with_retry ( timeout : Duration ) -> Result < ( Client , tokio:: task:: JoinHandle < ( ) > ) , tokio_postgres:: Error > {
16+ let start = Instant :: now ( ) ;
17+ let conn_string = "host=localhost port=5433 user=postgres password=postgres" ;
18+
19+ while start. elapsed ( ) < timeout {
20+ match tokio_postgres:: connect ( conn_string, NoTls ) . await {
21+ Ok ( ( client, connection) ) => {
22+ let handle = tokio:: spawn ( async move {
23+ if let Err ( e) = connection. await {
24+ eprintln ! ( "Connection error: {}" , e) ;
25+ }
26+ } ) ;
27+ return Ok ( ( client, handle) ) ;
28+ } ,
29+ Err ( _) => sleep ( Duration :: from_millis ( 100 ) ) . await ,
30+ }
31+ }
32+
33+ // Final attempt
34+ let ( client, connection) = tokio_postgres:: connect ( conn_string, NoTls ) . await ?;
35+ let handle = tokio:: spawn ( async move {
36+ if let Err ( e) = connection. await {
37+ eprintln ! ( "Connection error: {}" , e) ;
38+ }
39+ } ) ;
40+
41+ Ok ( ( client, handle) )
42+ }
43+
44+ async fn start_test_server ( ) -> Result < ( Arc < Notify > , String ) > {
1445 let test_id = Uuid :: new_v4 ( ) . to_string ( ) ;
1546 let _ = env_logger:: builder ( ) . is_test ( true ) . try_init ( ) ;
1647 dotenv ( ) . ok ( ) ;
@@ -20,186 +51,112 @@ mod integration {
2051 std:: env:: set_var ( "TIMEFUSION_TABLE_PREFIX" , format ! ( "test-{}" , test_id) ) ;
2152 }
2253
23- let ( shutdown_tx, shutdown_rx) = oneshot:: channel ( ) ;
54+ // Use a shareable notification
55+ let shutdown_signal = Arc :: new ( Notify :: new ( ) ) ;
56+ let shutdown_signal_clone = shutdown_signal. clone ( ) ;
2457
2558 tokio:: spawn ( async move {
26- let db = match Database :: new ( ) . await {
27- Ok ( db) => db,
28- Err ( e) => {
29- panic ! ( "Failed to create database: {:?}" , e) ;
30- }
31- } ;
32-
59+ let db = Database :: new ( ) . await . expect ( "Failed to create database" ) ;
3360 let session_context = db. create_session_context ( ) ;
3461 db. setup_session_context ( & session_context) . expect ( "Failed to setup session context" ) ;
3562
3663 let shutdown_token = CancellationToken :: new ( ) ;
3764 let pg_server = db
38- . start_pgwire_server (
39- session_context,
40- 5433 ,
41- shutdown_token. clone ( ) ,
42- )
65+ . start_pgwire_server ( session_context, 5433 , shutdown_token. clone ( ) )
4366 . await
4467 . expect ( "Failed to start PGWire server" ) ;
4568
46- let _ = shutdown_rx. await ;
69+ // Wait for shutdown signal
70+ shutdown_signal_clone. notified ( ) . await ;
4771 shutdown_token. cancel ( ) ;
4872 let _ = pg_server. await ;
4973 } ) ;
5074
51- // Give server time to start with retry approach
52- let start_time = std:: time:: Instant :: now ( ) ;
53- let timeout = Duration :: from_secs ( 5 ) ;
75+ // Wait for server to be ready
76+ let _ = connect_with_retry ( Duration :: from_secs ( 5 ) ) . await ?;
5477
55- // Try a test connection to make sure server is up
56- while start_time. elapsed ( ) < timeout {
57- match tokio_postgres:: connect (
58- "host=localhost port=5433 user=postgres password=postgres" ,
59- NoTls
60- ) . await {
61- Ok ( ( _client, connection) ) => {
62- // Connection succeeded, spawn a task to drive it and return
63- tokio:: spawn ( async move {
64- if let Err ( e) = connection. await {
65- eprintln ! ( "Test connection error: {}" , e) ;
66- }
67- } ) ;
68-
69- // Small delay before we release the client
70- sleep ( Duration :: from_millis ( 100 ) ) . await ;
71-
72- // Server is ready
73- break ;
74- } ,
75- Err ( _) => {
76- // Small backoff between retry attempts
77- sleep ( Duration :: from_millis ( 100 ) ) . await ;
78- }
79- }
80- }
81-
82- Ok ( ( shutdown_tx, test_id) )
78+ Ok ( ( shutdown_signal, test_id) )
8379 }
8480
8581 #[ tokio:: test]
8682 #[ serial]
87- async fn test_postgres_integration ( ) -> anyhow:: Result < ( ) > {
88- let ( shutdown_tx, test_id) = start_test_server ( ) . await ?;
83+ async fn test_postgres_integration ( ) -> Result < ( ) > {
84+ let ( shutdown_signal, test_id) = start_test_server ( ) . await ?;
85+ let shutdown = || { shutdown_signal. notify_one ( ) ; } ;
86+
87+ // Use a guard to ensure we notify of shutdown even if the test panics
88+ let shutdown_guard = scopeguard:: guard ( ( ) , |_| shutdown ( ) ) ;
8989
9090 // Connect to database
91- let connection_string = "host=localhost port=5433 user=postgres password=postgres" ;
92- let ( client, connection) = match tokio_postgres:: connect ( connection_string, NoTls ) . await {
93- Ok ( ( client, connection) ) => ( client, connection) ,
94- Err ( e) => {
95- shutdown_tx. send ( ( ) ) . ok ( ) ;
96- return Err ( anyhow:: anyhow!( "Failed to connect to PostgreSQL: {:?}" , e) ) ;
97- }
98- } ;
99-
100- tokio:: spawn ( async move {
101- if let Err ( e) = connection. await {
102- eprintln ! ( "Connection error: {}" , e) ;
103- }
104- } ) ;
105-
106- // Allow connection to stabilize
107- sleep ( Duration :: from_millis ( 500 ) ) . await ;
91+ let ( client, _) = connect_with_retry ( Duration :: from_secs ( 3 ) ) . await
92+ . map_err ( |e| anyhow:: anyhow!( "Failed to connect to PostgreSQL: {}" , e) ) ?;
10893
10994 // Insert test data
11095 let timestamp_str = format ! ( "'{}'" , chrono:: Utc :: now( ) . format( "%Y-%m-%d %H:%M:%S" ) ) ;
111-
112- match client
113- . execute (
114- format ! (
115- "INSERT INTO otel_logs_and_spans (
116- project_id, timestamp, id,
117- name, status_code, status_message,
118- level
119- ) VALUES ($1, {}, $2, $3, $4, $5, $6)" ,
120- timestamp_str
121- )
122- . as_str ( ) ,
96+ let insert_query = format ! (
97+ "INSERT INTO otel_logs_and_spans (project_id, timestamp, id, name, status_code, status_message, level)
98+ VALUES ($1, {}, $2, $3, $4, $5, $6)" ,
99+ timestamp_str
100+ ) ;
101+
102+ // Run the test with proper error handling
103+ let result = async {
104+ // Insert initial record
105+ client. execute (
106+ & insert_query,
123107 & [ & "test_project" , & test_id, & "test_span_name" , & "OK" , & "Test integration" , & "INFO" ] ,
124- )
125- . await
126- {
127- Ok ( _) => ( ) ,
128- Err ( e) => {
129- shutdown_tx. send ( ( ) ) . ok ( ) ;
130- return Err ( anyhow:: anyhow!( "Failed to insert test data: {:?}" , e) ) ;
131- }
132- }
133-
134- // Query back the data and check with the client directly
135- let rows = match client. query ( "SELECT COUNT(*) FROM otel_logs_and_spans WHERE id = $1" , & [ & test_id] ) . await {
136- Ok ( rows) => rows,
137- Err ( e) => {
138- shutdown_tx. send ( ( ) ) . ok ( ) ;
139- return Err ( anyhow:: anyhow!( "Failed to query data: {:?}" , e) ) ;
108+ ) . await ?;
109+
110+ // Verify record count
111+ let rows = client. query (
112+ "SELECT COUNT(*) FROM otel_logs_and_spans WHERE id = $1" ,
113+ & [ & test_id]
114+ ) . await ?;
115+
116+ assert_eq ! ( rows[ 0 ] . get:: <_, i64 >( 0 ) , 1 , "Should have found exactly one row" ) ;
117+
118+ // Verify field values
119+ let detail_rows = client. query (
120+ "SELECT name, status_code FROM otel_logs_and_spans WHERE id = $1" ,
121+ & [ & test_id]
122+ ) . await ?;
123+
124+ assert_eq ! ( detail_rows. len( ) , 1 , "Should have found exactly one detailed row" ) ;
125+ assert_eq ! ( detail_rows[ 0 ] . get:: <_, String >( 0 ) , "test_span_name" , "Name should match" ) ;
126+ assert_eq ! ( detail_rows[ 0 ] . get:: <_, String >( 1 ) , "OK" , "Status code should match" ) ;
127+
128+ // Insert multiple records in a batch
129+ for i in 0 ..5 {
130+ let span_id = Uuid :: new_v4 ( ) . to_string ( ) ;
131+ client. execute (
132+ & insert_query,
133+ & [
134+ & "test_project" ,
135+ & span_id,
136+ & format ! ( "batch_span_{}" , i) ,
137+ & "OK" ,
138+ & format ! ( "Batch test {}" , i) ,
139+ & "INFO"
140+ ] ,
141+ ) . await ?;
140142 }
141- } ;
142143
143- assert_eq ! ( rows[ 0 ] . get:: <_, i64 >( 0 ) , 1 , "Should have found exactly one row" ) ;
144+ // Query with filter to get total count
145+ let count_rows = client. query (
146+ "SELECT COUNT(*) FROM otel_logs_and_spans WHERE project_id = $1" ,
147+ & [ & "test_project" ]
148+ ) . await ?;
144149
145- // Verify field values
146- let detail_rows = match client. query ( "SELECT name, status_code FROM otel_logs_and_spans WHERE id = $1" , & [ & test_id] ) . await {
147- Ok ( rows) => rows,
148- Err ( e) => {
149- shutdown_tx. send ( ( ) ) . ok ( ) ;
150- return Err ( anyhow:: anyhow!( "Failed to query detailed data: {:?}" , e) ) ;
151- }
152- } ;
153-
154- assert_eq ! ( detail_rows. len( ) , 1 , "Should have found exactly one detailed row" ) ;
155- assert_eq ! ( detail_rows[ 0 ] . get:: <_, String >( 0 ) , "test_span_name" , "Name should match" ) ;
156- assert_eq ! ( detail_rows[ 0 ] . get:: <_, String >( 1 ) , "OK" , "Status code should match" ) ;
157-
158- // Insert multiple records
159- for i in 0 ..5 {
160- let span_id = Uuid :: new_v4 ( ) . to_string ( ) ;
161- let batch_name = format ! ( "batch_span_{}" , i) ;
162- let batch_message = format ! ( "Batch test {}" , i) ;
163-
164- match client
165- . execute (
166- format ! (
167- "INSERT INTO otel_logs_and_spans (
168- project_id, timestamp, id,
169- name, status_code, status_message,
170- level
171- ) VALUES ($1, {}, $2, $3, $4, $5, $6)" ,
172- timestamp_str
173- )
174- . as_str ( ) ,
175- & [ & "test_project" , & span_id, & batch_name, & "OK" , & batch_message, & "INFO" ] ,
176- )
177- . await
178- {
179- Ok ( _) => ( ) ,
180- Err ( e) => {
181- shutdown_tx. send ( ( ) ) . ok ( ) ;
182- return Err ( anyhow:: anyhow!( "Failed to insert batch record: {:?}" , e) ) ;
183- }
184- } ;
185- }
186-
187- // Give a little time for all inserts to complete
188- sleep ( Duration :: from_millis ( 500 ) ) . await ;
189-
190- // Query with filter to get total count
191- let count_rows = match client. query ( "SELECT COUNT(*) FROM otel_logs_and_spans WHERE project_id = $1" , & [ & "test_project" ] ) . await {
192- Ok ( rows) => rows,
193- Err ( e) => {
194- shutdown_tx. send ( ( ) ) . ok ( ) ;
195- return Err ( anyhow:: anyhow!( "Failed to query total count: {:?}" , e) ) ;
196- }
197- } ;
198-
199- assert_eq ! ( count_rows[ 0 ] . get:: <_, i64 >( 0 ) , 6 , "Should have a total of 6 records (1 initial + 5 batch)" ) ;
200-
201- let _ = shutdown_tx. send ( ( ) ) ;
202-
203- Ok ( ( ) )
150+ assert_eq ! ( count_rows[ 0 ] . get:: <_, i64 >( 0 ) , 6 , "Should have a total of 6 records (1 initial + 5 batch)" ) ;
151+
152+ Ok :: < _ , tokio_postgres:: Error > ( ( ) )
153+ } . await ;
154+
155+ // Drop the guard to ensure shutdown happens
156+ std:: mem:: drop ( shutdown_guard) ;
157+ shutdown ( ) ;
158+
159+ // Map postgres errors to anyhow
160+ result. map_err ( |e| anyhow:: anyhow!( "Test failed: {}" , e) )
204161 }
205162}
0 commit comments