1919 WindowUpdated ,
2020)
2121
22- from blacksheep import Content , Request , Response
22+ from blacksheep import Content , Request , Response , StreamedContent
2323
2424# Compatibility for asyncio.timeout (added in Python 3.11)
2525if sys .version_info >= (3 , 11 ):
@@ -423,10 +423,23 @@ async def send(self, request: Request) -> Response:
423423 # Convert request to HTTP/2 headers
424424 h2_headers = self ._convert_request_to_h2_headers (request )
425425
426- # Get request body if present
426+ # Determine if we should stream or materialize the body
427+ # Only StreamedContent and content with unknown length can be streamed
428+ use_streaming = (
429+ request .content
430+ and isinstance (request .content , StreamedContent )
431+ and (request .content .length < 0 or request .content .body is None )
432+ )
433+
434+ # Get request body if present (only for non-streaming content)
427435 body : bytes | None = None
428- if request .content :
429- body = await request .content .read ()
436+ if request .content and not use_streaming :
437+ if request .content .body is not None :
438+ body = request .content .body
439+ else :
440+ body = await request .content .read ()
441+
442+ has_body = body is not None or use_streaming
430443
431444 # Initialize stream tracking with completion event
432445 self .streams [stream_id ] = StreamState (
@@ -448,19 +461,19 @@ async def send(self, request: Request) -> Response:
448461 )
449462
450463 # Send headers (without end_stream if expecting 100-continue with body)
451- if expect_continue and body :
464+ if expect_continue and has_body :
452465 # Don't set end_stream, wait for 100 Continue
453466 self .h2_conn .send_headers (stream_id , h2_headers , end_stream = False )
454467 else :
455468 # Normal behavior
456469 self .h2_conn .send_headers (
457- stream_id , h2_headers , end_stream = ( body is None or len ( body ) == 0 )
470+ stream_id , h2_headers , end_stream = not has_body
458471 )
459472 self .writer .write (self .h2_conn .data_to_send ())
460473 await self .writer .drain ()
461474
462475 # Handle Expect: 100-continue
463- if expect_continue and body :
476+ if expect_continue and has_body :
464477 # Wait for 100 Continue response or error
465478 should_send_body = await self ._wait_for_100_continue_h2 (stream_id )
466479 if not should_send_body :
@@ -469,10 +482,27 @@ async def send(self, request: Request) -> Response:
469482 self .last_used = time .time ()
470483 return await self ._receive_response (stream_id )
471484
472- # Send body if present
473- if body :
485+ # Send body
486+ max_frame_size = self .h2_conn .max_outbound_frame_size
487+ if use_streaming :
488+ # Stream the content in chunks
489+ async for chunk in request .content .get_parts ():
490+ if chunk :
491+ # Send chunk in frame-sized pieces
492+ for i in range (0 , len (chunk ), max_frame_size ):
493+ frame_chunk = chunk [i : i + max_frame_size ]
494+ # Don't set end_stream yet, we don't know if this is the last chunk
495+ self .h2_conn .send_data (
496+ stream_id , frame_chunk , end_stream = False
497+ )
498+ self .writer .write (self .h2_conn .data_to_send ())
499+ await self .writer .drain ()
500+ # Send final empty frame with end_stream=True
501+ self .h2_conn .send_data (stream_id , b"" , end_stream = True )
502+ self .writer .write (self .h2_conn .data_to_send ())
503+ await self .writer .drain ()
504+ elif body :
474505 # Handle flow control for large bodies
475- max_frame_size = self .h2_conn .max_outbound_frame_size
476506 for i in range (0 , len (body ), max_frame_size ):
477507 chunk = body [i : i + max_frame_size ]
478508 is_last = i + max_frame_size >= len (body )
@@ -932,19 +962,39 @@ def _convert_request_to_h11(
932962 if not has_host and request .url .host :
933963 headers .insert (0 , (b"host" , request .url .host ))
934964
935- # Get body
965+ # Get body and handle headers
936966 body : bytes | None = None
967+ use_chunked = False
937968 if request .content :
938- # For h11, we need to handle content synchronously for now
939- # The caller should have already prepared the body
940- if request .content .body is not None :
969+ # Check if we should use chunked encoding (unknown length)
970+ if request .content .length < 0 :
971+ # Streaming content with unknown length - use chunked encoding
972+ use_chunked = True
973+ # Add transfer-encoding header if not present
974+ has_transfer_encoding = any (
975+ h [0 ].lower () == b"transfer-encoding" for h in headers
976+ )
977+ if not has_transfer_encoding :
978+ headers .append ((b"transfer-encoding" , b"chunked" ))
979+ elif request .content .body is not None :
980+ # Content with known length and body already available
941981 body = request .content .body
942982 # Add content-length if not present
943983 has_content_length = any (
944984 h [0 ].lower () == b"content-length" for h in headers
945985 )
946986 if not has_content_length :
947987 headers .append ((b"content-length" , str (len (body )).encode ()))
988+ else :
989+ # Content with known length but body not materialized yet
990+ # Add content-length from content.length if not present
991+ has_content_length = any (
992+ h [0 ].lower () == b"content-length" for h in headers
993+ )
994+ if not has_content_length and request .content .length >= 0 :
995+ headers .append (
996+ (b"content-length" , str (request .content .length ).encode ())
997+ )
948998
949999 # Create h11 Request
9501000 method = (
@@ -985,8 +1035,16 @@ async def send(self, request: Request) -> Response:
9851035 # Connection is in an unusable state, reconnect
9861036 self ._h11_conn = h11 .Connection (our_role = h11 .CLIENT )
9871037
988- # Read body if it's a coroutine/async content
989- if request .content and request .content .body is None :
1038+ # Determine if we need to stream or if we can send body directly
1039+ # Only StreamedContent can be streamed
1040+ use_streaming = (
1041+ request .content
1042+ and isinstance (request .content , StreamedContent )
1043+ and (request .content .length < 0 or request .content .body is None )
1044+ )
1045+
1046+ # For non-streaming content with no body, read it first
1047+ if request .content and request .content .body is None and not use_streaming :
9901048 body_data = await request .content .read ()
9911049 # Update content with read body
9921050 request .content = Content (request .content .type , body_data )
@@ -999,23 +1057,32 @@ async def send(self, request: Request) -> Response:
9991057 h [0 ].lower () == b"expect" and h [1 ].lower () == b"100-continue"
10001058 for h in h11_request .headers
10011059 )
1060+ has_body = body or use_streaming
10021061
10031062 # Send request headers
10041063 data = self ._h11_conn .send (h11_request )
10051064 self .writer .write (data )
10061065 await self .writer .drain ()
10071066
10081067 # Handle Expect: 100-continue
1009- if expect_continue and body :
1068+ if expect_continue and has_body :
10101069 # Wait for 100 Continue or error response
10111070 interim_response = await self ._wait_for_100_continue ()
10121071 if interim_response is not None :
10131072 # Got a final response (e.g., 417 Expectation Failed), don't send body
10141073 return interim_response
10151074 # Got 100 Continue or timeout, proceed to send body
10161075
1017- # Send body if present
1018- if body :
1076+ # Send body
1077+ if use_streaming :
1078+ # Stream the content in chunks
1079+ async for chunk in request .content .get_parts ():
1080+ if chunk :
1081+ data = self ._h11_conn .send (h11 .Data (data = chunk ))
1082+ self .writer .write (data )
1083+ await self .writer .drain ()
1084+ elif body :
1085+ # Send body directly
10191086 data = self ._h11_conn .send (h11 .Data (data = body ))
10201087 self .writer .write (data )
10211088
0 commit comments