3
3
import asyncio
4
4
import sys
5
5
from typing import ( # noqa
6
+ TYPE_CHECKING ,
6
7
Any ,
7
8
Awaitable ,
8
9
Callable ,
@@ -66,6 +67,8 @@ def __init__(
66
67
self .loop = loop
67
68
self ._on_chunk_sent : _T_OnChunkSent = on_chunk_sent
68
69
self ._on_headers_sent : _T_OnHeadersSent = on_headers_sent
70
+ self ._headers_buf : Optional [bytes ] = None
71
+ self ._headers_written : bool = False
69
72
70
73
@property
71
74
def transport (self ) -> Optional [asyncio .Transport ]:
@@ -106,14 +109,58 @@ def _writelines(self, chunks: Iterable[bytes]) -> None:
106
109
else :
107
110
transport .writelines (chunks )
108
111
112
+ def _write_chunked_payload (
113
+ self , chunk : Union [bytes , bytearray , "memoryview[int]" , "memoryview[bytes]" ]
114
+ ) -> None :
115
+ """Write a chunk with proper chunked encoding."""
116
+ chunk_len_pre = f"{ len (chunk ):x} \r \n " .encode ("ascii" )
117
+ self ._writelines ((chunk_len_pre , chunk , b"\r \n " ))
118
+
119
+ def _send_headers_with_payload (
120
+ self ,
121
+ chunk : Union [bytes , bytearray , "memoryview[int]" , "memoryview[bytes]" ],
122
+ is_eof : bool ,
123
+ ) -> None :
124
+ """Send buffered headers with payload, coalescing into single write."""
125
+ # Mark headers as written
126
+ self ._headers_written = True
127
+ headers_buf = self ._headers_buf
128
+ self ._headers_buf = None
129
+
130
+ if TYPE_CHECKING :
131
+ # Safe because callers (write() and write_eof()) only invoke this method
132
+ # after checking that self._headers_buf is truthy
133
+ assert headers_buf is not None
134
+
135
+ if not self .chunked :
136
+ # Non-chunked: coalesce headers with body
137
+ if chunk :
138
+ self ._writelines ((headers_buf , chunk ))
139
+ else :
140
+ self ._write (headers_buf )
141
+ return
142
+
143
+ # Coalesce headers with chunked data
144
+ if chunk :
145
+ chunk_len_pre = f"{ len (chunk ):x} \r \n " .encode ("ascii" )
146
+ if is_eof :
147
+ self ._writelines ((headers_buf , chunk_len_pre , chunk , b"\r \n 0\r \n \r \n " ))
148
+ else :
149
+ self ._writelines ((headers_buf , chunk_len_pre , chunk , b"\r \n " ))
150
+ elif is_eof :
151
+ self ._writelines ((headers_buf , b"0\r \n \r \n " ))
152
+ else :
153
+ self ._write (headers_buf )
154
+
109
155
async def write (
110
156
self ,
111
157
chunk : Union [bytes , bytearray , memoryview ],
112
158
* ,
113
159
drain : bool = True ,
114
160
LIMIT : int = 0x10000 ,
115
161
) -> None :
116
- """Writes chunk of data to a stream.
162
+ """
163
+ Writes chunk of data to a stream.
117
164
118
165
write_eof() indicates end of stream.
119
166
writer can't be used after write_eof() method being called.
@@ -142,31 +189,75 @@ async def write(
142
189
if not chunk :
143
190
return
144
191
192
+ # Handle buffered headers for small payload optimization
193
+ if self ._headers_buf and not self ._headers_written :
194
+ self ._send_headers_with_payload (chunk , False )
195
+ if drain and self .buffer_size > LIMIT :
196
+ self .buffer_size = 0
197
+ await self .drain ()
198
+ return
199
+
145
200
if chunk :
146
201
if self .chunked :
147
- self ._writelines (
148
- (f"{ len (chunk ):x} \r \n " .encode ("ascii" ), chunk , b"\r \n " )
149
- )
202
+ self ._write_chunked_payload (chunk )
150
203
else :
151
204
self ._write (chunk )
152
205
153
- if self .buffer_size > LIMIT and drain :
206
+ if drain and self .buffer_size > LIMIT :
154
207
self .buffer_size = 0
155
208
await self .drain ()
156
209
157
210
async def write_headers (
158
211
self , status_line : str , headers : "CIMultiDict[str]"
159
212
) -> None :
160
- """Write request/response status and headers ."""
213
+ """Write headers to the stream ."""
161
214
if self ._on_headers_sent is not None :
162
215
await self ._on_headers_sent (headers )
163
-
164
216
# status + headers
165
217
buf = _serialize_headers (status_line , headers )
166
- self ._write (buf )
218
+ self ._headers_written = False
219
+ self ._headers_buf = buf
220
+
221
+ def send_headers (self ) -> None :
222
+ """Force sending buffered headers if not already sent."""
223
+ if not self ._headers_buf or self ._headers_written :
224
+ return
225
+
226
+ self ._headers_written = True
227
+ headers_buf = self ._headers_buf
228
+ self ._headers_buf = None
229
+
230
+ if TYPE_CHECKING :
231
+ # Safe because we only enter this block when self._headers_buf is truthy
232
+ assert headers_buf is not None
233
+
234
+ self ._write (headers_buf )
167
235
168
236
def set_eof (self ) -> None :
169
237
"""Indicate that the message is complete."""
238
+ if self ._eof :
239
+ return
240
+
241
+ # If headers haven't been sent yet, send them now
242
+ # This handles the case where there's no body at all
243
+ if self ._headers_buf and not self ._headers_written :
244
+ self ._headers_written = True
245
+ headers_buf = self ._headers_buf
246
+ self ._headers_buf = None
247
+
248
+ if TYPE_CHECKING :
249
+ # Safe because we only enter this block when self._headers_buf is truthy
250
+ assert headers_buf is not None
251
+
252
+ # Combine headers and chunked EOF marker in a single write
253
+ if self .chunked :
254
+ self ._writelines ((headers_buf , b"0\r \n \r \n " ))
255
+ else :
256
+ self ._write (headers_buf )
257
+ elif self .chunked and self ._headers_written :
258
+ # Headers already sent, just send the final chunk marker
259
+ self ._write (b"0\r \n \r \n " )
260
+
170
261
self ._eof = True
171
262
172
263
async def write_eof (self , chunk : bytes = b"" ) -> None :
@@ -176,6 +267,7 @@ async def write_eof(self, chunk: bytes = b"") -> None:
176
267
if chunk and self ._on_chunk_sent is not None :
177
268
await self ._on_chunk_sent (chunk )
178
269
270
+ # Handle body/compression
179
271
if self ._compress :
180
272
chunks : List [bytes ] = []
181
273
chunks_len = 0
@@ -188,23 +280,61 @@ async def write_eof(self, chunk: bytes = b"") -> None:
188
280
chunks .append (flush_chunk )
189
281
assert chunks_len
190
282
283
+ # Send buffered headers with compressed data if not yet sent
284
+ if self ._headers_buf and not self ._headers_written :
285
+ self ._headers_written = True
286
+ headers_buf = self ._headers_buf
287
+ self ._headers_buf = None
288
+
289
+ if self .chunked :
290
+ # Coalesce headers with compressed chunked data
291
+ chunk_len_pre = f"{ chunks_len :x} \r \n " .encode ("ascii" )
292
+ self ._writelines (
293
+ (headers_buf , chunk_len_pre , * chunks , b"\r \n 0\r \n \r \n " )
294
+ )
295
+ else :
296
+ # Coalesce headers with compressed data
297
+ self ._writelines ((headers_buf , * chunks ))
298
+ await self .drain ()
299
+ self ._eof = True
300
+ return
301
+
302
+ # Headers already sent, just write compressed data
191
303
if self .chunked :
192
304
chunk_len_pre = f"{ chunks_len :x} \r \n " .encode ("ascii" )
193
305
self ._writelines ((chunk_len_pre , * chunks , b"\r \n 0\r \n \r \n " ))
194
306
elif len (chunks ) > 1 :
195
307
self ._writelines (chunks )
196
308
else :
197
309
self ._write (chunks [0 ])
198
- elif self .chunked :
310
+ await self .drain ()
311
+ self ._eof = True
312
+ return
313
+
314
+ # No compression - send buffered headers if not yet sent
315
+ if self ._headers_buf and not self ._headers_written :
316
+ # Use helper to send headers with payload
317
+ self ._send_headers_with_payload (chunk , True )
318
+ await self .drain ()
319
+ self ._eof = True
320
+ return
321
+
322
+ # Handle remaining body
323
+ if self .chunked :
199
324
if chunk :
200
- chunk_len_pre = f"{ len (chunk ):x} \r \n " .encode ("ascii" )
201
- self ._writelines ((chunk_len_pre , chunk , b"\r \n 0\r \n \r \n " ))
325
+ # Write final chunk with EOF marker
326
+ self ._writelines (
327
+ (f"{ len (chunk ):x} \r \n " .encode ("ascii" ), chunk , b"\r \n 0\r \n \r \n " )
328
+ )
202
329
else :
203
330
self ._write (b"0\r \n \r \n " )
204
- elif chunk :
205
- self ._write (chunk )
331
+ await self .drain ()
332
+ self ._eof = True
333
+ return
206
334
207
- await self .drain ()
335
+ if chunk :
336
+ self ._write (chunk )
337
+ await self .drain ()
208
338
209
339
self ._eof = True
210
340
0 commit comments