Skip to content

Commit 1458b7d

Browse files
committed
Linux: Add vast quantities of missing type information
1 parent bfd2102 commit 1458b7d

File tree

1 file changed

+58
-45
lines changed
  • volatility3/framework/symbols/linux/extensions

1 file changed

+58
-45
lines changed

volatility3/framework/symbols/linux/extensions/net.py

Lines changed: 58 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from typing import Dict, List, Optional, Union
2+
from typing import Dict, Generator, List, Optional, Union
33

44
from volatility3.framework import objects, exceptions, renderers, interfaces, constants
55
from volatility3.framework.objects import utility
@@ -14,7 +14,7 @@
1414

1515

1616
class net(objects.StructType):
17-
def get_inode(self):
17+
def get_inode(self) -> int:
1818
"""Get the namespace id for this network namespace.
1919
2020
Raises:
@@ -44,7 +44,7 @@ def get_device_name(self) -> str:
4444
"""
4545
return utility.array_to_string(self.name)
4646

47-
def _format_as_mac_address(self, hwaddr):
47+
def _format_as_mac_address(self, hwaddr) -> str:
4848
return ":".join([f"{x:02x}" for x in hwaddr[: self.addr_len]])
4949

5050
def get_mac_address(self) -> Optional[str]:
@@ -71,7 +71,7 @@ def get_mac_address(self) -> Optional[str]:
7171

7272
return self._format_as_mac_address(hwaddr)
7373

74-
def _get_flag_choices(self) -> Dict:
74+
def _get_flag_choices(self) -> Dict[str, int]:
7575
"""Return the net_device flags as a list of strings"""
7676
vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self)
7777
try:
@@ -84,7 +84,9 @@ def _get_flag_choices(self) -> Dict:
8484

8585
return choices
8686

87-
def _get_net_device_flag_value(self, name):
87+
def _get_net_device_flag_value(
88+
self, name
89+
) -> Union[int, interfaces.renderers.BaseAbsentValue]:
8890
"""Return the net_device flag value based on the flag name"""
8991
return self._get_flag_choices().get(name, renderers.UnparsableValue())
9092

@@ -189,7 +191,7 @@ def get_flag_names(self) -> List[str]:
189191
return sorted(net_device_flags)
190192

191193
@property
192-
def promisc(self):
194+
def promisc(self) -> bool:
193195
"""Return if this network interface is in promiscuous mode.
194196
195197
Returns:
@@ -244,7 +246,9 @@ def get_queue_length(self) -> int:
244246

245247

246248
class in_device(objects.StructType):
247-
def get_addresses(self):
249+
def get_addresses(
250+
self,
251+
) -> Generator[interfaces.objects.ObjectInterface, None, None]:
248252
"""Yield the IPv4 ifaddr addresses
249253
250254
Yields:
@@ -257,7 +261,7 @@ def get_addresses(self):
257261

258262

259263
class inet6_dev(objects.StructType):
260-
def get_addresses(self):
264+
def get_addresses(self) -> Generator[interfaces.objects.ObjectInterface]:
261265
"""Yield the IPv6 ifaddr addresses
262266
263267
Yields:
@@ -298,7 +302,7 @@ class in_ifaddr(objects.StructType):
298302
"RT_SCOPE_SITE": "site",
299303
}
300304

301-
def get_scope_type(self):
305+
def get_scope_type(self) -> str:
302306
"""Get the scope type for this IPv4 address
303307
304308
Returns:
@@ -315,15 +319,15 @@ def get_scope_type(self):
315319

316320
return self._rtnl_rtscope_tab.get(rt_scope, "unknown")
317321

318-
def get_address(self):
322+
def get_address(self) -> str:
319323
"""Get an string with the IPv4 address
320324
321325
Returns:
322326
str: the IPv4 address
323327
"""
324328
return conversion.convert_ipv4(self.ifa_address)
325329

326-
def get_prefix_len(self):
330+
def get_prefix_len(self) -> int:
327331
"""Get the IPv4 address prefix len
328332
329333
Returns:
@@ -333,7 +337,7 @@ def get_prefix_len(self):
333337

334338

335339
class inet6_ifaddr(objects.StructType):
336-
def get_scope_type(self):
340+
def get_scope_type(self) -> str:
337341
"""Get the scope type for this IPv6 address
338342
339343
Returns:
@@ -348,15 +352,15 @@ def get_scope_type(self):
348352
else:
349353
return "global"
350354

351-
def get_address(self):
355+
def get_address(self) -> str:
352356
"""Get an string with the IPv6 address
353357
354358
Returns:
355359
str: the IPv6 address
356360
"""
357361
return conversion.convert_ipv6(self.addr.in6_u.u6_addr32)
358362

359-
def get_prefix_len(self):
363+
def get_prefix_len(self) -> int:
360364
"""Get the IPv6 address prefix len
361365
362366
Returns:
@@ -366,7 +370,7 @@ def get_prefix_len(self):
366370

367371

368372
class socket(objects.StructType):
369-
def _get_vol_kernel(self):
373+
def _get_vol_kernel(self) -> interfaces.context.ModuleInterface:
370374
symbol_table_arr = self.vol.type_name.split("!", 1)
371375
symbol_table = symbol_table_arr[0] if len(symbol_table_arr) == 2 else None
372376

@@ -382,7 +386,7 @@ def _get_vol_kernel(self):
382386
kernel = self._context.modules[kernel_module_name]
383387
return kernel
384388

385-
def get_inode(self):
389+
def get_inode(self) -> int:
386390
try:
387391
kernel = self._get_vol_kernel()
388392
except ValueError:
@@ -396,94 +400,100 @@ def get_inode(self):
396400

397401
return vfs_inode.i_ino
398402

399-
def get_state(self):
403+
def get_state(self) -> str:
400404
socket_state_idx = self.state
401405
if 0 <= socket_state_idx < len(linux_constants.SOCKET_STATES):
402406
return linux_constants.SOCKET_STATES[socket_state_idx]
407+
return "Unknown socket state"
403408

404409

405410
class sock(objects.StructType):
406-
def get_family(self):
411+
def get_family(self) -> str:
407412
family_idx = self.__sk_common.skc_family
408413
if 0 <= family_idx < len(linux_constants.SOCK_FAMILY):
409414
return linux_constants.SOCK_FAMILY[family_idx]
415+
return "Unknown socket family"
410416

411-
def get_type(self):
417+
def get_type(self) -> str:
412418
return linux_constants.SOCK_TYPES.get(self.sk_type, "")
413419

414-
def get_inode(self):
420+
def get_inode(self) -> int:
415421
if not self.sk_socket:
416422
return 0
417423
return self.sk_socket.get_inode()
418424

419-
def get_protocol(self):
425+
def get_protocol(self) -> Optional[str]:
420426
return None
421427

422-
def get_state(self):
428+
def get_state(self) -> str:
423429
# Return the generic socket state
424430
if self.has_member("sk"):
425431
return self.sk.sk_socket.get_state()
426432
return self.sk_socket.get_state()
427433

428434

429435
class unix_sock(objects.StructType):
430-
def get_name(self):
436+
def get_name(self) -> Optional[str]:
431437
if not self.addr:
432438
return None
433439
sockaddr_un = self.addr.name.cast("sockaddr_un")
434440
saddr = str(utility.array_to_string(sockaddr_un.sun_path))
435441
return saddr
436442

437-
def get_protocol(self):
443+
def get_protocol(self) -> Optional[str]:
438444
return None
439445

440-
def get_state(self):
446+
def get_state(self) -> str:
441447
"""Return a string representing the sock state."""
442448

443449
# Unix socket states reuse (a subset) of the inet_sock states contants
444450
if self.sk.get_type() == "STREAM":
445451
state_idx = self.sk.__sk_common.skc_state
446452
if 0 <= state_idx < len(linux_constants.TCP_STATES):
447453
return linux_constants.TCP_STATES[state_idx]
448-
else:
449-
# Return the generic socket state
450-
return self.sk.sk_socket.get_state()
454+
else:
455+
return "Unknown unix_sock stream state"
456+
# Return the generic socket state
457+
return self.sk.sk_socket.get_state()
451458

452-
def get_inode(self):
459+
def get_inode(self) -> int:
453460
return self.sk.get_inode()
454461

455462

456463
class inet_sock(objects.StructType):
457-
def get_family(self):
464+
def get_family(self) -> str:
458465
family_idx = self.sk.__sk_common.skc_family
459466
if 0 <= family_idx < len(linux_constants.SOCK_FAMILY):
460467
return linux_constants.SOCK_FAMILY[family_idx]
468+
return "Unknown inet_sock family"
461469

462-
def get_protocol(self):
470+
def get_protocol(self) -> Optional[str]:
463471
# If INET6 family and a proto is defined, we use that specific IPv6 protocol.
464472
# Otherwise, we use the standard IP protocol.
465473
protocol = linux_constants.IP_PROTOCOLS.get(self.sk.sk_protocol)
466474
if self.get_family() == "AF_INET6":
467475
protocol = linux_constants.IPV6_PROTOCOLS.get(self.sk.sk_protocol, protocol)
468476
return protocol
469477

470-
def get_state(self):
478+
def get_state(self) -> str:
471479
"""Return a string representing the sock state."""
472480

473481
if self.sk.get_type() == "STREAM":
474482
state_idx = self.sk.__sk_common.skc_state
475483
if 0 <= state_idx < len(linux_constants.TCP_STATES):
476484
return linux_constants.TCP_STATES[state_idx]
477-
else:
478-
# Return the generic socket state
479-
return self.sk.sk_socket.get_state()
485+
else:
486+
return "Unknown inet_sock stream state"
487+
# Return the generic socket state
488+
return self.sk.sk_socket.get_state()
480489

481-
def get_src_port(self):
490+
def get_src_port(self) -> Optional[int]:
482491
sport_le = getattr(self, "sport", getattr(self, "inet_sport", None))
483492
if sport_le is not None:
484493
return socket_module.htons(sport_le)
494+
return None
485495

486-
def get_dst_port(self):
496+
def get_dst_port(self) -> Optional[int]:
487497
sk_common = self.sk.__sk_common
488498
if hasattr(sk_common, "skc_portpair"):
489499
dport_le = sk_common.skc_portpair & 0xFFFF
@@ -497,7 +507,7 @@ def get_dst_port(self):
497507
return None
498508
return socket_module.htons(dport_le)
499509

500-
def get_src_addr(self):
510+
def get_src_addr(self) -> Optional[str]:
501511
sk_common = self.sk.__sk_common
502512
family = sk_common.skc_family
503513
if family == socket_module.AF_INET:
@@ -523,7 +533,7 @@ def get_src_addr(self):
523533
return None
524534
return socket_module.inet_ntop(family, addr_bytes)
525535

526-
def get_dst_addr(self):
536+
def get_dst_addr(self) -> Optional[str]:
527537
sk_common = self.sk.__sk_common
528538
family = sk_common.skc_family
529539
if family == socket_module.AF_INET:
@@ -554,16 +564,17 @@ def get_dst_addr(self):
554564

555565

556566
class netlink_sock(objects.StructType):
557-
def get_protocol(self):
567+
def get_protocol(self) -> str:
558568
protocol_idx = self.sk.sk_protocol
559569
if 0 <= protocol_idx < len(linux_constants.NETLINK_PROTOCOLS):
560570
return linux_constants.NETLINK_PROTOCOLS[protocol_idx]
571+
return "Unknown netlink_sock protocol"
561572

562573
def get_state(self):
563574
# Return the generic socket state
564575
return self.sk.sk_socket.get_state()
565576

566-
def get_portid(self):
577+
def get_portid(self) -> int:
567578
if self.has_member("pid"):
568579
# kernel < 3.7.10
569580
return self.pid
@@ -573,7 +584,7 @@ def get_portid(self):
573584
else:
574585
raise AttributeError("Unable to find a source port id")
575586

576-
def get_dst_portid(self):
587+
def get_dst_portid(self) -> int:
577588
if self.has_member("dst_pid"):
578589
# kernel < 3.7.10
579590
return self.dst_pid
@@ -595,7 +606,7 @@ def get_state(self):
595606

596607

597608
class packet_sock(objects.StructType):
598-
def get_protocol(self):
609+
def get_protocol(self) -> Optional[str]:
599610
eth_proto = socket_module.htons(self.num)
600611
if eth_proto == 0:
601612
return None
@@ -610,15 +621,17 @@ def get_state(self):
610621

611622

612623
class bt_sock(objects.StructType):
613-
def get_protocol(self):
624+
def get_protocol(self) -> Optional[str]:
614625
type_idx = self.sk.sk_protocol
615626
if 0 <= type_idx < len(linux_constants.BLUETOOTH_PROTOCOLS):
616627
return linux_constants.BLUETOOTH_PROTOCOLS[type_idx]
628+
return None
617629

618-
def get_state(self):
630+
def get_state(self) -> Optional[str]:
619631
state_idx = self.sk.__sk_common.skc_state
620632
if 0 <= state_idx < len(linux_constants.BLUETOOTH_STATES):
621633
return linux_constants.BLUETOOTH_STATES[state_idx]
634+
return None
622635

623636

624637
class xdp_sock(objects.StructType):

0 commit comments

Comments
 (0)