1- use std:: collections:: HashMap ;
2- use std:: sync:: Arc ;
3- use tokio:: sync:: RwLock ;
1+ use std:: { collections:: HashMap , sync:: Arc } ;
42
53use anyhow:: Result ;
64use datafusion:: {
@@ -10,16 +8,15 @@ use datafusion::{
108 record_batch:: RecordBatch ,
119 } ,
1210 execution:: context:: SessionContext ,
13- prelude:: * ,
1411} ;
15- use deltalake:: storage:: StorageOptions ;
16- use deltalake :: { DeltaOps , DeltaTable , DeltaTableBuilder } ;
12+ use deltalake:: { storage:: StorageOptions , DeltaOps , DeltaTable , DeltaTableBuilder } ;
13+ use tokio :: sync :: RwLock ;
1714type ProjectConfig = ( String , StorageOptions , Arc < RwLock < DeltaTable > > ) ;
1815
1916pub type ProjectConfigs = Arc < RwLock < HashMap < String , ProjectConfig > > > ;
2017
2118pub struct Database {
22- pub ctx : SessionContext ,
19+ pub ctx : SessionContext ,
2320 project_configs : ProjectConfigs ,
2421}
2522
@@ -34,14 +31,7 @@ impl Database {
3431 . with_storage_options ( default_options. 0 . clone ( ) )
3532 . build ( ) ?;
3633 ctx. register_table ( "otel_logs_and_spans" , Arc :: new ( table. clone ( ) ) ) ?;
37- project_configs. insert (
38- "default" . to_string ( ) ,
39- (
40- storage_uri. to_string ( ) ,
41- default_options,
42- Arc :: new ( RwLock :: new ( table) ) ,
43- ) ,
44- ) ;
34+ project_configs. insert ( "default" . to_string ( ) , ( storage_uri. to_string ( ) , default_options, Arc :: new ( RwLock :: new ( table) ) ) ) ;
4535
4636 Ok ( Self {
4737 ctx,
@@ -70,16 +60,8 @@ impl Database {
7060 Schema :: new ( vec ! [
7161 Field :: new( "traceId" , DataType :: Utf8 , false ) ,
7262 Field :: new( "spanId" , DataType :: Utf8 , false ) ,
73- Field :: new(
74- "startTimeUnixNano" ,
75- DataType :: Timestamp ( TimeUnit :: Nanosecond , Some ( Arc :: from( "UTC" ) ) ) ,
76- false ,
77- ) ,
78- Field :: new(
79- "endTimeUnixNano" ,
80- DataType :: Timestamp ( TimeUnit :: Nanosecond , Some ( Arc :: from( "UTC" ) ) ) ,
81- true ,
82- ) ,
63+ Field :: new( "startTimeUnixNano" , DataType :: Timestamp ( TimeUnit :: Nanosecond , Some ( Arc :: from( "UTC" ) ) ) , false ) ,
64+ Field :: new( "endTimeUnixNano" , DataType :: Timestamp ( TimeUnit :: Nanosecond , Some ( Arc :: from( "UTC" ) ) ) , true ) ,
8365 Field :: new( "name" , DataType :: Utf8 , false ) ,
8466 ] )
8567 }
@@ -91,19 +73,15 @@ impl Database {
9173 ) -> Result < ( ) > {
9274 let ( _conn_str, _options, table_ref) = {
9375 let configs = self . project_configs . read ( ) . await ;
94- configs
95- . get ( project_id)
96- . ok_or_else ( || anyhow:: anyhow!( "Project ID '{}' not found" , project_id) ) ?
97- . clone ( )
76+ configs. get ( project_id) . ok_or_else ( || anyhow:: anyhow!( "Project ID '{}' not found" , project_id) ) ?. clone ( )
9877 } ;
9978
10079 let arrays: Vec < ArrayRef > = vec ! [
101- Arc :: new( StringArray :: from( vec![ Some ( & record. traceId[ ..] ) ] ) ) ,
102- Arc :: new( StringArray :: from( vec![ Some ( & record. spanId[ ..] ) ] ) ) ,
103- Arc :: new( TimestampNanosecondArray :: from( vec![ record. startTimeUnixNano] ) ) ,
104- Arc :: new( TimestampNanosecondArray :: from( vec![
105- record. endTimeUnixNano. unwrap_or( 0 )
106- ] ) ) ,
80+ Arc :: new( StringArray :: from( vec![ Some ( & record. context___trace_id[ ..] ) ] ) ) ,
81+ Arc :: new( StringArray :: from( vec![ Some ( & record. context___span_id[ ..] ) ] ) ) ,
82+ Arc :: new( TimestampNanosecondArray :: from( vec![ record. timestamp] ) ) ,
83+ Arc :: new( TimestampNanosecondArray :: from( vec![ record. start_time] ) ) ,
84+ Arc :: new( TimestampNanosecondArray :: from( vec![ record. end_time. unwrap_or( 0 ) ] ) ) ,
10785 Arc :: new( StringArray :: from( vec![ Some ( & record. name[ ..] ) ] ) ) ,
10886 ] ;
10987
@@ -130,20 +108,16 @@ impl Database {
130108 storage_options. 0 . insert ( "AWS_SECRET_ACCESS_KEY" . to_string ( ) , secret_key. to_string ( ) ) ;
131109 storage_options. 0 . insert ( "AWS_ENDPOINT" . to_string ( ) , endpoint. to_string ( ) ) ;
132110 storage_options. 0 . insert ( "AWS_ALLOW_HTTP" . to_string ( ) , "true" . to_string ( ) ) ;
133-
111+
134112 let table = DeltaTableBuilder :: from_uri ( & conn_str)
135113 . with_storage_options ( storage_options. 0 . clone ( ) )
136114 . with_allow_http ( true )
137115 . build ( ) ?;
138-
139- self . ctx
140- . register_table ( & format ! ( "otel_logs_and_spans_{}" , project_id) , Arc :: new ( table. clone ( ) ) ) ?;
141-
116+
117+ self . ctx . register_table ( & format ! ( "otel_logs_and_spans_{}" , project_id) , Arc :: new ( table. clone ( ) ) ) ?;
118+
142119 let mut configs = self . project_configs . write ( ) . await ;
143- configs. insert (
144- project_id. to_string ( ) ,
145- ( conn_str, storage_options, Arc :: new ( RwLock :: new ( table) ) ) ,
146- ) ;
120+ configs. insert ( project_id. to_string ( ) , ( conn_str, storage_options, Arc :: new ( RwLock :: new ( table) ) ) ) ;
147121 Ok ( ( ) )
148122 }
149- }
123+ }
0 commit comments