@@ -26,11 +26,12 @@ use std::time::{Duration, Instant};
2626use tokio:: sync:: { RwLock , Semaphore } ;
2727
2828use actix_web:: Responder ;
29- use actix_web:: http:: header:: { self , HeaderMap } ;
29+ use actix_web:: http:: StatusCode ;
30+ use actix_web:: http:: header:: HeaderMap ;
3031use actix_web:: web:: Path ;
3132use bytes:: Bytes ;
3233use chrono:: Utc ;
33- use http:: { StatusCode , header as http_header } ;
34+ use http:: header;
3435use itertools:: Itertools ;
3536use serde:: de:: { DeserializeOwned , Error } ;
3637use serde_json:: error:: Error as SerdeError ;
@@ -367,10 +368,15 @@ pub async fn sync_streams_with_ingestors(
367368 body : Bytes ,
368369 stream_name : & str ,
369370) -> Result < ( ) , StreamError > {
370- let mut reqwest_headers = http_header :: HeaderMap :: new ( ) ;
371+ let mut reqwest_headers = reqwest :: header :: HeaderMap :: new ( ) ;
371372
372373 for ( key, value) in headers. iter ( ) {
373- reqwest_headers. insert ( key. clone ( ) , value. clone ( ) ) ;
374+ // Convert actix header name/value to reqwest header name/value
375+ if let Ok ( name) = reqwest:: header:: HeaderName :: from_bytes ( key. as_str ( ) . as_bytes ( ) )
376+ && let Ok ( val) = reqwest:: header:: HeaderValue :: from_bytes ( value. as_bytes ( ) )
377+ {
378+ reqwest_headers. insert ( name, val) ;
379+ }
374380 }
375381
376382 let body_clone = body. clone ( ) ;
0 commit comments