diff --git a/aiohttp/http2_parser.py b/aiohttp/http2_parser.py
new file mode 100644
index 00000000000..1194f2bc406
--- /dev/null
+++ b/aiohttp/http2_parser.py
@@ -0,0 +1,192 @@
+import base64  # TODO: Pybase64 or libbase64 in Cython would be a good idea here...
+from abc import ABC, abstractmethod
+from types import GenericAlias
+from typing import (
+    TYPE_CHECKING,
+    Generic,
+    List,
+    NamedTuple,
+    Optional,
+    Tuple,
+    TypeVar,
+    Union,
+)
+
+from h2.frame_buffer import FrameBuffer
+from hpack.hpack import Decoder
+from hyperframe.frame import ContinuationFrame, Frame, GoAwayFrame, HeadersFrame
+from multidict import CIMultiDict, CIMultiDictProxy, istr
+from yarl import URL
+
+from . import hdrs
+from .compression_utils import (
+    HAS_BROTLI,
+    HAS_ZSTD,
+    BrotliDecompressor,
+    ZLibDecompressor,
+    ZSTDDecompressor,
+)
+from .http_parser import RawRequestMessage, RawResponseMessage, _MsgT
+from .http_writer import HttpVersion, HttpVersion20
+from .streams import EMPTY_PAYLOAD, StreamReader
+from .typedefs import RawHeaders
+
+# TODO: Something similar to llhttp would be effective.
+# A parody of llparse for Python also exists (thanks to me, Vizonex), and making
+# an HTTP/2 frame parser in C would be pretty simple.
+DEFAULT_MAX_HEADER_LIST_SIZE = 2**16
+
+
+class PartialResponseMessage(NamedTuple):
+    """Building blocks for a RawResponseMessage."""
+
+    version: HttpVersion = HttpVersion20
+    code: Optional[int] = None
+    reason: Optional[str] = None
+    headers: Optional[CIMultiDict[str]] = None
+    # raw_headers should be extendable with partials since CONTINUATION frames can happen...
+    # NOTE: NamedTuple defaults are shared between instances, so create_partial()
+    # must pass a fresh list (and a fresh CIMultiDict) for every message.
+    raw_headers: List[Tuple[bytes, bytes]] = list()
+    should_close: bool = False
+    compression: Optional[str] = None
+    upgrade: Optional[bool] = None
+    chunked: Optional[bool] = None
+
+
+class PartialRequestMessage(NamedTuple):
+    """Building blocks for a RawRequestMessage."""
+
+    method: Optional[str] = None
+    path: Optional[str] = None
+    version: HttpVersion = HttpVersion20
+    headers: "Optional[CIMultiDict[str]]" = None
+    raw_headers: List[Tuple[bytes, bytes]] = list()
+    should_close: bool = False
+    compression: Optional[str] = None
+    upgrade: Optional[bool] = None
+    chunked: Optional[bool] = None
+    url: Optional[URL] = None
+
+
+_PartialMsgT = TypeVar("_PartialMsgT", PartialResponseMessage, PartialRequestMessage)
+
+
+class AbstractFrameParser(ABC, Generic[_MsgT, _PartialMsgT]):
+    """Inspired by h2, this parser parses HTTP/2 frames and other data
+    until a message is considered ready to send back."""
+
+    __class_getitem__ = classmethod(GenericAlias)
+
+    # NOTE: only declared here; it must be initialized elsewhere
+    # (__init__ does so via create_buffer()).
+    _buffer: FrameBuffer
+
+    @property
+    def should_disconnect(self):
+        """Immutable property for dealing with GOAWAY frames
+        (server related; serves no use on the ClientFrameParser)."""
+        return self._go_away_issued
+
+    @should_disconnect.setter
+    def should_disconnect(self, value: bool):
+        raise AttributeError("should_disconnect is immutable")
+
+    def reset(self):
+        self._partial = self.create_partial()
+        self._response = None
+
+    @abstractmethod
+    def parse_message(self) -> _MsgT: ...
+
+    @abstractmethod
+    def create_buffer(self) -> FrameBuffer: ...
+
+    @abstractmethod
+    def create_partial(self) -> _PartialMsgT: ...
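+
+    # Subclasses supply the concrete pieces through these hooks:
+    #   create_buffer()  -> a client- or server-side FrameBuffer
+    #   create_partial() -> PartialResponseMessage / PartialRequestMessage
+    #   parse_message()  -> the finished RawResponseMessage / RawRequestMessage
+    # feed_data() below is the shared driver that calls into them.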
+
+    def __init__(
+        self,
+        max_line_size: int = 8190,
+        max_headers: int = 32768,
+        max_field_size: int = 8190,
+    ) -> None:
+        self.max_line_size = max_line_size
+        self.max_headers = max_headers
+        self.max_field_size = max_field_size
+
+        self._buffer = self.create_buffer()
+        self._decoder = Decoder(max_headers)
+        self._partial = self.create_partial()
+        self._response: Optional[_MsgT] = None
+        self._go_away_issued = False
+
+    def feed_data(
+        self, data: Union[bytes, bytearray, memoryview]
+    ) -> tuple[list[Frame], Optional[_MsgT], bool]:
+        """Parse a chunk of raw HTTP/2 data.
+
+        Returns the non-header frames, the message (once it can be issued)
+        and whether a GOAWAY frame was received.
+        """
+        data_frames: list[Frame] = []
+        self._buffer.add_data(data)
+        for frame in self._buffer:
+            if isinstance(frame, (HeadersFrame, ContinuationFrame)):
+                # Do not allow more HeadersFrames once a message was issued.
+
+                # As a measure against bad actors who wish to abuse the HTTP/2 system,
+                # multiple HEADERS and CONTINUATION frames are accepted while building
+                # a message, but the parser is not reset until the end developer says so.
+                if self._response is not None:
+                    # XXX: Still a concept, but throwing an exception here is acceptable
+                    # since we already got the frames for this response and don't need
+                    # more unless the parser was reset.
+                    raise RuntimeError("Header frames were already obtained")
+
+                # Decode the header block exactly once: decoding the same block twice
+                # would advance the HPACK dynamic table twice and corrupt decoder state.
+                # raw=True yields (bytes, bytes) pairs suitable for raw_headers.
+                # TODO: hpack could use better typehint overloads and a pull request for that may suffice :)
+                raw = self._decoder.decode(frame.data, raw=True)
+                self._partial.raw_headers.extend(raw)
+                for k, v in raw:
+                    self._partial.headers.add(
+                        k.decode("utf-8", "surrogateescape"),
+                        v.decode("utf-8", "surrogateescape"),
+                    )
+
+                if "END_HEADERS" in frame.flags:
+                    # Headers are ready
+                    self._response = self.parse_message()
+
+            elif isinstance(frame, GoAwayFrame):
+                # Do not accept any more requests after this one
+                # since the server/client wants to disconnect this stream.
+                self._go_away_issued = True
+                # NamedTuple fields are read-only, so swap in an updated copy.
+                self._partial = self._partial._replace(should_close=True)
+            else:
+                data_frames.append(frame)
+
+        return data_frames, self._response, self._go_away_issued
+
+
+class ClientFrameParser(
+    AbstractFrameParser[RawResponseMessage, PartialResponseMessage]
+):
+    """Parses incoming HTTP/2 responses from a server."""
+
+    def create_buffer(self):
+        return FrameBuffer(server=False)
+
+    def create_partial(self):
+        # Pass fresh containers so per-message state is never shared
+        # through the NamedTuple defaults.
+        return PartialResponseMessage(headers=CIMultiDict(), raw_headers=[])
+
+    # TODO: Need to figure out how http2 headers work (will need to add custom hdrs things too)
+    def parse_message(self):
+        return super().parse_message()
+
+
+class ServerFrameParser(AbstractFrameParser[RawRequestMessage, PartialRequestMessage]):
+    """Parses incoming HTTP/2 requests from a client."""
+
+    def create_buffer(self):
+        return FrameBuffer(server=True)
+
+    def create_partial(self):
+        return PartialRequestMessage(headers=CIMultiDict(), raw_headers=[])
+
+    # TODO: Need to figure out how http2 headers work (will need to add custom hdrs things too)
+    def parse_message(self):
+        return super().parse_message()
diff --git a/aiohttp/http2_writer.py b/aiohttp/http2_writer.py
new file mode 100644
index 00000000000..5488c57c70a
--- /dev/null
+++ b/aiohttp/http2_writer.py
@@ -0,0 +1,263 @@
+"""HTTP/2 related writers and protocol."""
+
+import asyncio
+import sys
+from typing import Awaitable, Callable, Iterable, Optional, Sequence, Union
+
+# Reflects h2/connection from a lower-level perspective so
+# socket writing can be a bit more optimized.
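+# NOTE: compared with h2.connection.H2Connection, this module serializes
+# hyperframe Frame objects straight onto the transport and runs no
+# connection-level state machine; stream-id allocation, SETTINGS exchange
+# and flow control are assumed to stay with the caller.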
+from hpack.hpack import Decoder, Encoder
+from hyperframe.frame import ContinuationFrame, DataFrame, Frame, HeadersFrame
+from multidict import CIMultiDict
+
+from .abc import AbstractStreamWriter
+from .base_protocol import BaseProtocol
+from .client_exceptions import ClientConnectionResetError
+from .compression_utils import ZLibCompressor
+from .helpers import NO_EXTENSIONS
+
+MIN_PAYLOAD_FOR_WRITELINES = 2048
+IS_PY313_BEFORE_313_2 = (3, 13, 0) <= sys.version_info < (3, 13, 2)
+IS_PY_BEFORE_312_9 = sys.version_info < (3, 12, 9)
+SKIP_WRITELINES = IS_PY313_BEFORE_313_2 or IS_PY_BEFORE_312_9
+
+# Callbacks from aiosignal.Signal.send(...)
+_T_OnChunkSent = Optional[Callable[[bytes], Awaitable[None]]]
+_T_OnHeadersSent = Optional[Callable[["CIMultiDict[str]"], Awaitable[None]]]
+
+# Possible HTTP/2 callback idea for TraceConfig
+_T_OnFrameSent = Optional[Callable[[Frame], Awaitable[None]]]
+
+
+# writelines is not safe for use
+# on Python 3.12+ until 3.12.9
+# on Python 3.13+ until 3.13.2
+# and on older versions it is not any faster than write
+# CVE-2024-12254: https://github.com/python/cpython/pull/127656
+
+
+class H2StreamWriter(AbstractStreamWriter):
+    """HTTP/2 stream writer for aiohttp."""
+
+    length: Optional[int] = None
+    chunked: bool = False
+    _eof: bool = False
+    _compress: Optional[ZLibCompressor] = None
+    _stream_id: Optional[int] = None
+
+    def __init__(
+        self,
+        protocol: BaseProtocol,
+        loop: asyncio.AbstractEventLoop,
+        stream_id: int,
+        encoder: Optional[Encoder] = None,
+        on_chunk_sent: _T_OnChunkSent = None,
+        on_headers_sent: _T_OnHeadersSent = None,
+        on_frames_sent: _T_OnFrameSent = None,
+        max_frame_size: int = 0,
+    ) -> None:
+        """
+        Return an initialized ``H2StreamWriter``.
+
+        :param protocol: the BaseProtocol to utilize
+        :param stream_id: the stream id for the given writer to utilize
+        :param encoder: the encoder used to serialize headers
+        :param on_chunk_sent: corresponds to an on_chunk_sent callback (TraceConfig Signal)
+        :param on_headers_sent: corresponds to an on_headers_sent callback (TraceConfig Signal)
+        :param on_frames_sent: corresponds to an on_frames_sent callback (TraceConfig Signal)
+        :param max_frame_size: the size at which to cut off each frame (0 disables splitting)
+        """
+        self._protocol = protocol
+        self.loop = loop
+        self._on_chunk_sent: _T_OnChunkSent = on_chunk_sent
+        self._on_headers_sent: _T_OnHeadersSent = on_headers_sent
+        self._on_frames_sent: _T_OnFrameSent = on_frames_sent
+        self._headers_buf: Optional[bytes] = None
+        self._headers_written: bool = False
+
+        if max_frame_size < 0:
+            raise ValueError("max_frame_size should be a non-negative integer")
+        self._max_frame_size = max_frame_size
+        self._stream_id = stream_id
+        self._encoder = encoder or Encoder()
+
+    @property
+    def transport(self) -> Optional[asyncio.Transport]:
+        return self._protocol.transport
+
+    @property
+    def protocol(self) -> BaseProtocol:
+        return self._protocol
+
+    # TODO: h2 chunking?
+    def enable_chunking(self) -> None:
+        self.chunked = True
+
+    # is_eof will help with a few things...
+    async def write_headers(
+        self,
+        # XXX: status_line may need to be replaced with something else
+        # or modified in the AbstractStreamWriter with
+        # status: int, reason: bytes | str | None maybe?
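+        # Note: HTTP/2 has no request/status line at all; RFC 9113 carries
+        # that information in the :method/:scheme/:authority/:path/:status
+        # pseudo-header fields, so it could also simply travel inside
+        # ``headers`` rather than as a separate parameter.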
+        # status_line: str,
+        headers: "CIMultiDict[str]",
+        *,
+        huffman: bool = True,
+        is_eof: bool = False,
+        drain: bool = False,
+    ) -> None:
+        """Write the provided headers as HTTP/2 frames.
+
+        :param headers: the headers to transform into HTTP/2 frames
+        :param huffman: use huffman encoding
+        :param is_eof: signal the end of the stream with these frames
+        :param drain: perform the send-off immediately
+
+        """
+        if self._on_headers_sent is not None:
+            await self._on_headers_sent(headers)
+
+        # TODO: http/2 bytes writer for Cythonized speedups
+        # for CIMultiDict[str] to Iterable[tuple[bytes, bytes]] or similar?
+
+        # We can go lower level with this one in the future. There are already plans to make
+        # a Cython version of hpack for aiohttp and also for hyper / httpx.
+        # I'm not in the mood to compete and I share with all interested parties :))
+        # NOTE: hpack only special-cases real dicts, so hand it the (name, value)
+        # pairs explicitly; a bare CIMultiDict would iterate as keys only.
+        encoded_headers = self._encoder.encode(headers.items(), huffman)
+
+        # h2 comment: Slice into blocks of max_frame_size. Be careful with this:
+        # it only works right because we never send padded frames or priority
+        # information on the frames. Revisit this if we do.
+        if self._max_frame_size:
+            header_blocks = [
+                encoded_headers[i : i + self._max_frame_size]
+                for i in range(0, len(encoded_headers), self._max_frame_size)
+            ]
+            frames = [HeadersFrame(self._stream_id, header_blocks[0])]
+            frames.extend(
+                ContinuationFrame(self._stream_id, block) for block in header_blocks[1:]
+            )
+            # END_STREAM belongs on the HEADERS frame, while END_HEADERS goes on
+            # whichever frame closes the header block (the last CONTINUATION, if any).
+            if is_eof:
+                frames[0].flags.add("END_STREAM")
+            frames[-1].flags.add("END_HEADERS")
+
+            await self.send_frames(frames, drain)
+
+        else:
+            frame = HeadersFrame(self._stream_id, encoded_headers)
+            frame.flags.add("END_HEADERS")
+            if is_eof:
+                frame.flags.add("END_STREAM")
+
+            await self.send_frame(frame, drain)
+
+    def _writelines(self, chunks: Iterable[bytes]) -> None:
+        size = 0
+        for chunk in chunks:
+            size += len(chunk)
+        self.buffer_size += size
+        self.output_size += size
+        transport = self._protocol.transport
+        if transport is None or transport.is_closing():
+            raise ClientConnectionResetError("Cannot write to closing transport")
+        if SKIP_WRITELINES or size < MIN_PAYLOAD_FOR_WRITELINES:
+            transport.write(b"".join(chunks))
+        else:
+            transport.writelines(chunks)
+
+    # Custom (Vizonex additions)
+    async def send_frames(self, frames: Sequence[Frame], drain: bool = True) -> None:
+        """Write a sequence of frames.
+
+        :param frames: the frames that should be written.
+        :param drain: send the data off to its destination immediately.
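+
+        The frames are serialized back-to-back and handed to the transport
+        in a single write/writelines call whenever possible.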
+ """ + if self._on_frames_sent is not None: + for f in frames: + await self._on_frames_sent(f) + self._writelines([f.serialize() for f in frames]) + if drain: + await self.drain() + + def _write(self, chunk: Union[bytes, bytearray, memoryview]) -> None: + size = len(chunk) + self.buffer_size += size + self.output_size += size + transport = self._protocol.transport + if transport is None or transport.is_closing(): + raise ClientConnectionResetError("Cannot write to closing transport") + transport.write(chunk) + + async def send_frame(self, frame: Frame, drain: bool = True) -> None: + """Writes a single frame""" + if self._on_frames_sent is not None: + await self._on_frames_sent(frame) + self._write(frame.serialize()) + if drain: + await self.drain() + + async def send_data( + self, + chunk: Union[bytes, bytearray, memoryview], + is_eof: bool = False, + pad_length: Optional[int] = None, + drain: bool = False, + ) -> None: + """Sends a single data-frame, This also takes compression into account if compression is utilized""" + # Smarter to put the chunk here than anywhere else incase + if self._on_chunk_sent is not None: + await self._on_chunk_sent(chunk) + + if isinstance(chunk, memoryview): + if chunk.nbytes != len(chunk): + # just reshape it + chunk = chunk.cast("c") + + if self._compress is not None: + chunk = await self._compress.compress(chunk) + if not chunk: + return + + if self.length is not None: + chunk_len = len(chunk) + if self.length >= chunk_len: + self.length = self.length - chunk_len + else: + chunk = chunk[: self.length] + self.length = 0 + + if not chunk: + return + + df = DataFrame(self._stream_id, data=chunk) + if pad_length: + df.flags.add("PADDED") + if is_eof: + df.flags.add("END_STREAM") + + await self.send_frame(chunk, drain) + + async def write( + self, chunk: Union[bytes, bytearray, memoryview], *, drain: bool = True + ) -> None: + """ + Writes chunk of data to a stream. + + write_eof() indicates end of stream. + writer can't be used after write_eof() method being called. + write() return drain future. + """ + await self.send_data(chunk, drain) + + async def drain(self) -> None: + """Flush the write buffer. + + The intended use is to write:: + + await w.write(data) + await w.drain() + """ + protocol = self._protocol + if protocol.transport is not None and protocol._paused: + await protocol._drain_helper()