11use std:: {
22 ops:: Deref ,
3+ str:: FromStr ,
34 sync:: Arc ,
45 time:: { Duration , Instant } ,
56} ;
@@ -8,7 +9,9 @@ use chrono::{DateTime, Utc};
89use parking_lot:: RwLock ;
910use tonic:: {
1011 codegen:: InterceptedService ,
12+ metadata:: { Ascii , MetadataKey , MetadataValue } ,
1113 transport:: { Channel , ClientTlsConfig , Endpoint , Uri } ,
14+ Status ,
1215} ;
1316
1417use crate :: {
@@ -90,12 +93,62 @@ impl LastError {
9093 }
9194}
9295
96+ type ParseCosmosGrpcResult =
97+ Result < ( String , Arc < [ ( MetadataKey < Ascii > , MetadataValue < Ascii > ) ] > ) , Status > ;
98+
99+ pub fn parse_cosmos_grpc ( value : & str ) -> ParseCosmosGrpcResult {
100+ let ( endpoint, raw_headers) = match value. split_once ( '#' ) {
101+ Some ( ( endpoint, headers) ) => ( endpoint. trim ( ) . to_string ( ) , Some ( headers) ) ,
102+ None => ( value. trim ( ) . to_string ( ) , None ) ,
103+ } ;
104+
105+ let headers = {
106+ let mut parsed = Vec :: new ( ) ;
107+ if let Some ( hdrs) = raw_headers {
108+ for pair in hdrs. split ( ';' ) . filter ( |s| !s. trim ( ) . is_empty ( ) ) {
109+ let ( key, val) = pair. split_once ( '=' ) . ok_or_else ( || {
110+ Status :: invalid_argument ( format ! ( "Malformed header: '{}'" , pair) )
111+ } ) ?;
112+
113+ let key = MetadataKey :: from_bytes ( key. trim ( ) . as_bytes ( ) ) . map_err ( |_| {
114+ Status :: invalid_argument ( format ! ( "Invalid header key '{}'" , key) )
115+ } ) ?;
116+
117+ let val_str = val. trim ( ) ;
118+
119+ if val_str. is_empty ( ) {
120+ return Err ( Status :: invalid_argument ( format ! (
121+ "Header '{}' has empty value" ,
122+ key
123+ ) ) ) ;
124+ }
125+
126+ let val = MetadataValue :: from_str ( val_str) . map_err ( |_| {
127+ Status :: invalid_argument ( format ! ( "Invalid header value for '{}'" , key) )
128+ } ) ?;
129+
130+ parsed. push ( ( key, val) ) ;
131+ }
132+ }
133+ Arc :: from ( parsed. into_boxed_slice ( ) )
134+ } ;
135+
136+ Ok ( ( endpoint, headers) )
137+ }
138+
93139impl CosmosBuilder {
94140 pub ( crate ) fn make_node (
95141 & self ,
96142 grpc_url : & Arc < String > ,
97143 is_fallback : bool ,
98144 ) -> Result < Node , BuilderError > {
145+ let ( url, mut headers) =
146+ parse_cosmos_grpc ( grpc_url. as_str ( ) ) . map_err ( |e| BuilderError :: InvalidGrpcHeaders {
147+ grpc_url : grpc_url. clone ( ) ,
148+ source : e,
149+ } ) ?;
150+ let grpc_url = Arc :: < String > :: new ( url) ;
151+
99152 let grpc_endpoint =
100153 grpc_url
101154 . parse :: < Endpoint > ( )
@@ -144,9 +197,23 @@ impl CosmosBuilder {
144197
145198 let grpc_channel = grpc_endpoint. connect_lazy ( ) ;
146199
147- let referer_header = self . referer_header ( ) . map ( |x| x. to_owned ( ) ) ;
200+ if let Some ( referer) = self . referer_header ( ) {
201+ if !headers. iter ( ) . any ( |( k, _) | k. as_str ( ) == "referer" ) {
202+ let mut vec = headers. as_ref ( ) . to_vec ( ) ;
203+ vec. push ( (
204+ MetadataKey :: from_bytes ( b"referer" ) . unwrap ( ) ,
205+ MetadataValue :: from_str ( referer) . map_err ( |_| {
206+ BuilderError :: InvalidRefererHeader {
207+ referer : Arc :: new ( referer. to_string ( ) ) ,
208+ source : Status :: invalid_argument ( "Invalid referer header value" ) ,
209+ }
210+ } ) ?,
211+ ) ) ;
212+ headers = Arc :: from ( vec. into_boxed_slice ( ) ) ;
213+ }
214+ }
148215
149- let interceptor = CosmosInterceptor ( referer_header . map ( Arc :: new ) ) ;
216+ let interceptor = CosmosInterceptor ( headers ) ;
150217 let channel = InterceptedService :: new ( grpc_channel, interceptor) ;
151218 let max_decoding_message_size = self . get_max_decoding_message_size ( ) ;
152219
0 commit comments