@@ -52,47 +52,112 @@ fn session_id() -> SessionId {
5252#[ derive( Debug , serde:: Deserialize ) ]
5353#[ serde( rename_all = "camelCase" ) ]
5454pub struct PostEventQuery {
55- pub session_id : String ,
55+ #[ serde( default ) ] // Use None if session_id is not present in query
56+ pub session_id : Option < String > ,
5657}
5758
5859async fn post_event_handler (
5960 State ( app) : State < App > ,
60- Query ( PostEventQuery { session_id } ) : Query < PostEventQuery > ,
61+ Query ( query_params ) : Query < PostEventQuery > ,
6162 body : Body ,
6263) -> Result < StatusCode , StatusCode > {
64+ tracing:: debug!( ?query_params, "Received POST request" ) ;
6365 const BODY_BYTES_LIMIT : usize = 1 << 22 ;
64- let write_stream = {
65- let rg = app. txs . read ( ) . await ;
66- rg. get ( session_id. as_str ( ) )
67- . ok_or ( StatusCode :: NOT_FOUND ) ?
68- . clone ( )
69- } ;
70- let mut write_stream = write_stream. lock ( ) . await ;
71- let mut body = body. into_data_stream ( ) ;
72- if let ( _, Some ( size) ) = body. size_hint ( ) {
73- if size > BODY_BYTES_LIMIT {
66+ const BUFFER_SIZE : usize = 1 << 12 ; // For new sessions
67+
68+ let ( session_id_arc, c2s_writer_for_body) : ( SessionId , C2SWriter ) =
69+ match query_params. session_id {
70+ Some ( id_str) => {
71+ tracing:: debug!( session_id = %id_str, "sessionId provided in query" ) ;
72+ // Convert String to Arc<str> for map lookup
73+ let session_arc: SessionId = Arc :: from ( id_str. as_str ( ) ) ;
74+ let rg = app. txs . read ( ) . await ;
75+ match rg. get ( & session_arc) {
76+ Some ( writer) => {
77+ tracing:: debug!( session_id = %session_arc, "Found existing session writer" ) ;
78+ ( session_arc, writer. clone ( ) )
79+ }
80+ None => {
81+ tracing:: warn!( session_id = %session_arc, "sessionId provided but not found in active sessions" ) ;
82+ return Err ( StatusCode :: NOT_FOUND ) ;
83+ }
84+ }
85+ }
86+ None => {
87+ tracing:: info!( "sessionId not provided, creating new session for POST request" ) ;
88+ let new_session_id_arc = session_id ( ) ; // fn session_id() -> Arc<str>
89+ tracing:: info!( new_session_id = %new_session_id_arc, "Generated new session ID" ) ;
90+
91+ let ( c2s_read, c2s_write_half) = tokio:: io:: simplex ( BUFFER_SIZE ) ;
92+ // s2c_read/write are also needed for the ByteTransport and Server::run
93+ // _s2c_read is not directly used by this POST handler but needed for the spawned server task.
94+ let ( _s2c_read, s2c_write_half) = tokio:: io:: simplex ( BUFFER_SIZE ) ;
95+
96+ let new_c2s_writer_for_map = Arc :: new ( Mutex :: new ( c2s_write_half) ) ;
97+ app. txs
98+ . write ( )
99+ . await
100+ . insert ( new_session_id_arc. clone ( ) , new_c2s_writer_for_map. clone ( ) ) ;
101+ tracing:: info!( session_id = %new_session_id_arc, "Inserted new session writer into app.txs" ) ;
102+
103+ // Spawn the server task for the new session
104+ let app_clone = app. clone ( ) ;
105+ let task_session_id = new_session_id_arc. clone ( ) ;
106+ tokio:: spawn ( async move {
107+ let router = RouterService ( DocRouter :: new ( ) ) ;
108+ let server = Server :: new ( router) ;
109+ let bytes_transport = ByteTransport :: new ( c2s_read, s2c_write_half) ;
110+ tracing:: info!( session_id = %task_session_id, "Spawning server task for new POST session" ) ;
111+ let _result = server
112+ . run ( bytes_transport)
113+ . await
114+ . inspect_err ( |e| {
115+ tracing:: error!( ?e, session_id = %task_session_id, "Server run error for new POST session" )
116+ } ) ;
117+ app_clone. txs . write ( ) . await . remove ( & task_session_id) ;
118+ tracing:: info!( session_id = %task_session_id, "Cleaned up new POST session from app.txs after server task completion" ) ;
119+ } ) ;
120+ ( new_session_id_arc, new_c2s_writer_for_map)
121+ }
122+ } ;
123+
124+ // Process the request body using c2s_writer_for_body
125+ let mut write_stream_locked = c2s_writer_for_body. lock ( ) . await ;
126+ let mut body_data_stream = body. into_data_stream ( ) ;
127+
128+ if let ( _, Some ( size_hint) ) = body_data_stream. size_hint ( ) {
129+ if size_hint > BODY_BYTES_LIMIT {
130+ tracing:: warn!( %session_id_arc, body_size_hint = size_hint, limit = BODY_BYTES_LIMIT , "Payload too large based on hint" ) ;
74131 return Err ( StatusCode :: PAYLOAD_TOO_LARGE ) ;
75132 }
76133 }
77- // calculate the body size
78- let mut size = 0 ;
79- while let Some ( chunk) = body. next ( ) . await {
80- let Ok ( chunk) = chunk else {
81- return Err ( StatusCode :: BAD_REQUEST ) ;
134+
135+ let mut actual_size = 0 ;
136+ while let Some ( chunk_result) = body_data_stream. next ( ) . await {
137+ let chunk = match chunk_result {
138+ Ok ( c) => c,
139+ Err ( e) => {
140+ tracing:: error!( %session_id_arc, ?e, "Error reading chunk from body stream" ) ;
141+ return Err ( StatusCode :: BAD_REQUEST ) ;
142+ }
82143 } ;
83- size += chunk. len ( ) ;
84- if size > BODY_BYTES_LIMIT {
144+ actual_size += chunk. len ( ) ;
145+ if actual_size > BODY_BYTES_LIMIT {
146+ tracing:: warn!( %session_id_arc, actual_body_size = actual_size, limit = BODY_BYTES_LIMIT , "Payload too large during streaming" ) ;
85147 return Err ( StatusCode :: PAYLOAD_TOO_LARGE ) ;
86148 }
87- write_stream
88- . write_all ( & chunk)
89- . await
90- . map_err ( |_| StatusCode :: INTERNAL_SERVER_ERROR ) ? ;
149+ if let Err ( e ) = write_stream_locked . write_all ( & chunk ) . await {
150+ tracing :: error! ( %session_id_arc , ?e , "Error writing chunk to session stream" ) ;
151+ return Err ( StatusCode :: INTERNAL_SERVER_ERROR ) ;
152+ }
91153 }
92- write_stream
93- . write_u8 ( b'\n' )
94- . await
95- . map_err ( |_| StatusCode :: INTERNAL_SERVER_ERROR ) ?;
154+
155+ if let Err ( e) = write_stream_locked. write_u8 ( b'\n' ) . await {
156+ tracing:: error!( %session_id_arc, ?e, "Error writing newline to session stream" ) ;
157+ return Err ( StatusCode :: INTERNAL_SERVER_ERROR ) ;
158+ }
159+
160+ tracing:: info!( %session_id_arc, "Successfully processed POST request body" ) ;
96161 Ok ( StatusCode :: ACCEPTED )
97162}
98163
0 commit comments