diff --git a/scapy/fields.py b/scapy/fields.py index 958824cd0cb..443a2d13124 100644 --- a/scapy/fields.py +++ b/scapy/fields.py @@ -1696,7 +1696,7 @@ def __init__( cbk(pkt:Packet, lst:List[Packet], cur:Optional[Packet], - remain:str + remain:bytes, ) -> Optional[Type[Packet]] The pkt argument contains a reference to the Packet instance diff --git a/scapy/layers/quic.py b/scapy/layers/quic.py new file mode 100644 index 00000000000..7252128c593 --- /dev/null +++ b/scapy/layers/quic.py @@ -0,0 +1,316 @@ +# SPDX-License-Identifier: GPL-2.0-only +# This file is part of Scapy +# See https://scapy.net/ for more information +# Copyright (C) Gabriel Potter + +""" +QUIC + +The draft of a very basic implementation of the structures from [RFC 9000]. +This isn't binded to UDP by default as currently too incomplete. + +TODO: +- payloads. +- encryption. +- automaton. +- etc. +""" + +import struct + +from scapy.packet import ( + Packet, +) +from scapy.fields import ( + _EnumField, + BitEnumField, + BitField, + ByteEnumField, + ByteField, + EnumField, + Field, + FieldLenField, + FieldListField, + IntField, + MultipleTypeField, + ShortField, + StrLenField, +) + +# Typing imports +from typing import ( + Any, + Optional, + Tuple, +) + +# RFC9000 table 3 +_quic_payloads = { + 0x00: "PADDING", + 0x01: "PING", + 0x02: "ACK", + 0x04: "RESET_STREAM", + 0x05: "STOP_SENDING", + 0x06: "CRYPTO", + 0x07: "NEW_TOKEN", + 0x08: "STREAM", + 0x10: "MAX_DATA", + 0x11: "MAX_STREAM_DATA", + 0x12: "MAX_STREAMS", + 0x14: "DATA_BLOCKED", + 0x15: "STREAM_DATA_BLOCKED", + 0x16: "STREAMS_BLOCKED", + 0x18: "NEW_CONNECTION_ID", + 0x19: "RETIRE_CONNECTION_ID", + 0x1A: "PATH_CHALLENGE", + 0x1B: "PATH_RESPONSE", + 0x1C: "CONNECTION_CLOSE", + 0x1E: "HANDSHAKE_DONE", +} + + +# RFC9000 sect 16 +class QuicVarIntField(Field[int, int]): + def addfield(self, pkt: Packet, s: bytes, val: Optional[int]): + val = self.i2m(pkt, val) + if val < 0 or val > 0x3FFFFFFFFFFFFFFF: + raise struct.error("requires 0 <= number <= 4611686018427387903") + if val < 0x40: + return s + struct.pack("!B", val) + elif val < 0x4000: + return s + struct.pack("!H", val | 0x4000) + elif val < 0x40000000: + return s + struct.pack("!I", val | 0x40000000) + else: + return s + struct.pack("!Q", val | 0x4000000000000000) + + def getfield(self, pkt: Packet, s: bytes) -> Tuple[bytes, int]: + length = (s[0] & 0xC0) >> 6 + if length == 0: + return s[1:], struct.unpack("!B", s[:1])[0] & 0x3F + elif length == 1: + return s[2:], struct.unpack("!H", s[:2])[0] & 0x3FFF + elif length == 2: + return s[4:], struct.unpack("!I", s[:4])[0] & 0x3FFFFFFF + elif length == 3: + return s[8:], struct.unpack("!Q", s[:8])[0] & 0x3FFFFFFFFFFFFFFF + else: + raise Exception("Impossible.") + + +class QuicVarLenField(FieldLenField, QuicVarIntField): + pass + + +class QuicVarEnumField(QuicVarIntField, _EnumField[int]): + __slots__ = EnumField.__slots__ + + def __init__(self, name, default, enum): + # type: (str, Optional[int], Any, int) -> None + _EnumField.__init__(self, name, default, enum) # type: ignore + QuicVarIntField.__init__(self, name, default) + + def any2i(self, pkt, x): + # type: (Optional[Packet], Any) -> int + return _EnumField.any2i(self, pkt, x) # type: ignore + + def i2repr( + self, + pkt, # type: Optional[Packet] + x, # type: int + ): + # type: (...) -> Any + return _EnumField.i2repr(self, pkt, x) + + +# -- Headers -- + + +# RFC9000 sect 17.2 +_quic_long_hdr = { + 0: "Short", + 1: "Long", +} + +_quic_long_pkttyp = { + # RFC9000 table 5 + 0x00: "Initial", + 0x01: "0-RTT", + 0x02: "Handshake", + 0x03: "Retry", +} + +# RFC9000 sect 17 abstraction + + +class QUIC(Packet): + match_subclass = True + + @classmethod + def dispatch_hook(cls, _pkt=None, *args, **kargs): + """ + Returns the right class for the given data. + """ + if _pkt: + hdr = _pkt[0] + if hdr & 0x80: + # Long Header packets + if hdr & 0x40 == 0: + return QUIC_Version + else: + typ = (hdr & 0x30) >> 4 + return { + 0: QUIC_Initial, + 1: QUIC_0RTT, + 2: QUIC_Handshake, + 3: QUIC_Retry, + }[typ] + else: + # Short Header packets + return QUIC_1RTT + return QUIC_Initial + + def mysummary(self): + return self.name + + +# RFC9000 sect 17.2.1 + + +class QUIC_Version(QUIC): + name = "QUIC - Version Negotiation" + fields_desc = [ + BitEnumField("HeaderForm", 1, 1, _quic_long_hdr), + BitField("Unused", 0, 7), + IntField("Version", 0), + FieldLenField("DstConnIDLen", None, length_of="DstConnID", fmt="B"), + StrLenField("DstConnID", "", length_from=lambda pkt: pkt.DstConnIDLen), + FieldLenField("SrcConnIDLen", None, length_of="DstConnID", fmt="B"), + StrLenField("SrcConnID", "", length_from=lambda pkt: pkt.SrcConnIDLen), + FieldListField("SupportedVersions", [], IntField("", 0)), + ] + + +# RFC9000 sect 17.2.2 + +QuicPacketNumberField = lambda name, default: MultipleTypeField( + [ + (ByteField(name, default), lambda pkt: pkt.PacketNumberLen == 0), + (ShortField(name, default), lambda pkt: pkt.PacketNumberLen == 1), + (IntField(name, default), lambda pkt: pkt.PacketNumberLen == 2), + ], + ByteField(name, default), +) + + +class QuicPacketNumberBitFieldLenField(BitField): + def i2m(self, pkt, x): + if x is None and pkt is not None: + PacketNumber = pkt.PacketNumber or 0 + if PacketNumber < 0 or PacketNumber > 0xFFFFFFFF: + raise struct.error("requires 0 <= number <= 0xFFFFFFFF") + if PacketNumber < 0x100: + return 0 + elif PacketNumber < 0x10000: + return 1 + elif PacketNumber < 0x100000000: + return 2 + else: + return 3 + elif x is None: + return 0 + return x + + +class QUIC_Initial(QUIC): + name = "QUIC - Initial" + Version = 0x00000001 + fields_desc = ( + [ + BitEnumField("HeaderForm", 1, 1, _quic_long_hdr), + BitField("FixedBit", 1, 1), + BitEnumField("LongPacketType", 0, 2, _quic_long_pkttyp), + BitField("Reserved", 0, 2), + QuicPacketNumberBitFieldLenField("PacketNumberLen", None, 2), + ] + + QUIC_Version.fields_desc[2:7] + + [ + QuicVarLenField("TokenLen", None, length_of="Token"), + StrLenField("Token", "", length_from=lambda pkt: pkt.TokenLen), + QuicVarIntField("Length", 0), + QuicPacketNumberField("PacketNumber", 0), + ] + ) + + +# RFC9000 sect 17.2.3 +class QUIC_0RTT(QUIC): + name = "QUIC - 0-RTT" + LongPacketType = 1 + fields_desc = QUIC_Initial.fields_desc[:10] + [ + QuicVarIntField("Length", 0), + QuicPacketNumberField("PacketNumber", 0), + ] + + +# RFC9000 sect 17.2.4 +class QUIC_Handshake(QUIC): + name = "QUIC - Handshake" + LongPacketType = 2 + fields_desc = QUIC_0RTT.fields_desc + + +# RFC9000 sect 17.2.5 +class QUIC_Retry(QUIC): + name = "QUIC - Retry" + LongPacketType = 3 + Version = 0x00000001 + fields_desc = ( + QUIC_Initial.fields_desc[:3] + + [ + BitField("Unused", 0, 4), + ] + + QUIC_Version.fields_desc[2:7] + ) + + +# RFC9000 sect 17.3 +class QUIC_1RTT(QUIC): + name = "QUIC - 1-RTT" + fields_desc = [ + BitEnumField("HeaderForm", 0, 1, _quic_long_hdr), + BitField("FixedBit", 1, 1), + BitField("SpinBit", 0, 1), + BitField("Reserved", 0, 2), + BitField("KeyPhase", 0, 1), + QuicPacketNumberBitFieldLenField("PacketNumberLen", None, 2), + # FIXME - Destination Connection ID + QuicPacketNumberField("PacketNumber", 0), + ] + + +# RFC9000 sect 19.1 +class QUIC_PADDING(Packet): + fields_desc = [ + ByteEnumField("Type", 0x00, _quic_payloads), + ] + + +# RFC9000 sect 19.2 +class QUIC_PING(Packet): + fields_desc = [ + ByteEnumField("Type", 0x01, _quic_payloads), + ] + + +# RFC9000 sect 19.3 +class QUIC_ACK(Packet): + fields_desc = [ + ByteEnumField("Type", 0x02, _quic_payloads), + ] + + +# Bindings +# bind_bottom_up(UDP, QUIC, dport=443) +# bind_bottom_up(UDP, QUIC, sport=443) +# bind_layers(UDP, QUIC, dport=443, sport=443) diff --git a/scapy/layers/tls/extensions.py b/scapy/layers/tls/extensions.py index 42dfa6f6048..e52e5a0d024 100644 --- a/scapy/layers/tls/extensions.py +++ b/scapy/layers/tls/extensions.py @@ -35,6 +35,7 @@ from scapy.layers.tls.session import _GenericTLSSessionInheritance from scapy.layers.tls.crypto.groups import _tls_named_groups from scapy.layers.tls.crypto.suites import _tls_cipher_suites +from scapy.layers.tls.quic import _QuicTransportParametersField from scapy.themes import AnsiColorTheme from scapy.compat import raw from scapy.config import conf @@ -93,6 +94,7 @@ 0x31: "post_handshake_auth", 0x32: "signature_algorithms_cert", 0x33: "key_share", + 0x39: "quic_transport_parameters", # RFC 9000 0x3374: "next_protocol_negotiation", # RFC-draft-agl-tls-nextprotoneg-03 0xff01: "renegotiation_info", # RFC 5746 @@ -697,6 +699,15 @@ class TLS_Ext_RecordSizeLimit(TLS_Ext_Unknown): # RFC 8449 ShortField("record_size_limit", None)] +class TLS_Ext_QUICTransportParameters(TLS_Ext_Unknown): # RFC9000 + name = "TLS Extension - QUIC Transport Parameters" + fields_desc = [ShortEnumField("type", 0x39, _tls_ext), + FieldLenField("len", None, length_of="params"), + _QuicTransportParametersField("params", + None, + length_from=lambda pkt: pkt.len)] + + _tls_ext_cls = {0: TLS_Ext_ServerName, 1: TLS_Ext_MaxFragLen, 2: TLS_Ext_ClientCertURL, @@ -730,6 +741,7 @@ class TLS_Ext_RecordSizeLimit(TLS_Ext_Unknown): # RFC 8449 0x33: TLS_Ext_KeyShare, # 0x2f: TLS_Ext_CertificateAuthorities, #XXX # 0x30: TLS_Ext_OIDFilters, #XXX + 0x39: TLS_Ext_QUICTransportParameters, 0x3374: TLS_Ext_NPN, 0xff01: TLS_Ext_RenegotiationInfo, 0xffce: TLS_Ext_EncryptedServerName diff --git a/scapy/layers/tls/quic.py b/scapy/layers/tls/quic.py new file mode 100644 index 00000000000..6580bd887a1 --- /dev/null +++ b/scapy/layers/tls/quic.py @@ -0,0 +1,217 @@ +# SPDX-License-Identifier: GPL-2.0-or-later +# This file is part of Scapy +# See https://scapy.net/ for more information + +""" +RFC9000 QUIC Transport Parameters +""" +import struct + +from scapy.config import conf +from scapy.fields import ( + PacketListField, + FieldLenField, + StrLenField, +) +from scapy.packet import Packet + +from scapy.layers.quic import ( + QuicVarIntField, + QuicVarLenField, + QuicVarEnumField, +) + + +_QUIC_TP_type = { + 0x00: "original_destination_connection_id", + 0x01: "max_idle_timeout", + 0x02: "stateless_reset_token", + 0x03: "max_udp_payload_size", + 0x04: "initial_max_data", + 0x05: "initial_max_stream_data_bidi_local", + 0x06: "initial_max_stream_data_bidi_remote", + 0x07: "initial_max_stream_data_uni", + 0x08: "initial_max_streams_bidi", + 0x09: "initial_max_streams_uni", + 0x0A: "ack_delay_exponent", + 0x0B: "max_ack_delay", + 0x0C: "disable_active_migration", + 0x0D: "preferred_address", + 0x0E: "active_connection_id_limit", + 0x0F: "initial_source_connection_id", + 0x10: "retry_source_connection_id", +} + +# Generic values + + +class QUIC_TP_Unknown(Packet): + name = "QUIC Transport Parameter - Scapy Unknown" + fields_desc = [ + QuicVarEnumField("type", None, _QUIC_TP_type), + QuicVarLenField("len", None, length_of="value"), + StrLenField("value", None, length_from=lambda pkt: pkt.len), + ] + + def default_payload_class(self, _): + return conf.padding_layer + + +class _QUIC_VarInt_Len(FieldLenField): + def i2m(self, pkt, x): + if x is None and pkt is not None: + fld, fval = pkt.getfield_and_val(self.length_of) + value = fld.i2len(pkt, fval) or 0 + if value < 0 or value > 0xFFFFFFFF: + raise struct.error("requires 0 <= number <= 0xFFFFFFFF") + if value < 0x100: + return 1 + elif value < 0x10000: + return 2 + elif value < 0x100000000: + return 3 + else: + return 4 + elif x is None: + return 1 + return x + + +class _QUIC_TP_VarIntValue(QUIC_TP_Unknown): + fields_desc = [ + QuicVarEnumField("type", None, _QUIC_TP_type), + _QUIC_VarInt_Len("len", None, length_of="value", fmt="B"), + QuicVarIntField("value", None), + ] + + +# RFC 9000 sect 18.2 + + +class QUIC_TP_OriginalDestinationConnectionId(QUIC_TP_Unknown): + name = "QUIC Transport Parameters - Original Destination Connection Id" + type = 0x00 + + +class QUIC_TP_MaxIdleTimeout(_QUIC_TP_VarIntValue): + name = "QUIC Transport Parameters - Max Idle Timeout" + type = 0x01 + + +class QUIC_TP_StatelessResetToken(QUIC_TP_Unknown): + name = "QUIC Transport Parameters - Stateless Reset Token" + type = 0x02 + + +class QUIC_TP_MaxUdpPayloadSize(_QUIC_TP_VarIntValue): + name = "QUIC Transport Parameters - Max Udp Payload Size" + type = 0x03 + + +class QUIC_TP_InitialMaxData(_QUIC_TP_VarIntValue): + name = "QUIC Transport Parameters - Initial Max Data" + type = 0x04 + + +class QUIC_TP_InitialMaxStreamDataBidiLocal(_QUIC_TP_VarIntValue): + name = "QUIC Transport Parameters - Initial Max Stream Data Bidi Local" + type = 0x05 + + +class QUIC_TP_InitialMaxStreamDataBidiRemote(_QUIC_TP_VarIntValue): + name = "QUIC Transport Parameters - Initial Max Stream Data Bidi Remote" + type = 0x06 + + +class QUIC_TP_InitialMaxStreamDataUni(_QUIC_TP_VarIntValue): + name = "QUIC Transport Parameters - Initial Max Stream Data Uni" + type = 0x07 + + +class QUIC_TP_InitialMaxStreamsBidi(_QUIC_TP_VarIntValue): + name = "QUIC Transport Parameters - Initial Max Streams Bidi" + type = 0x08 + + +class QUIC_TP_InitialMaxStreamsUni(_QUIC_TP_VarIntValue): + name = "QUIC Transport Parameters - Initial Max Streams Uni" + type = 0x09 + + +class QUIC_TP_AckDelayExponent(_QUIC_TP_VarIntValue): + name = "QUIC Transport Parameters - Ack Delay Exponent" + type = 0x0A + + +class QUIC_TP_MaxAckDelay(_QUIC_TP_VarIntValue): + name = "QUIC Transport Parameters - Max Ack Delay" + type = 0x0B + + +class QUIC_TP_DisableActiveMigration(QUIC_TP_Unknown): + name = "QUIC Transport Parameters - Disable Active Migration" + fields_desc = [ + QuicVarEnumField("type", 0x0C, _QUIC_TP_type), + QuicVarIntField("len", 0), + ] + + +class QUIC_TP_PreferredAddress(QUIC_TP_Unknown): + name = "QUIC Transport Parameters - Preferred Address" + type = 0x0D + + +class QUIC_TP_ActiveConnectionIdLimit(_QUIC_TP_VarIntValue): + name = "QUIC Transport Parameters - Active Connection Id Limit" + type = 0x0E + + +class QUIC_TP_InitialSourceConnectionId(QUIC_TP_Unknown): + name = "QUIC Transport Parameters - Initial Source Connection Id" + type = 0x0F + + +class QUIC_TP_RetrySourceConnectionId(QUIC_TP_Unknown): + name = "QUIC Transport Parameters - Retry Source Connection Id" + type = 0x10 + + +_QUIC_TP_cls = { + 0x00: QUIC_TP_OriginalDestinationConnectionId, + 0x01: QUIC_TP_MaxIdleTimeout, + 0x02: QUIC_TP_StatelessResetToken, + 0x03: QUIC_TP_MaxUdpPayloadSize, + 0x04: QUIC_TP_InitialMaxData, + 0x05: QUIC_TP_InitialMaxStreamDataBidiLocal, + 0x06: QUIC_TP_InitialMaxStreamDataBidiRemote, + 0x07: QUIC_TP_InitialMaxStreamDataUni, + 0x08: QUIC_TP_InitialMaxStreamsBidi, + 0x09: QUIC_TP_InitialMaxStreamsUni, + 0x0A: QUIC_TP_AckDelayExponent, + 0x0B: QUIC_TP_MaxAckDelay, + 0x0C: QUIC_TP_DisableActiveMigration, + 0x0D: QUIC_TP_PreferredAddress, + 0x0E: QUIC_TP_ActiveConnectionIdLimit, + 0x0F: QUIC_TP_InitialSourceConnectionId, + 0x10: QUIC_TP_RetrySourceConnectionId, +} + + +class _QuicTransportParametersField(PacketListField): + _varfield = QuicVarIntField("", 0) + + def __init__(self, name, default, **kwargs): + kwargs["next_cls_cb"] = self.cls_from_quictptype + super(_QuicTransportParametersField, self).__init__( + name, + default, + **kwargs, + ) + + @classmethod + def cls_from_quictptype(cls, pkt, lst, cur, remain): + _, typ = cls._varfield.getfield(None, remain) + return _QUIC_TP_cls.get( + typ, + QUIC_TP_Unknown, + ) diff --git a/test/scapy/layers/quic.uts b/test/scapy/layers/quic.uts new file mode 100644 index 00000000000..b868a6fc848 --- /dev/null +++ b/test/scapy/layers/quic.uts @@ -0,0 +1,50 @@ +% Scapy QUIC layer tests + ++ QUIC dissection / build + +% We use the examples from https://quic.xargs.org/. Big props & kudos to them ! +% FIXME TODO: THIS IS VERY INCOMPLETE. + += QUIC - Dissect Client Initial Packet + +from scapy.layers.quic import * + +pkt = QUIC(bytes.fromhex("c00000000108000102030405060705635f636964004103001c36a7ed78716be9711ba498b7ed868443bb2e0c514d4d848eadcc7a00d25ce9f9afa483978088de836be68c0b32a24595d7813ea5414a9199329a6d9f7f760dd8bb249bf3f53d9a77fbb7b395b8d66d7879a51fe59ef9601f79998eb3568e1fdc789f640acab3858a82ef2930fa5ce14b5b9ea0bdb29f4572da85aa3def39b7efafffa074b9267070d50b5d07842e49bba3bc787ff295d6ae3b514305f102afe5a047b3fb4c99eb92a274d244d60492c0e2e6e212cef0f9e3f62efd0955e71c768aa6bb3cd80bbb3755c8b7ebee32712f40f2245119487021b4b84e1565e3ca31967ac8604d4032170dec280aeefa095d08b3b7241ef6646a6c86e5c62ce08be099")) +assert QUIC_Initial in pkt +assert pkt.LongPacketType == 0 +assert pkt.DstConnID == b"\x00\x01\x02\x03\x04\x05\x06\x07" +assert pkt.SrcConnID == b"c_cid" +assert pkt.Length == 259 +assert len(pkt.load) + 1 == 259 +assert pkt.PacketNumber == 0 + += QUIC - Dissect Server Initial Packet + +from scapy.layers.quic import * + +pkt = QUIC(bytes.fromhex("c00000000105635f63696405735f63696400407500836855d5d9c823d07c616882ca770279249864b556e51632257e2d8ab1fd0dc04b18b9203fb919d8ef5a33f378a627db674d3c7fce6ca5bb3e8cf90109cbb955665fc1a4b93d05f6eb83252f6631bcadc7402c10f65c52ed15b4429c9f64d84d64fa406cf0b517a926d62a54a9294136b143b033")) +assert QUIC_Initial in pkt +assert pkt.LongPacketType == 0 +assert pkt.DstConnID == b"c_cid" +assert pkt.SrcConnID == b"s_cid" +assert pkt.Length == 117 +assert len(pkt.load) + 1 == 117 +assert pkt.PacketNumber == 0 + += QUIC - Dissect Server Handshake Packet + +from scapy.layers.quic import * + +pkt = QUIC(bytes.fromhex("e00000000105635f63696405735f63696440cf014420f919681c3f0f102a30f5e647a3399abf54bc8e80453134996ba33099056242f3b8e662bbfce42f3ef2b6ba87159147489f8479e849284e983fd905320a62fc7d67e9587797096ca60101d0b2685d8747811178133ad9172b7ff8ea83fd81a814bae27b953a97d57ebff4b4710dba8df82a6b49d7d7fa3d8179cbdb8683d4bfa832645401e5a56a76535f71c6fb3e616c241bb1f43bc147c296f591402997ed49aa0c55e31721d03e14114af2dc458ae03944de5126fe08d66a6ef3ba2ed1025f98fea6d6024998184687dc06")) +assert QUIC_Handshake in pkt +assert pkt.LongPacketType == 2 +assert pkt.DstConnID == b"c_cid" +assert pkt.SrcConnID == b"s_cid" +assert pkt.PacketNumber == 1 + += QUIC - Build Client Initial Packet + +from scapy.layers.quic import * + +pkt = QUIC_Initial(PacketNumberLen=3, DstConnID=b'p\xa2\x8e@\x96\xc5}\xd0\xff\xb6\xc3\xd8\x1b\xcaR', SrcConnID=b'\xf7\x10Q', PacketNumber=116) +assert bytes(pkt) == b'\xc3\x00\x00\x00\x01\x0fp\xa2\x8e@\x96\xc5}\xd0\xff\xb6\xc3\xd8\x1b\xcaR\x0f\xf7\x10Q\x00\x00t' diff --git a/test/scapy/layers/tls/tls13.uts b/test/scapy/layers/tls/tls13.uts index 12d58560874..130723c1998 100644 --- a/test/scapy/layers/tls/tls13.uts +++ b/test/scapy/layers/tls/tls13.uts @@ -1,7 +1,7 @@ % Tests for TLS 1.3 # # Try me with : -# bash test/run_tests -t test/tls13.uts -F +# bash test/run_tests -t test/scapy/layers/tls/tls13.uts -F ~ libressl @@ -1221,3 +1221,89 @@ ch = TLS(b'\x16\x03\x01\x01\x1a\x01\x00\x01\x16\x03\x03\xec\x9c>\xb2\x9e|B\x05\x assert isinstance(ch.msg[0].ext[9], TLS_Ext_PreSharedKey_CH) assert ch.msg[0].ext[9].identities[0].identity.load == b'Client_identity' assert ch.msg[0].ext[9].identities[0].obfuscated_ticket_age == 0 + ++ QUIC Transport Parameters + += QUIC Transport Parameters - Parse hex stream +~ quic + +from scapy.layers.tls.quic import * +from scapy.layers.tls.all import TLS13ClientHello, TLS_Ext_QUICTransportParameters + +ch_data = bytes.fromhex("010001e403034f417babafc5dc240c744225bb09b0c5067618b7501ef4bf7ea73c64249e5d0c000006130213011303010001b50033010c010a00170041048497f2dd89fb1d341b02894edd154ebd5ee5e55594d7935d99d2c05733991cccc9af02200e53bcc80208fa1498c5c88ccf643d598cb05c5fde37a1e468cd593200180061045bff37b0fde67fcfc50b7ab6eb139f51998bdb859632138b30caf96882ef871b27aaf534cce0dcfa157be21343fd6b0db5cc306564f19c46d3c9e175e3dbbb594fe7c393e35de695fc84f64ec4a59ee3cea26a0599a61d6dfc18568fb5c0cb85001d00205af975b0ec59288a578c94890d3264f9ac025ab86f7cd718112da6b923b2e54d001e0038f989efd52e4e8ab64491bfd8b8d30481d854b9394f517148dc8d5a50a43ebbdcca6e4b27229acd2f20b6633632d32e9be6999a40d30561e2002b0003020304000d00140012040308040401050308050501020108070808000a000a000800170018001d001e002d000201010000000e000c00000968332d7365727665720010000500030268330039005301048000ea600404801000000508c0000001000000000608c0000001000000000708c00000010000000008024080090240800a01030b01190e01080f087f317d3033e6423e110c00000001000000016b3343cf") + +ch = TLS13ClientHello(ch_data) +tp = ch.ext[-1] +assert isinstance(tp, TLS_Ext_QUICTransportParameters) +assert isinstance(tp.params[0], QUIC_TP_MaxIdleTimeout) +assert tp.params[0].value == 60000 +assert isinstance(tp.params[1], QUIC_TP_InitialMaxData) +assert tp.params[1].value == 1048576 +assert isinstance(tp.params[2], QUIC_TP_InitialMaxStreamDataBidiLocal) +assert tp.params[2].value == 4294967296 +assert isinstance(tp.params[3], QUIC_TP_InitialMaxStreamDataBidiRemote) +assert tp.params[3].value == 4294967296 +assert isinstance(tp.params[4], QUIC_TP_InitialMaxStreamDataUni) +assert tp.params[4].value == 4294967296 +assert isinstance(tp.params[5], QUIC_TP_InitialMaxStreamsBidi) +assert tp.params[5].value == 128 +assert isinstance(tp.params[6], QUIC_TP_InitialMaxStreamsUni) +assert tp.params[6].value == 128 +assert isinstance(tp.params[7], QUIC_TP_AckDelayExponent) +assert tp.params[7].value == 3 +assert isinstance(tp.params[8], QUIC_TP_MaxAckDelay) +assert tp.params[8].value == 25 +assert isinstance(tp.params[9], QUIC_TP_ActiveConnectionIdLimit) +assert tp.params[9].value == 8 +assert isinstance(tp.params[10], QUIC_TP_InitialSourceConnectionId) +assert tp.params[10].value == bytes.fromhex("7f317d3033e6423e") + += QUIC Transport Parameters - Build packet +~ quic + +from scapy.layers.tls.quic import * +from scapy.layers.tls.all import TLS_Ext_QUICTransportParameters + +tp = TLS_Ext_QUICTransportParameters(params=[ + QUIC_TP_MaxIdleTimeout(value=5000), + QUIC_TP_MaxUdpPayloadSize(value=1350), + QUIC_TP_InitialMaxData(value=10000000), + QUIC_TP_InitialMaxStreamDataBidiLocal(value=1000000), + QUIC_TP_InitialMaxStreamDataBidiRemote(value=1000000), + QUIC_TP_InitialMaxStreamDataUni(value=1000000), + QUIC_TP_InitialMaxStreamsBidi(value=100), + QUIC_TP_InitialMaxStreamsUni(value=100), + QUIC_TP_AckDelayExponent(value=3), + QUIC_TP_MaxAckDelay(value=25), + QUIC_TP_DisableActiveMigration(), + QUIC_TP_InitialSourceConnectionId(value=bytes.fromhex("2173071905d778f98e367b8ad8eeb526484e8f5d")), +]) +actual = tp.build() + +# the expected data is extracted from the ClientHello above +expect = bytes.fromhex("0039004601015388030145460401409896800501400f42400601400f42400701400f424008014064090140640a01030b01190c000f142173071905d778f98e367b8ad8eeb526484e8f5d") + +assert actual == expect + += QUIC Transport Parameters - Build empty packet +~ quic + +from scapy.layers.tls.all import TLS_Ext_QUICTransportParameters + +p = TLS_Ext_QUICTransportParameters(params=[]) +actual = p.build() + +assert actual == b'\x009\x00\x00' + += QUIC Transport Parameters - Throw error if value is a big integer +~ quic + +from scapy.layers.tls.quic import QUIC_TP_InitialMaxData + +# A 62-bit left shift results in an integer of 63 bits +p = QUIC_TP_InitialMaxData(value=1<<62) +try: + p.build() + assert False, "QUIC cannot decode integers with more than 62 bits" +except struct.error: + pass