3
3
import asyncio
4
4
import sys
5
5
from typing import ( # noqa
6
+ TYPE_CHECKING ,
6
7
Any ,
7
8
Awaitable ,
8
9
Callable ,
@@ -71,6 +72,8 @@ def __init__(
71
72
self .loop = loop
72
73
self ._on_chunk_sent : _T_OnChunkSent = on_chunk_sent
73
74
self ._on_headers_sent : _T_OnHeadersSent = on_headers_sent
75
+ self ._headers_buf : Optional [bytes ] = None
76
+ self ._headers_written : bool = False
74
77
75
78
@property
76
79
def transport (self ) -> Optional [asyncio .Transport ]:
@@ -118,14 +121,58 @@ def _writelines(
118
121
else :
119
122
transport .writelines (chunks ) # type: ignore[arg-type]
120
123
124
+ def _write_chunked_payload (
125
+ self , chunk : Union [bytes , bytearray , "memoryview[int]" , "memoryview[bytes]" ]
126
+ ) -> None :
127
+ """Write a chunk with proper chunked encoding."""
128
+ chunk_len_pre = f"{ len (chunk ):x} \r \n " .encode ("ascii" )
129
+ self ._writelines ((chunk_len_pre , chunk , b"\r \n " ))
130
+
131
+ def _send_headers_with_payload (
132
+ self ,
133
+ chunk : Union [bytes , bytearray , "memoryview[int]" , "memoryview[bytes]" ],
134
+ is_eof : bool ,
135
+ ) -> None :
136
+ """Send buffered headers with payload, coalescing into single write."""
137
+ # Mark headers as written
138
+ self ._headers_written = True
139
+ headers_buf = self ._headers_buf
140
+ self ._headers_buf = None
141
+
142
+ if TYPE_CHECKING :
143
+ # Safe because callers (write() and write_eof()) only invoke this method
144
+ # after checking that self._headers_buf is truthy
145
+ assert headers_buf is not None
146
+
147
+ if not self .chunked :
148
+ # Non-chunked: coalesce headers with body
149
+ if chunk :
150
+ self ._writelines ((headers_buf , chunk ))
151
+ else :
152
+ self ._write (headers_buf )
153
+ return
154
+
155
+ # Coalesce headers with chunked data
156
+ if chunk :
157
+ chunk_len_pre = f"{ len (chunk ):x} \r \n " .encode ("ascii" )
158
+ if is_eof :
159
+ self ._writelines ((headers_buf , chunk_len_pre , chunk , b"\r \n 0\r \n \r \n " ))
160
+ else :
161
+ self ._writelines ((headers_buf , chunk_len_pre , chunk , b"\r \n " ))
162
+ elif is_eof :
163
+ self ._writelines ((headers_buf , b"0\r \n \r \n " ))
164
+ else :
165
+ self ._write (headers_buf )
166
+
121
167
async def write (
122
168
self ,
123
169
chunk : Union [bytes , bytearray , "memoryview[int]" , "memoryview[bytes]" ],
124
170
* ,
125
171
drain : bool = True ,
126
172
LIMIT : int = 0x10000 ,
127
173
) -> None :
128
- """Writes chunk of data to a stream.
174
+ """
175
+ Writes chunk of data to a stream.
129
176
130
177
write_eof() indicates end of stream.
131
178
writer can't be used after write_eof() method being called.
@@ -154,31 +201,75 @@ async def write(
154
201
if not chunk :
155
202
return
156
203
204
+ # Handle buffered headers for small payload optimization
205
+ if self ._headers_buf and not self ._headers_written :
206
+ self ._send_headers_with_payload (chunk , False )
207
+ if drain and self .buffer_size > LIMIT :
208
+ self .buffer_size = 0
209
+ await self .drain ()
210
+ return
211
+
157
212
if chunk :
158
213
if self .chunked :
159
- self ._writelines (
160
- (f"{ len (chunk ):x} \r \n " .encode ("ascii" ), chunk , b"\r \n " )
161
- )
214
+ self ._write_chunked_payload (chunk )
162
215
else :
163
216
self ._write (chunk )
164
217
165
- if self .buffer_size > LIMIT and drain :
218
+ if drain and self .buffer_size > LIMIT :
166
219
self .buffer_size = 0
167
220
await self .drain ()
168
221
169
222
async def write_headers (
170
223
self , status_line : str , headers : "CIMultiDict[str]"
171
224
) -> None :
172
- """Write request/response status and headers ."""
225
+ """Write headers to the stream ."""
173
226
if self ._on_headers_sent is not None :
174
227
await self ._on_headers_sent (headers )
175
-
176
228
# status + headers
177
229
buf = _serialize_headers (status_line , headers )
178
- self ._write (buf )
230
+ self ._headers_written = False
231
+ self ._headers_buf = buf
232
+
233
+ def send_headers (self ) -> None :
234
+ """Force sending buffered headers if not already sent."""
235
+ if not self ._headers_buf or self ._headers_written :
236
+ return
237
+
238
+ self ._headers_written = True
239
+ headers_buf = self ._headers_buf
240
+ self ._headers_buf = None
241
+
242
+ if TYPE_CHECKING :
243
+ # Safe because we only enter this block when self._headers_buf is truthy
244
+ assert headers_buf is not None
245
+
246
+ self ._write (headers_buf )
179
247
180
248
def set_eof (self ) -> None :
181
249
"""Indicate that the message is complete."""
250
+ if self ._eof :
251
+ return
252
+
253
+ # If headers haven't been sent yet, send them now
254
+ # This handles the case where there's no body at all
255
+ if self ._headers_buf and not self ._headers_written :
256
+ self ._headers_written = True
257
+ headers_buf = self ._headers_buf
258
+ self ._headers_buf = None
259
+
260
+ if TYPE_CHECKING :
261
+ # Safe because we only enter this block when self._headers_buf is truthy
262
+ assert headers_buf is not None
263
+
264
+ # Combine headers and chunked EOF marker in a single write
265
+ if self .chunked :
266
+ self ._writelines ((headers_buf , b"0\r \n \r \n " ))
267
+ else :
268
+ self ._write (headers_buf )
269
+ elif self .chunked and self ._headers_written :
270
+ # Headers already sent, just send the final chunk marker
271
+ self ._write (b"0\r \n \r \n " )
272
+
182
273
self ._eof = True
183
274
184
275
async def write_eof (self , chunk : bytes = b"" ) -> None :
@@ -188,6 +279,7 @@ async def write_eof(self, chunk: bytes = b"") -> None:
188
279
if chunk and self ._on_chunk_sent is not None :
189
280
await self ._on_chunk_sent (chunk )
190
281
282
+ # Handle body/compression
191
283
if self ._compress :
192
284
chunks : List [bytes ] = []
193
285
chunks_len = 0
@@ -200,23 +292,61 @@ async def write_eof(self, chunk: bytes = b"") -> None:
200
292
chunks .append (flush_chunk )
201
293
assert chunks_len
202
294
295
+ # Send buffered headers with compressed data if not yet sent
296
+ if self ._headers_buf and not self ._headers_written :
297
+ self ._headers_written = True
298
+ headers_buf = self ._headers_buf
299
+ self ._headers_buf = None
300
+
301
+ if self .chunked :
302
+ # Coalesce headers with compressed chunked data
303
+ chunk_len_pre = f"{ chunks_len :x} \r \n " .encode ("ascii" )
304
+ self ._writelines (
305
+ (headers_buf , chunk_len_pre , * chunks , b"\r \n 0\r \n \r \n " )
306
+ )
307
+ else :
308
+ # Coalesce headers with compressed data
309
+ self ._writelines ((headers_buf , * chunks ))
310
+ await self .drain ()
311
+ self ._eof = True
312
+ return
313
+
314
+ # Headers already sent, just write compressed data
203
315
if self .chunked :
204
316
chunk_len_pre = f"{ chunks_len :x} \r \n " .encode ("ascii" )
205
317
self ._writelines ((chunk_len_pre , * chunks , b"\r \n 0\r \n \r \n " ))
206
318
elif len (chunks ) > 1 :
207
319
self ._writelines (chunks )
208
320
else :
209
321
self ._write (chunks [0 ])
210
- elif self .chunked :
322
+ await self .drain ()
323
+ self ._eof = True
324
+ return
325
+
326
+ # No compression - send buffered headers if not yet sent
327
+ if self ._headers_buf and not self ._headers_written :
328
+ # Use helper to send headers with payload
329
+ self ._send_headers_with_payload (chunk , True )
330
+ await self .drain ()
331
+ self ._eof = True
332
+ return
333
+
334
+ # Handle remaining body
335
+ if self .chunked :
211
336
if chunk :
212
- chunk_len_pre = f"{ len (chunk ):x} \r \n " .encode ("ascii" )
213
- self ._writelines ((chunk_len_pre , chunk , b"\r \n 0\r \n \r \n " ))
337
+ # Write final chunk with EOF marker
338
+ self ._writelines (
339
+ (f"{ len (chunk ):x} \r \n " .encode ("ascii" ), chunk , b"\r \n 0\r \n \r \n " )
340
+ )
214
341
else :
215
342
self ._write (b"0\r \n \r \n " )
216
- elif chunk :
217
- self ._write (chunk )
343
+ await self .drain ()
344
+ self ._eof = True
345
+ return
218
346
219
- await self .drain ()
347
+ if chunk :
348
+ self ._write (chunk )
349
+ await self .drain ()
220
350
221
351
self ._eof = True
222
352
0 commit comments