@@ -66,242 +66,3 @@ class Tag(NamedTuple):
6666 nr : TNumber
6767 typ : TType
6868 cls : TClass
69-
70-
71- class Error (Exception ):
72- pass
73-
74-
75- class Decoder :
76- __slots__ = ("m_stack" , "m_tag" )
77-
78- def __init__ (self , data : bytes ) -> None :
79- self .m_stack : List [List ] = [[0 , data ]]
80- self .m_tag : Optional [Tag ] = None
81-
82- def peek (self ) -> Tag :
83- """This method returns the current ASN.1 tag (i.e. the tag that a
84- subsequent `Decoder.read()` call would return) without updating the
85- decoding offset. In case no more data is available from the input,
86- this method returns ``None`` to signal end-of-file.
87-
88- This method is useful if you don't know whether the next tag will be a
89- primitive or a constructed tag. Depending on the return value
90- of `peek`, you would decide to either issue a `Decoder.read()` in case
91- of a primitive type, or an `Decoder.enter()` in case of a constructed
92- type.
93-
94- Note:
95- Because this method does not advance the current offset in the
96- input, calling it multiple times in a row will return the same
97- value for all calls.
98-
99- Returns:
100- `Tag`: The current ASN.1 tag.
101-
102- Raises:
103- `Error`
104- """
105- if self ._end_of_input ():
106- raise Error ("Input is empty." )
107- if self .m_tag is None :
108- self .m_tag = self ._read_tag ()
109- return self .m_tag
110-
111- def read (self , nr : Optional [TNumber ] = None ) -> Tuple [Tag , Any ]:
112- """This method decodes one ASN.1 tag from the input and returns it as a
113- ``(tag, value)`` tuple. ``tag`` is a 3-tuple ``(nr, typ, cls)``,
114- while ``value`` is a Python object representing the ASN.1 value.
115- The offset in the input is increased so that the next `Decoder.read()`
116- call will return the next tag. In case no more data is available from
117- the input, this method returns ``None`` to signal end-of-file.
118-
119- Returns:
120- `Tag`, value: The current ASN.1 tag and its value.
121-
122- Raises:
123- `Error`
124- """
125- if self ._end_of_input ():
126- raise Error ("Input is empty." )
127- tag = self .peek ()
128- length = self ._read_length ()
129- if nr is None :
130- nr = tag .nr | tag .cls
131- value = self ._read_value (nr , length )
132- self .m_tag = None
133- return tag , value
134-
135- def eof (self ) -> bool :
136- """Return True if we are at the end of input.
137-
138- Returns:
139- bool: True if all input has been decoded, and False otherwise.
140- """
141- return self ._end_of_input ()
142-
143- @contextmanager
144- def enter (self ) -> Iterator [None ]:
145- """This method enters the constructed type that is at the current
146- decoding offset.
147-
148- Note:
149- It is an error to call `Decoder.enter()` if the to be decoded ASN.1
150- tag is not of a constructed type.
151-
152- Returns:
153- None
154- """
155- tag = self .peek ()
156- if tag .typ != Type .Constructed :
157- raise Error ("Cannot enter a non-constructed tag." )
158- length = self ._read_length ()
159- bytes_data = self ._read_bytes (length )
160- self .m_stack .append ([0 , bytes_data ])
161- self .m_tag = None
162-
163- yield
164-
165- if len (self .m_stack ) == 1 :
166- raise Error ("Tag stack is empty." )
167- del self .m_stack [- 1 ]
168- self .m_tag = None
169-
170- def _read_tag (self ) -> Tag :
171- """Read a tag from the input."""
172- byte = self ._read_byte ()
173- cls = byte & 0xC0
174- typ = byte & 0x20
175- nr = byte & 0x1F
176- if nr == 0x1F : # Long form of tag encoding
177- nr = 0
178- while True :
179- byte = self ._read_byte ()
180- nr = (nr << 7 ) | (byte & 0x7F )
181- if not byte & 0x80 :
182- break
183- return Tag (nr = nr , typ = typ , cls = cls )
184-
185- def _read_length (self ) -> int :
186- """Read a length from the input."""
187- byte = self ._read_byte ()
188- if byte & 0x80 :
189- count = byte & 0x7F
190- if count == 0x7F :
191- raise Error ("ASN1 syntax error" )
192- bytes_data = self ._read_bytes (count )
193- length = 0
194- for byte in bytes_data :
195- length = (length << 8 ) | int (byte )
196- try :
197- length = int (length )
198- except OverflowError :
199- pass
200- else :
201- length = byte
202- return length
203-
204- def _read_value (self , nr : TNumber , length : int ) -> Any :
205- """Read a value from the input."""
206- bytes_data = self ._read_bytes (length )
207- if nr == Number .Boolean :
208- return self ._decode_boolean (bytes_data )
209- elif nr in (
210- Number .Integer ,
211- Number .Enumerated ,
212- Number .TimeTicks ,
213- Number .Gauge32 ,
214- Number .Counter32 ,
215- Number .Counter64 ,
216- ):
217- return self ._decode_integer (bytes_data )
218- elif nr == Number .Null :
219- return self ._decode_null (bytes_data )
220- elif nr == Number .ObjectIdentifier :
221- return self ._decode_object_identifier (bytes_data )
222- elif nr in (
223- Number .EndOfMibView ,
224- Number .NoSuchObject ,
225- Number .NoSuchInstance ):
226- return None
227- return bytes_data
228-
229- def _read_byte (self ) -> int :
230- """Return the next input byte, or raise an error on end-of-input."""
231- index , input_data = self .m_stack [- 1 ]
232- try :
233- byte : int = input_data [index ]
234- except IndexError :
235- raise Error ("Premature end of input." )
236- self .m_stack [- 1 ][0 ] += 1
237- return byte
238-
239- def _read_bytes (self , count : int ) -> bytes :
240- """Return the next ``count`` bytes of input. Raise error on
241- end-of-input."""
242- index , input_data = self .m_stack [- 1 ]
243- bytes_data : bytes = input_data [index : index + count ]
244- if len (bytes_data ) != count :
245- raise Error ("Premature end of input." )
246- self .m_stack [- 1 ][0 ] += count
247- return bytes_data
248-
249- def _end_of_input (self ) -> bool :
250- """Return True if we are at the end of input."""
251- index , input_data = self .m_stack [- 1 ]
252- assert not index > len (input_data )
253- return cast (int , index ) == len (input_data )
254-
255- @staticmethod
256- def _decode_boolean (bytes_data : bytes ) -> bool :
257- if len (bytes_data ) != 1 :
258- raise Error ("ASN1 syntax error" )
259- return not bytes_data [0 ] == 0
260-
261- @staticmethod
262- def _decode_integer (bytes_data : bytes ) -> int :
263- values = [int (b ) for b in bytes_data ]
264- negative = values [0 ] & 0x80
265- if negative :
266- # make positive by taking two's complement
267- for i in range (len (values )):
268- values [i ] = 0xFF - values [i ]
269- for i in range (len (values ) - 1 , - 1 , - 1 ):
270- values [i ] += 1
271- if values [i ] <= 0xFF :
272- break
273- assert i > 0
274- values [i ] = 0x00
275- value = 0
276- for val in values :
277- value = (value << 8 ) | val
278- if negative :
279- value = - value
280- try :
281- value = int (value )
282- except OverflowError :
283- pass
284- return value
285-
286- @staticmethod
287- def _decode_null (bytes_data : bytes ) -> None :
288- if len (bytes_data ) != 0 :
289- raise Error ("ASN1 syntax error" )
290-
291- @staticmethod
292- def _decode_object_identifier (bytes_data : bytes ) -> TOid :
293- result : List [int ] = []
294- value : int = 0
295- for i in range (len (bytes_data )):
296- byte = int (bytes_data [i ])
297- if value == 0 and byte == 0x80 :
298- raise Error ("ASN1 syntax error" )
299- value = (value << 7 ) | (byte & 0x7F )
300- if not byte & 0x80 :
301- result .append (value )
302- value = 0
303- if len (result ) == 0 or result [0 ] > 1599 :
304- raise Error ("ASN1 syntax error" )
305- result = [result [0 ] // 40 , result [0 ] % 40 ] + result [1 :]
306- # return '.'.join(str(x) for x in result)
307- return tuple (result )
0 commit comments