65
65
DHCP_HLENETHERNET = const (0x06 )
66
66
DHCP_HOPS = const (0x00 )
67
67
68
- MAGIC_COOKIE = const ( 0x63825363 )
68
+ MAGIC_COOKIE = b"c \x82 Sc" # Four bytes 99.130.83.99
69
69
MAX_DHCP_OPT = const (0x10 )
70
70
71
71
# Default DHCP Server port
@@ -179,7 +179,7 @@ def send_dhcp_message(
179
179
# Transaction ID (xid)
180
180
self ._initial_xid = htonl (self ._transaction_id )
181
181
self ._initial_xid = self ._initial_xid .to_bytes (4 , "big" )
182
- _BUFF [4 :7 ] = self ._initial_xid
182
+ _BUFF [4 :8 ] = self ._initial_xid
183
183
184
184
# seconds elapsed
185
185
_BUFF [8 ] = (int (time_elapsed ) & 0xFF00 ) >> 8
@@ -195,18 +195,15 @@ def send_dhcp_message(
195
195
# as they're already set to 0.0.0.0
196
196
# Except when renewing, then fill in ciaddr
197
197
if renew :
198
- _BUFF [12 :15 ] = bytes (self .local_ip )
198
+ _BUFF [12 :16 ] = bytes (self .local_ip )
199
199
200
200
# chaddr
201
201
_BUFF [28 :34 ] = self ._mac_address
202
202
203
203
# NOTE: 192 octets of 0's, BOOTP legacy
204
204
205
205
# Magic Cookie
206
- _BUFF [236 ] = (MAGIC_COOKIE >> 24 ) & 0xFF
207
- _BUFF [237 ] = (MAGIC_COOKIE >> 16 ) & 0xFF
208
- _BUFF [238 ] = (MAGIC_COOKIE >> 8 ) & 0xFF
209
- _BUFF [239 ] = MAGIC_COOKIE & 0xFF
206
+ _BUFF [236 :240 ] = MAGIC_COOKIE
210
207
211
208
# Option - DHCP Message Type
212
209
_BUFF [240 ] = 53
@@ -262,10 +259,10 @@ def send_dhcp_message(
262
259
# pylint: disable=too-many-branches, too-many-statements
263
260
def parse_dhcp_response (
264
261
self ,
265
- ) -> Union [ Tuple [int , bytes ], Tuple [ int , int ] ]:
262
+ ) -> Tuple [int , bytearray ]:
266
263
"""Parse DHCP response from DHCP server.
267
264
268
- :return Union[ Tuple[int, bytes], Tuple[int, int]] : DHCP packet type.
265
+ :return Tuple[int, bytearray] : DHCP packet type and ID .
269
266
"""
270
267
global _BUFF # pylint: disable=global-statement
271
268
# store packet in buffer
@@ -275,22 +272,23 @@ def parse_dhcp_response(
275
272
276
273
# -- Parse Packet, FIXED -- #
277
274
# Validate OP
278
- assert (
279
- _BUFF [ 0 ] == DHCP_BOOT_REPLY
280
- ), "Malformed Packet - \
275
+ if _BUFF [ 0 ] != DHCP_BOOT_REPLY :
276
+ raise RuntimeError (
277
+ "Malformed Packet - \
281
278
DHCP message OP is not expected BOOT Reply."
279
+ )
282
280
283
281
xid = _BUFF [4 :8 ]
284
- if bytes (xid ) < self ._initial_xid :
285
- print ("f" )
286
- return 0 , 0
282
+ if bytes (xid ) != self ._initial_xid :
283
+ raise ValueError ("DHCP response ID mismatch." )
287
284
288
285
self .local_ip = tuple (_BUFF [16 :20 ])
289
- if _BUFF [28 :34 ] == 0 :
290
- return 0 , 0
286
+ # Check that there is a server ID.
287
+ if _BUFF [28 :34 ] == b"\x00 \x00 \x00 \x00 \x00 \x00 " :
288
+ raise ValueError ("No DHCP server ID in the response." )
291
289
292
- if int . from_bytes ( _BUFF [235 :240 ], "big" ) != MAGIC_COOKIE :
293
- return 0 , 0
290
+ if _BUFF [236 :240 ] != MAGIC_COOKIE :
291
+ raise ValueError ( "No DHCP Magic Cookie in the response." )
294
292
295
293
# -- Parse Packet, VARIABLE -- #
296
294
ptr = 240
@@ -323,8 +321,8 @@ def parse_dhcp_response(
323
321
ptr += 1
324
322
opt_len = _BUFF [ptr ]
325
323
ptr += 1
326
- self .gateway_ip = tuple (_BUFF [ptr : ptr + opt_len ])
327
- ptr += opt_len
324
+ self .gateway_ip = tuple (_BUFF [ptr : ptr + 4 ])
325
+ ptr += opt_len # still increment even though we only read 1 addr.
328
326
elif _BUFF [ptr ] == DNS_SERVERS :
329
327
ptr += 1
330
328
opt_len = _BUFF [ptr ]
@@ -429,65 +427,79 @@ def _dhcp_state_machine(self) -> None:
429
427
if self ._sock ._available (): # pylint: disable=protected-access
430
428
if self ._debug :
431
429
print ("* DHCP: Parsing OFFER" )
432
- msg_type , xid = self .parse_dhcp_response ()
433
- if msg_type == DHCP_OFFER :
434
- # Check if transaction ID matches, otherwise it may be an offer
435
- # for another device
436
- if htonl (self ._transaction_id ) == int .from_bytes (xid , "big" ):
437
- if self ._debug :
438
- print (
439
- "* DHCP: Send request to {}" .format (self .dhcp_server_ip )
430
+ try :
431
+ msg_type , xid = self .parse_dhcp_response ()
432
+ except ValueError as error :
433
+ if self ._debug :
434
+ print (error )
435
+ else :
436
+ if msg_type == DHCP_OFFER :
437
+ # Check if transaction ID matches, otherwise it may be an offer
438
+ # for another device
439
+ if htonl (self ._transaction_id ) == int .from_bytes (xid , "big" ):
440
+ if self ._debug :
441
+ print (
442
+ "* DHCP: Send request to {}" .format (
443
+ self .dhcp_server_ip
444
+ )
445
+ )
446
+ self ._transaction_id = (
447
+ self ._transaction_id + 1
448
+ ) & 0x7FFFFFFF
449
+ self .send_dhcp_message (
450
+ DHCP_REQUEST , (time .monotonic () - self ._start_time )
440
451
)
441
- self ._transaction_id = (self ._transaction_id + 1 ) & 0x7FFFFFFF
442
- self .send_dhcp_message (
443
- DHCP_REQUEST , (time .monotonic () - self ._start_time )
444
- )
445
- self ._dhcp_state = STATE_DHCP_REQUEST
452
+ self ._dhcp_state = STATE_DHCP_REQUEST
453
+ else :
454
+ if self ._debug :
455
+ print ("* DHCP: Received OFFER with non-matching xid" )
446
456
else :
447
457
if self ._debug :
448
- print ("* DHCP: Received OFFER with non-matching xid" )
449
- else :
450
- if self ._debug :
451
- print ("* DHCP: Received DHCP Message is not OFFER" )
458
+ print ("* DHCP: Received DHCP Message is not OFFER" )
452
459
453
460
elif self ._dhcp_state == STATE_DHCP_REQUEST :
454
461
if self ._sock ._available (): # pylint: disable=protected-access
455
462
if self ._debug :
456
463
print ("* DHCP: Parsing ACK" )
457
- msg_type , xid = self .parse_dhcp_response ()
458
- # Check if transaction ID matches, otherwise it may be
459
- # for another device
460
- if htonl (self ._transaction_id ) == int .from_bytes (xid , "big" ):
461
- if msg_type == DHCP_ACK :
462
- if self ._debug :
463
- print ("* DHCP: Successful lease" )
464
- self ._sock .close ()
465
- self ._sock = None
466
- self ._dhcp_state = STATE_DHCP_LEASED
467
- self ._last_lease_time = self ._start_time
468
- if self ._lease_time == 0 :
469
- self ._lease_time = DEFAULT_LEASE_TIME
470
- if self ._t1 == 0 :
471
- # T1 is 50% of _lease_time
472
- self ._t1 = self ._lease_time >> 1
473
- if self ._t2 == 0 :
474
- # T2 is 87.5% of _lease_time
475
- self ._t2 = self ._lease_time - (self ._lease_time >> 3 )
476
- self ._renew_in_sec = self ._t1
477
- self ._rebind_in_sec = self ._t2
478
- self ._eth .ifconfig = (
479
- self .local_ip ,
480
- self .subnet_mask ,
481
- self .gateway_ip ,
482
- self .dns_server_ip ,
483
- )
484
- gc .collect ()
464
+ try :
465
+ msg_type , xid = self .parse_dhcp_response ()
466
+ except ValueError as error :
467
+ if self ._debug :
468
+ print (error )
469
+ else :
470
+ # Check if transaction ID matches, otherwise it may be
471
+ # for another device
472
+ if htonl (self ._transaction_id ) == int .from_bytes (xid , "big" ):
473
+ if msg_type == DHCP_ACK :
474
+ if self ._debug :
475
+ print ("* DHCP: Successful lease" )
476
+ self ._sock .close ()
477
+ self ._sock = None
478
+ self ._dhcp_state = STATE_DHCP_LEASED
479
+ self ._last_lease_time = self ._start_time
480
+ if self ._lease_time == 0 :
481
+ self ._lease_time = DEFAULT_LEASE_TIME
482
+ if self ._t1 == 0 :
483
+ # T1 is 50% of _lease_time
484
+ self ._t1 = self ._lease_time >> 1
485
+ if self ._t2 == 0 :
486
+ # T2 is 87.5% of _lease_time
487
+ self ._t2 = self ._lease_time - (self ._lease_time >> 3 )
488
+ self ._renew_in_sec = self ._t1
489
+ self ._rebind_in_sec = self ._t2
490
+ self ._eth .ifconfig = (
491
+ self .local_ip ,
492
+ self .subnet_mask ,
493
+ self .gateway_ip ,
494
+ self .dns_server_ip ,
495
+ )
496
+ gc .collect ()
497
+ else :
498
+ if self ._debug :
499
+ print ("* DHCP: Received DHCP Message is not ACK" )
485
500
else :
486
501
if self ._debug :
487
- print ("* DHCP: Received DHCP Message is not ACK" )
488
- else :
489
- if self ._debug :
490
- print ("* DHCP: Received non-matching xid" )
502
+ print ("* DHCP: Received non-matching xid" )
491
503
492
504
elif self ._dhcp_state == STATE_DHCP_WAIT :
493
505
if time .monotonic () > (self ._start_time + DHCP_WAIT_TIME ):
0 commit comments