6060from dataclasses import dataclass
6161from typing import Any , final
6262
63- from typing_extensions import Self , TypeIs , assert_never
63+ from typing_extensions import Self
6464
6565import aiokafka .codec as codecs
6666from aiokafka .codec import (
8282 DefaultRecordMetadataProtocol ,
8383 DefaultRecordProtocol ,
8484)
85- from ._types import (
86- CodecGzipT ,
87- CodecLz4T ,
88- CodecMaskT ,
89- CodecNoneT ,
90- CodecSnappyT ,
91- CodecZstdT ,
92- )
9385from .util import calc_crc32c , decode_varint , encode_varint , size_of_varint
9486
9587
@@ -116,12 +108,12 @@ class DefaultRecordBase:
116108 CRC_OFFSET = struct .calcsize (">qiib" )
117109 AFTER_LEN_OFFSET = struct .calcsize (">qi" )
118110
119- CODEC_MASK : CodecMaskT = 0x07
120- CODEC_NONE : CodecNoneT = 0x00
121- CODEC_GZIP : CodecGzipT = 0x01
122- CODEC_SNAPPY : CodecSnappyT = 0x02
123- CODEC_LZ4 : CodecLz4T = 0x03
124- CODEC_ZSTD : CodecZstdT = 0x04
111+ CODEC_MASK = 0x07
112+ CODEC_NONE = 0x00
113+ CODEC_GZIP = 0x01
114+ CODEC_SNAPPY = 0x02
115+ CODEC_LZ4 = 0x03
116+ CODEC_ZSTD = 0x04
125117 TIMESTAMP_TYPE_MASK = 0x08
126118 TRANSACTIONAL_MASK = 0x10
127119 CONTROL_MASK = 0x20
@@ -131,9 +123,7 @@ class DefaultRecordBase:
131123
132124 NO_PARTITION_LEADER_EPOCH = - 1
133125
134- def _assert_has_codec (
135- self , compression_type : int
136- ) -> TypeIs [CodecGzipT | CodecSnappyT | CodecLz4T | CodecZstdT ]:
126+ def _assert_has_codec (self , compression_type : int ) -> bool :
137127 if compression_type == self .CODEC_GZIP :
138128 checker , name = codecs .has_gzip , "gzip"
139129 elif compression_type == self .CODEC_SNAPPY :
@@ -240,7 +230,10 @@ def _maybe_uncompress(self) -> None:
240230 elif compression_type == self .CODEC_ZSTD :
241231 uncompressed = zstd_decode (data .tobytes ())
242232 else :
243- assert_never (compression_type )
233+ # Must not be possible
234+ raise RuntimeError (
235+ f"Invalid compression codec { compression_type :#04x} "
236+ )
244237 self ._buffer = bytearray (uncompressed )
245238 self ._pos = 0
246239 self ._decompressed = True
@@ -560,7 +553,10 @@ def _maybe_compress(self) -> bool:
560553 elif self ._compression_type == self .CODEC_ZSTD :
561554 compressed = zstd_encode (data )
562555 else :
563- assert_never (self ._compression_type )
556+ # Must not be possible
557+ raise RuntimeError (
558+ f"Invalid compression codec { self ._compression_type :#04x} "
559+ )
564560 compressed_size = len (compressed )
565561 if len (data ) <= compressed_size :
566562 # We did not get any benefit from compression, lets send
0 commit comments