|
5 | 5 | import os |
6 | 6 | import shlex |
7 | 7 | import time |
8 | | -from typing import Callable, Optional |
| 8 | +from typing import Callable, Optional, Union |
9 | 9 | import sys |
10 | 10 | from elftools.elf.elffile import ELFFile |
11 | 11 | from elftools.elf.sections import SymbolTableSection |
@@ -39,17 +39,24 @@ def __init__(self, path_or_state, argv=None, workspace_url=None, policy="random" |
39 | 39 | initial_state = _make_initial_state(path_or_state, argv=argv, **kwargs) |
40 | 40 | else: |
41 | 41 | initial_state = path_or_state |
42 | | - |
43 | 42 | super().__init__(initial_state, workspace_url=workspace_url, policy=policy, **kwargs) |
44 | 43 |
|
45 | 44 | # Move the following into a linux plugin |
46 | 45 | self._assertions = {} |
47 | 46 | self.trace = None |
| 47 | + self._linux_machine_arch: str # used when looking up syscall numbers for sys hooks |
48 | 48 | # sugar for 'will_execute_instruction" |
49 | 49 | self._hooks = {} |
50 | 50 | self._after_hooks = {} |
| 51 | + self._sys_hooks = {} |
| 52 | + self._sys_after_hooks = {} |
51 | 53 | self._init_hooks = set() |
52 | 54 |
|
| 55 | + from ..platforms.linux import Linux |
| 56 | + |
| 57 | + if isinstance(initial_state.platform, Linux): |
| 58 | + self._linux_machine_arch = initial_state.platform.current.machine |
| 59 | + |
53 | 60 | # self.subscribe('will_generate_testcase', self._generate_testcase_callback) |
54 | 61 |
|
55 | 62 | ############################################################################ |
@@ -215,54 +222,91 @@ def init(self, f): |
215 | 222 | self.subscribe("will_run", self._init_callback) |
216 | 223 | return f |
217 | 224 |
|
218 | | - def hook(self, pc, after=False): |
| 225 | + def hook( |
| 226 | + self, pc_or_sys: Optional[Union[int, str]], after: bool = False, syscall: bool = False |
| 227 | + ): |
219 | 228 | """ |
220 | 229 | A decorator used to register a hook function for a given instruction address. |
221 | 230 | Equivalent to calling :func:`~add_hook`. |
222 | 231 |
|
223 | | - :param pc: Address of instruction to hook |
224 | | - :type pc: int or None |
| 232 | + :param pc_or_sys: Address of instruction, syscall number, or syscall name to remove hook from |
| 233 | + :type pc_or_sys: int or None if `syscall` = False. int, str, or None if `syscall` = True |
| 234 | + :param after: Hook after PC (or after syscall) executes? |
| 235 | + :param syscall: Catch a syscall invocation instead of instruction? |
225 | 236 | """ |
226 | 237 |
|
227 | 238 | def decorator(f): |
228 | | - self.add_hook(pc, f, after) |
| 239 | + self.add_hook(pc_or_sys, f, after, None, syscall) |
229 | 240 | return f |
230 | 241 |
|
231 | 242 | return decorator |
232 | 243 |
|
233 | 244 | def add_hook( |
234 | 245 | self, |
235 | | - pc: Optional[int], |
| 246 | + pc_or_sys: Optional[Union[int, str]], |
236 | 247 | callback: HookCallback, |
237 | 248 | after: bool = False, |
238 | 249 | state: Optional[State] = None, |
| 250 | + syscall: bool = False, |
239 | 251 | ): |
240 | 252 | """ |
241 | | - Add a callback to be invoked on executing a program counter. Pass `None` |
242 | | - for pc to invoke callback on every instruction. `callback` should be a callable |
243 | | - that takes one :class:`~manticore.core.state.State` argument. |
| 253 | + Add a callback to be invoked on executing a program counter (or syscall). Pass `None` |
| 254 | + for `pc_or_sys` to invoke callback on every instruction (or syscall). `callback` should |
| 255 | + be a callable that takes one :class:`~manticore.core.state.State` argument. |
244 | 256 |
|
245 | | - :param pc: Address of instruction to hook |
| 257 | + :param pc_or_sys: Address of instruction, syscall number, or syscall name to remove hook from |
| 258 | + :type pc_or_sys: int or None if `syscall` = False. int, str, or None if `syscall` = True |
246 | 259 | :param callback: Hook function |
247 | | - :param after: Hook after PC executes? |
| 260 | + :param after: Hook after PC (or after syscall) executes? |
248 | 261 | :param state: Optionally, add hook for this state only, else all states |
| 262 | + :param syscall: Catch a syscall invocation instead of instruction? |
249 | 263 | """ |
250 | | - if not (isinstance(pc, int) or pc is None): |
251 | | - raise TypeError(f"pc must be either an int or None, not {pc.__class__.__name__}") |
| 264 | + if not (isinstance(pc_or_sys, int) or pc_or_sys is None or syscall): |
| 265 | + raise TypeError(f"pc must be either an int or None, not {pc_or_sys.__class__.__name__}") |
| 266 | + elif not (isinstance(pc_or_sys, (int, str)) or pc_or_sys is None) and syscall: |
| 267 | + raise TypeError( |
| 268 | + f"syscall must be either an int, string, or None, not {pc_or_sys.__class__.__name__}" |
| 269 | + ) |
| 270 | + |
| 271 | + if isinstance(pc_or_sys, str): |
| 272 | + from ..platforms import linux_syscalls |
| 273 | + |
| 274 | + table = getattr(linux_syscalls, self._linux_machine_arch) |
| 275 | + for index, name in table.items(): |
| 276 | + if name == pc_or_sys: |
| 277 | + pc_or_sys = index |
| 278 | + break |
| 279 | + if isinstance(pc_or_sys, str): |
| 280 | + logger.warning( |
| 281 | + f"{pc_or_sys} is not a valid syscall name in architecture {self._linux_machine_arch}. " |
| 282 | + "Please refer to manticore/platforms/linux_syscalls.py to find the correct name." |
| 283 | + ) |
| 284 | + return |
252 | 285 |
|
253 | 286 | if state is None: |
254 | 287 | # add hook to all states |
255 | | - hooks, when, hook_callback = ( |
256 | | - (self._hooks, "will_execute_instruction", self._hook_callback) |
257 | | - if not after |
258 | | - else (self._after_hooks, "did_execute_instruction", self._after_hook_callback) |
259 | | - ) |
260 | | - hooks.setdefault(pc, set()).add(callback) |
| 288 | + if not syscall: |
| 289 | + hooks, when, hook_callback = ( |
| 290 | + (self._hooks, "will_execute_instruction", self._hook_callback) |
| 291 | + if not after |
| 292 | + else (self._after_hooks, "did_execute_instruction", self._after_hook_callback) |
| 293 | + ) |
| 294 | + else: |
| 295 | + hooks, when, hook_callback = ( |
| 296 | + (self._sys_hooks, "will_invoke_syscall", self._sys_hook_callback) |
| 297 | + if not after |
| 298 | + else ( |
| 299 | + self._sys_after_hooks, |
| 300 | + "did_invoke_syscall", |
| 301 | + self._sys_after_hook_callback, |
| 302 | + ) |
| 303 | + ) |
| 304 | + hooks.setdefault(pc_or_sys, set()).add(callback) |
261 | 305 | if hooks: |
262 | 306 | self.subscribe(when, hook_callback) |
263 | 307 | else: |
264 | 308 | # only hook for the specified state |
265 | | - state.add_hook(pc, callback, after) |
| 309 | + state.add_hook(pc_or_sys, callback, after, syscall) |
266 | 310 |
|
267 | 311 | def _hook_callback(self, state, pc, instruction): |
268 | 312 | "Invoke all registered generic hooks" |
@@ -293,6 +337,28 @@ def _after_hook_callback(self, state, last_pc, pc, instruction): |
293 | 337 | for cb in self._after_hooks.get(None, []): |
294 | 338 | cb(state) |
295 | 339 |
|
| 340 | + def _sys_hook_callback(self, state, syscall_num): |
| 341 | + "Invoke all registered generic hooks" |
| 342 | + |
| 343 | + # Invoke all syscall_num-specific hooks |
| 344 | + for cb in self._sys_hooks.get(syscall_num, []): |
| 345 | + cb(state) |
| 346 | + |
| 347 | + # Invoke all syscall_num-agnostic hooks |
| 348 | + for cb in self._sys_hooks.get(None, []): |
| 349 | + cb(state) |
| 350 | + |
| 351 | + def _sys_after_hook_callback(self, state, syscall_num): |
| 352 | + "Invoke all registered generic hooks" |
| 353 | + |
| 354 | + # Invoke all syscall_num-specific hooks |
| 355 | + for cb in self._sys_after_hooks.get(syscall_num, []): |
| 356 | + cb(state) |
| 357 | + |
| 358 | + # Invoke all syscall_num-agnostic hooks |
| 359 | + for cb in self._sys_after_hooks.get(None, []): |
| 360 | + cb(state) |
| 361 | + |
296 | 362 | def _init_callback(self, ready_states): |
297 | 363 | for cb in self._init_hooks: |
298 | 364 | # We _should_ only ever have one starting state. Right now we're putting |
|
0 commit comments