44"""A module containing a plugin that verifies the operation function
55pointers of network protocols."""
66import logging
7- from typing import List
7+ from typing import List , Tuple , Generator
88
99from volatility3 .framework import exceptions , interfaces
1010from volatility3 .framework import renderers
1818class Check_afinfo (plugins .PluginInterface ):
1919 """Verifies the operation function pointers of network protocols."""
2020
21+ _version = (1 , 0 , 0 )
2122 _required_framework_version = (2 , 0 , 0 )
2223
2324 @classmethod
@@ -30,61 +31,80 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]
3031 ),
3132 ]
3233
33- # returns whether the symbol is found within the kernel (system.map) or not
34- def _is_known_address (self , handler_addr ):
35- symbols = list (self .context .symbol_space .get_symbols_by_location (handler_addr ))
36-
37- return len (symbols ) > 0
34+ @classmethod
35+ def _check_members (
36+ cls ,
37+ context : interfaces .context .ContextInterface ,
38+ vmlinux_name : str ,
39+ var_ops : interfaces .objects .ObjectInterface ,
40+ var_name : str ,
41+ members : List [str ],
42+ ) -> Generator [Tuple [str , str , int ], None , None ]:
43+ """
44+ Yields any members that are not pointing inside the kernel
45+ """
46+
47+ vmlinux = context .modules [vmlinux_name ]
3848
39- def _check_members (self , var_ops , var_name , members ):
4049 for check in members :
4150 # redhat-specific garbage
4251 if check .startswith ("__UNIQUE_ID_rh_kabi_hide" ):
4352 continue
4453
45- if check == "write" :
46- addr = var_ops .member (attr = "write" )
47- else :
48- addr = getattr (var_ops , check )
49-
50- if addr and addr != 0 and not self ._is_known_address (addr ):
51- yield check , addr
52-
53- def _check_afinfo (self , var_name , var , op_members , seq_members ):
54- # check if object has a least one of the members used for analysis by this function
55- required_members = ["seq_fops" , "seq_ops" , "seq_show" ]
56- has_required_member = any (var .has_member (member ) for member in required_members )
57- if not has_required_member :
58- vollog .debug (
59- f"{ var_name } object at { hex (var .vol .offset )} had none of the required members: { ', ' .join ([member for member in required_members ])} "
60- )
61- raise exceptions .PluginRequirementException
54+ # These structures have members like `write` and `next`, which are built in Python functions
55+ addr = var_ops .member (attr = check )
6256
63- if var .has_member ("seq_fops" ):
64- for hooked_member , hook_address in self ._check_members (
65- var .seq_fops , var_name , op_members
66- ):
67- yield var_name , hooked_member , hook_address
57+ # Unimplemented handlers are set to 0
58+ if not addr :
59+ continue
6860
61+ if len (vmlinux .get_symbols_by_absolute_location (addr )) == 0 :
62+ yield var_name , check , addr
63+
64+ @classmethod
65+ def _check_pre_4_18_ops (
66+ cls ,
67+ context : interfaces .context .ContextInterface ,
68+ vmlinux_name : str ,
69+ var_name : str ,
70+ var : interfaces .objects .ObjectInterface ,
71+ op_members : List [str ],
72+ seq_members : List [str ],
73+ ):
74+ """
75+ Finds the correct way to reference `op_members`
76+ """
77+ vmlinux = context .modules [vmlinux_name ]
78+
79+ if var .has_member ("seq_fops" ):
80+ yield from cls ._check_members (
81+ context , vmlinux_name , var .seq_fops , var_name , op_members
82+ )
6983 # newer kernels
7084 if var .has_member ("seq_ops" ):
71- for hooked_member , hook_address in self ._check_members (
72- var .seq_ops , var_name , seq_members
73- ):
74- yield var_name , hooked_member , hook_address
85+ yield from cls ._check_members (
86+ context , vmlinux_name , var .seq_ops , var_name , seq_members
87+ )
7588
7689 # this is the most commonly hooked member by rootkits, so a force a check on it
90+ elif var .has_member ("seq_show" ):
91+ if len (vmlinux .get_symbols_by_location (var .seq_show )) == 0 :
92+ yield var_name , "show" , var .seq_show
7793 else :
78- if var .has_member ("seq_show" ):
79- if not self ._is_known_address (var .seq_show ):
80- yield var_name , "show" , var .seq_show
81-
82- def _generator (self ):
83- vmlinux = self .context .modules [self .config ["kernel" ]]
84-
85- op_members = vmlinux .get_type ("file_operations" ).members
86- seq_members = vmlinux .get_type ("seq_operations" ).members
94+ raise exceptions .VolatilityException (
95+ "_check_afinfo_pre_4_18: Unable to find sequence operations members for checking."
96+ )
8797
98+ @classmethod
99+ def _check_afinfo_pre_4_18 (
100+ cls ,
101+ context : interfaces .context .ContextInterface ,
102+ vmlinux_name : str ,
103+ seq_members : str ,
104+ ) -> Generator [Tuple [str , str , int ], None , None ]:
105+ """
106+ Checks the operations structures for network protocols of < 4.18 systems
107+ """
88108 tcp = ("tcp_seq_afinfo" , ["tcp6_seq_afinfo" , "tcp4_seq_afinfo" ])
89109 udp = (
90110 "udp_seq_afinfo" ,
@@ -97,39 +117,93 @@ def _generator(self):
97117 )
98118 protocols = [tcp , udp ]
99119
100- # used to track the calls to _check_afinfo and the
101- # number of errors produced due to missing members
102- symbols_checked = set ()
103- symbols_with_errors = set ()
120+ vmlinux = context .modules [vmlinux_name ]
121+
122+ op_members = vmlinux .get_type ("file_operations" ).members
104123
105124 # loop through all symbols
106125 for struct_type , global_vars in protocols :
107126 for global_var_name in global_vars :
108127 # this will lookup fail for the IPv6 protocols on kernels without IPv6 support
109128 try :
110- global_var = vmlinux .get_symbol (global_var_name )
129+ global_var = vmlinux .object_from_symbol (global_var_name )
111130 except exceptions .SymbolError :
112131 continue
113132
114- global_var = vmlinux .object (
115- object_type = struct_type , offset = global_var .address
133+ yield from cls ._check_pre_4_18_ops (
134+ context ,
135+ vmlinux_name ,
136+ global_var_name ,
137+ global_var ,
138+ op_members ,
139+ seq_members ,
116140 )
117141
118- symbols_checked .add (global_var_name )
119- try :
120- for name , member , address in self ._check_afinfo (
121- global_var_name , global_var , op_members , seq_members
122- ):
123- yield 0 , (name , member , format_hints .Hex (address ))
124- except exceptions .PluginRequirementException :
125- symbols_with_errors .add (global_var_name )
126-
127- # if every call to _check_afinfo failed show a warning
128- if symbols_checked == symbols_with_errors :
129- vollog .warning (
130- "This plugin was not able to check for hooks. This means you are either analyzing an unsupported kernel version or that your symbol table is corrupt."
142+ @classmethod
143+ def _check_afinfo_post_4_18 (
144+ cls ,
145+ context : interfaces .context .ContextInterface ,
146+ vmlinux_name : str ,
147+ seq_members : str ,
148+ ) -> Generator [Tuple [str , str , int ], None , None ]:
149+ """
150+ Checks the operations structures for network protocols of >= 4.18 systems
151+ """
152+ vmlinux = context .modules [vmlinux_name ]
153+
154+ ops_structs = [
155+ "raw_seq_ops" ,
156+ "udp_seq_ops" ,
157+ "arp_seq_ops" ,
158+ "unix_seq_ops" ,
159+ "udp6_seq_ops" ,
160+ "raw6_seq_ops" ,
161+ "tcp_seq_ops" ,
162+ "tcp4_seq_ops" ,
163+ "tcp6_seq_ops" ,
164+ "packet_seq_ops" ,
165+ ]
166+
167+ for protocol_ops_var in ops_structs :
168+ # These will fail if the particular kernel doesn't have support for a protocol like IPv6
169+ try :
170+ protocol_ops = vmlinux .object_from_symbol (protocol_ops_var )
171+ except exceptions .SymbolError :
172+ continue
173+
174+ yield from cls ._check_members (
175+ context , vmlinux_name , protocol_ops , protocol_ops_var , seq_members
131176 )
132177
178+ @classmethod
179+ def check_afinfo (
180+ cls , context : interfaces .context .ContextInterface , vmlinux_name
181+ ) -> Generator [Tuple [str , str , int ], None , None ]:
182+ """
183+ Walks the network protocol operations structures for common network protocols.
184+ Reports any initialized operations members that do not point inside the kernel.
185+ """
186+ vmlinux = context .modules [vmlinux_name ]
187+
188+ type_check = vmlinux .get_type ("tcp_seq_afinfo" )
189+ if type_check .has_member ("seq_fops" ):
190+ checker = cls ._check_afinfo_pre_4_18
191+ else :
192+ checker = cls ._check_afinfo_post_4_18
193+
194+ seq_members = vmlinux .get_type ("seq_operations" ).members
195+
196+ yield from checker (context , vmlinux_name , seq_members )
197+
198+ def _generator (self ):
199+ """
200+ A simple wrapper around `check_afino`
201+ """
202+ for name , member , address in self .check_afinfo (
203+ self .context , self .config ["kernel" ]
204+ ):
205+ yield 0 , (name , member , format_hints .Hex (address ))
206+
133207 def run (self ):
134208 return renderers .TreeGrid (
135209 [
0 commit comments