1- import datetime , logging , string
1+ import datetime
2+ import logging
3+ import string
4+ from itertools import chain
5+ from typing import Dict , Iterable , List
26
37from volatility3 .framework import constants , exceptions
4- from volatility3 .framework .interfaces import plugins
58from volatility3 .framework .configuration import requirements
6- from volatility3 .framework .renderers import format_hints , TreeGrid
9+ from volatility3 .framework .interfaces import plugins
10+ from volatility3 .framework .renderers import TreeGrid , format_hints
11+ from volatility3 .framework .symbols .windows import extensions
712from volatility3 .plugins .windows import (
813 handles ,
914 info ,
@@ -71,20 +76,19 @@ def _proc_name_to_string(self, proc):
7176 "string" , max_length = proc .ImageFileName .vol .count , errors = "replace"
7277 )
7378
74- def _is_valid_proc_name (self , str ):
75- for c in str :
76- if not c in self .valid_proc_name_chars :
77- return False
78- return True
79+ def _is_valid_proc_name (self , string : str ) -> bool :
80+ return all (c in self .valid_proc_name_chars for c in string )
7981
80- def _filter_garbage_procs (self , proc_list ):
82+ def _filter_garbage_procs (
83+ self , proc_list : Iterable [extensions .EPROCESS ]
84+ ) -> List [extensions .EPROCESS ]:
8185 return [
8286 p
8387 for p in proc_list
8488 if p .is_valid () and self ._is_valid_proc_name (self ._proc_name_to_string (p ))
8589 ]
8690
87- def _translate_offset (self , offset ) :
91+ def _translate_offset (self , offset : int ) -> int :
8892 if not self .config ["physical-offsets" ]:
8993 return offset
9094
@@ -100,21 +104,25 @@ def _translate_offset(self, offset):
100104
101105 return offset
102106
103- def _proc_list_to_dict (self , tasks ):
107+ def _proc_list_to_dict (
108+ self , tasks : Iterable [extensions .EPROCESS ]
109+ ) -> Dict [int , extensions .EPROCESS ]:
104110 tasks = self ._filter_garbage_procs (tasks )
105111 return {self ._translate_offset (proc .vol .offset ): proc for proc in tasks }
106112
107113 def _check_pslist (self , tasks ):
108114 return self ._proc_list_to_dict (tasks )
109115
110- def _check_psscan (self , layer_name , symbol_table ):
116+ def _check_psscan (
117+ self , layer_name : str , symbol_table : str
118+ ) -> Dict [int , extensions .EPROCESS ]:
111119 res = psscan .PsScan .scan_processes (
112120 context = self .context , layer_name = layer_name , symbol_table = symbol_table
113121 )
114122
115123 return self ._proc_list_to_dict (res )
116124
117- def _check_thrdscan (self ):
125+ def _check_thrdscan (self ) -> Dict [ int , extensions . EPROCESS ] :
118126 ret = []
119127
120128 for ethread in thrdscan .ThrdScan .scan_threads (
@@ -135,33 +143,38 @@ def _check_thrdscan(self):
135143
136144 return self ._proc_list_to_dict (ret )
137145
138- def _check_csrss_handles (self , tasks , layer_name , symbol_table ):
139- ret = []
146+ def _check_csrss_handles (
147+ self , tasks : Iterable [extensions .EPROCESS ], layer_name : str , symbol_table : str
148+ ) -> Dict [int , extensions .EPROCESS ]:
149+ ret : List [extensions .EPROCESS ] = []
150+
151+ handles_plugin = handles .Handles (
152+ context = self .context , config_path = self .config_path
153+ )
154+
155+ type_map = handles_plugin .get_type_map (self .context , layer_name , symbol_table )
156+
157+ cookie = handles_plugin .find_cookie (
158+ context = self .context ,
159+ layer_name = layer_name ,
160+ symbol_table = symbol_table ,
161+ )
140162
141163 for p in tasks :
142164 name = self ._proc_name_to_string (p )
143- if name == "csrss.exe" :
144- try :
145- if p .has_member ("ObjectTable" ):
146- handles_plugin = handles .Handles (
147- context = self .context , config_path = self .config_path
148- )
149- hndls = list (handles_plugin .handles (p .ObjectTable ))
150- for h in hndls :
151- if (
152- h .get_object_type (
153- handles_plugin .get_type_map (
154- self .context , layer_name , symbol_table
155- )
156- )
157- == "Process"
158- ):
159- ret .append (h .Body .cast ("_EPROCESS" ))
160-
161- except exceptions .InvalidAddressException :
162- vollog .log (
163- constants .LOGLEVEL_VVV , "Cannot access eprocess object table"
164- )
165+ if name != "csrss.exe" :
166+ continue
167+
168+ try :
169+ ret += [
170+ handle .Body .cast ("_EPROCESS" )
171+ for handle in handles_plugin .handles (p .ObjectTable )
172+ if handle .get_object_type (type_map , cookie ) == "Process"
173+ ]
174+ except exceptions .InvalidAddressException :
175+ vollog .log (
176+ constants .LOGLEVEL_VVV , "Cannot access eprocess object table"
177+ )
165178
166179 return self ._proc_list_to_dict (ret )
167180
@@ -178,7 +191,7 @@ def _generator(self):
178191 )
179192
180193 # get processes from each source
181- processes = {}
194+ processes : Dict [ str , Dict [ int , extensions . EPROCESS ]] = {}
182195
183196 processes ["pslist" ] = self ._check_pslist (kdbg_list_processes )
184197 processes ["psscan" ] = self ._check_psscan (layer_name , symbol_table )
@@ -187,27 +200,20 @@ def _generator(self):
187200 kdbg_list_processes , layer_name , symbol_table
188201 )
189202
190- # print results
191-
192- # list of lists of offsets
193- offsets = [list (processes [source ].keys ()) for source in processes ]
194-
195- # flatten to one list
196- offsets = sum (offsets , [])
197-
198- # remove duplicates
199- offsets = set (offsets )
203+ # Unique set of all offsets from all sources
204+ offsets = set (chain (* (mapping .keys () for mapping in processes .values ())))
200205
201206 for offset in offsets :
202- proc = None
207+ # We know there will be at least one process mapped to each offset
208+ proc : extensions .EPROCESS = next (
209+ mapping [offset ] for mapping in processes .values () if offset in mapping
210+ )
203211
204212 in_sources = {src : False for src in processes }
205213
206- for source in processes :
207- if offset in processes [ source ] :
214+ for source , process_mapping in processes . items () :
215+ if offset in process_mapping :
208216 in_sources [source ] = True
209- if not proc :
210- proc = processes [source ][offset ]
211217
212218 pid = proc .UniqueProcessId
213219 name = self ._proc_name_to_string (proc )
0 commit comments