Skip to content

Commit cbd4860

Browse files
jstuckeeuwint
authored andcommitted
feat: converted init_systems plugin to v1
1 parent f6c22d2 commit cbd4860

File tree

3 files changed

+259
-275
lines changed

3 files changed

+259
-275
lines changed
Lines changed: 176 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -1,155 +1,190 @@
1+
from __future__ import annotations
2+
13
import re
4+
from enum import Enum
5+
from pathlib import Path
6+
from typing import TYPE_CHECKING, Optional, Union
27

3-
from analysis.PluginBase import AnalysisBasePlugin
4-
from helperFunctions.data_conversion import make_unicode_string
5-
from objects.file import FileObject
8+
from pydantic import BaseModel, Field
9+
from semver import Version
610

7-
FILE_IGNORES = ['README', 'README.md', 'README.txt', 'INSTALL', 'VERSION']
11+
from analysis.plugin import AnalysisPluginV0
812

13+
if TYPE_CHECKING:
14+
from io import FileIO
915

10-
class AnalysisPlugin(AnalysisBasePlugin):
11-
"""
12-
This Plugin searches for Init-Scripts and lists the Services or Script-Files
13-
It displays a short description (if provided) or else the filename
16+
FILE_IGNORES = {'README', 'README.md', 'README.txt', 'INSTALL', 'VERSION'}
1417

15-
Credits:
16-
Original version by Stefan Viergutz created during Firmware Bootcamp WT16/17 at University of Bonn
17-
Refactored and improved by Fraunhofer FKIE
18-
"""
1918

20-
NAME = 'init_systems'
21-
DESCRIPTION = 'detect and analyze auto start services'
22-
DEPENDENCIES = ['file_type'] # noqa: RUF012
23-
VERSION = '0.4.2'
24-
FILE = __file__
19+
class InitType(str, Enum):
20+
init_tab = 'inittab'
21+
initscript = 'initscript'
22+
rc = 'rc'
23+
runit = 'RunIt'
24+
sys_v_init = 'SysVInit'
25+
systemd = 'SystemD'
26+
upstart = 'UpStart'
2527

26-
def additional_setup(self):
27-
self.content = None
2828

29-
@staticmethod
30-
def _is_text_file(file_object):
31-
return file_object.processed_analysis['file_type']['result']['mime'] in ['text/plain']
29+
class SystemDData(BaseModel):
30+
exec_start: Optional[str] = None
31+
description: Optional[str] = None
3232

33-
@staticmethod
34-
def _get_file_path(file_object: FileObject):
35-
return list(file_object.virtual_file_path.values())[0][0]
36-
37-
def _get_systemd_config(self, file_object):
38-
result = {}
39-
match_description = self._findall_regex(r'(?:Description=)(.*)', self.content)
40-
match_exec = self._findall_regex(r'(?<=ExecStart=).*', self.content)
41-
if match_exec:
42-
result['ExecStart'] = '\n'.join(match_exec)
43-
description = match_description if match_description else []
44-
description = self._add_quotes(description)
45-
result['description'] = description if description else [file_object.file_name]
46-
result['init_type'] = ['SystemD']
47-
result['summary'] = result['init_type']
48-
return result
49-
50-
def _get_rc_config(self, _):
51-
result = {}
52-
matches = self._findall_regex(r'^(?!#)(.+)', self.content)
53-
if matches:
54-
result['script'] = '\n'.join(matches)
55-
result['init_type'] = ['rc']
56-
result['summary'] = result['init_type']
57-
return result
58-
59-
def _get_inittab_config(self, _):
60-
result = {}
61-
matches_sysinit = self._findall_regex(r'^[^#].*(?<=sysinit:)([^#].*)', self.content)
62-
matches_respawn = self._findall_regex(r'^[^#].*(?<=respawn:)([^#].*)', self.content)
63-
all_matches = []
64-
all_matches.extend(list(matches_sysinit))
65-
all_matches.extend(list(matches_respawn))
66-
if all_matches:
67-
result['inittab'] = '\n'.join(all_matches)
68-
result['init_type'] = ['inittab']
69-
result['summary'] = result['init_type']
70-
return result
71-
72-
def _get_initscript_config(self, _):
73-
result = {}
74-
matches = self._findall_regex(r'^(?!#)(.+)', self.content)
75-
if matches:
76-
result['script'] = '\n'.join(matches)
77-
result['init_type'] = ['initscript']
78-
result['summary'] = result['init_type']
79-
return result
80-
81-
def _get_upstart_config(self, file_object):
82-
result = {}
83-
match_description = self._findall_regex(r'^[^#].*(?<=description)\s*(.*)', self.content)
84-
match_exec = self._findall_regex(r'[^#]^exec\s*((?:.*\\\n)*.*)', self.content)
85-
match_pre_start = self._findall_regex(
86-
r'(?<=pre-start script\n)(?:(?:[\S\s]*?)[\n]*)(?=\nend script)', self.content
33+
34+
class InitTabData(BaseModel):
35+
sysinit: Optional[str] = None
36+
respawn: Optional[str] = None
37+
38+
39+
class UpstartData(BaseModel):
40+
exec: Optional[str] = None
41+
pre_start: Optional[str] = None
42+
description: Optional[str] = None
43+
44+
45+
class SysVInitData(BaseModel):
46+
description: Optional[str] = None
47+
short_description: Optional[str] = None
48+
49+
50+
class AnalysisPlugin(AnalysisPluginV0):
51+
class Schema(BaseModel):
52+
init_type: Optional[InitType] = Field(
53+
None, description='The type of init system that was identified for this file'
54+
)
55+
data: Optional[Union[SystemDData, InitTabData, UpstartData, SysVInitData]] = Field(
56+
None,
57+
description='Optional meta information and init data contained in this init script',
58+
)
59+
is_init: bool = False
60+
61+
@classmethod
62+
def __get_validators__(cls):
63+
yield cls.validate
64+
65+
@classmethod
66+
def validate(cls, value):
67+
init_type = value.get('init_type')
68+
if init_type == InitType.systemd:
69+
value['data'] = SystemDData(**value['data'])
70+
elif init_type == InitType.init_tab:
71+
value['data'] = InitTabData(**value['data'])
72+
elif init_type == InitType.upstart:
73+
value['data'] = UpstartData(**value['data'])
74+
elif init_type == InitType.sys_v_init:
75+
value['data'] = SysVInitData(**value['data'])
76+
return cls(**value)
77+
78+
def __init__(self):
79+
super().__init__(
80+
metadata=(
81+
self.MetaData(
82+
name='init_systems',
83+
mime_whitelist=['text/plain'],
84+
description='detect and analyze initialization scripts',
85+
version=Version(1, 0, 0),
86+
Schema=self.Schema,
87+
)
88+
)
8789
)
88-
match_script = self._findall_regex(r'(?<=^script\n)(?:(?:[\S\s]*?)[\n]*)(?=\nend script)', self.content)
89-
result['description'] = match_description if match_description else [file_object.file_name]
90-
if match_exec:
91-
result['exec'] = '\n'.join(match_exec)
92-
if match_pre_start:
93-
result['pre-start'] = '\n'.join(match_pre_start)
94-
if match_script:
95-
result['script'] = '\n'.join(match_script)
96-
result['init_type'] = ['UpStart']
97-
result['summary'] = result['init_type']
98-
return result
99-
100-
def _get_runit_config(self, file_object):
101-
# TODO description = filepath
102-
result = {}
103-
match_exec = self._findall_regex(r'^([^#](?:.*\\\n)*.*)', self.content)
104-
if match_exec:
105-
result['script'] = '\n'.join(match_exec)
106-
result['description'] = [file_object.file_name]
107-
result['init_type'] = ['RunIt']
108-
result['summary'] = result['init_type']
109-
return result
110-
111-
def _get_sysvinit_config(self, file_object):
112-
result = {}
113-
match_desc1 = self._findall_regex(r'Short-Description:\s*(.*)', self.content)
114-
match_desc2 = self._findall_regex(r'DESC=\"*([^\"|\n]*)', self.content)
115-
matches = self._findall_regex(r'^(?!#)(.+)', self.content)
116-
description = match_desc1 if match_desc1 else match_desc2 if match_desc2 else []
117-
description_formatted = self._add_quotes(description)
118-
result['description'] = description_formatted if description_formatted else [file_object.file_name]
119-
if matches:
120-
result['script'] = '\n'.join(matches)
121-
result['init_type'] = ['rc']
122-
result['init_type'] = ['SysVInit']
123-
result['summary'] = result['init_type']
124-
return result
125-
126-
def process_object(self, file_object):
127-
if self._is_text_file(file_object) and (file_object.file_name not in FILE_IGNORES):
128-
file_path = self._get_file_path(file_object)
129-
self.content = make_unicode_string(file_object.binary)
130-
if '/inittab' in file_path:
131-
file_object.processed_analysis[self.NAME] = self._get_inittab_config(file_object)
132-
if 'systemd/system/' in file_path:
133-
file_object.processed_analysis[self.NAME] = self._get_systemd_config(file_object)
134-
if file_path.endswith(('etc/rc', 'etc/rc.local', 'etc/rc.firsttime', 'etc/rc.securelevel')):
135-
file_object.processed_analysis[self.NAME] = self._get_rc_config(file_object)
136-
if file_path.endswith('etc/initscript'):
137-
file_object.processed_analysis[self.NAME] = self._get_initscript_config(file_object)
138-
if 'etc/init/' in file_path or 'etc/event.d/' in file_path:
139-
file_object.processed_analysis[self.NAME] = self._get_upstart_config(file_object)
140-
if 'etc/service/' in file_path or 'etc/sv/' in file_path:
141-
file_object.processed_analysis[self.NAME] = self._get_runit_config(file_object)
142-
if 'etc/init.d/' in file_path or 'etc/rc.d/' in file_path:
143-
file_object.processed_analysis[self.NAME] = self._get_sysvinit_config(file_object)
144-
else:
145-
file_object.processed_analysis[self.NAME] = {'summary': []}
146-
return file_object
14790

148-
@staticmethod
149-
def _findall_regex(pattern, content):
150-
regex_compiled = re.compile(pattern, re.MULTILINE)
151-
return regex_compiled.findall(content)
91+
SYSTEMD_EXECSTART_REGEX = re.compile(r'ExecStart=(.*)')
92+
SYSTEMD_DESCRIPTION_REGEX = re.compile(r'Description=(.*)')
93+
94+
def _get_systemd_config(self, file_handle: FileIO) -> Schema:
95+
content = file_handle.read().decode(errors='ignore')
96+
return self.Schema(
97+
is_init=True,
98+
init_type=InitType.systemd,
99+
data=SystemDData(
100+
exec_start=_match(content, self.SYSTEMD_EXECSTART_REGEX),
101+
description=_match(content, self.SYSTEMD_DESCRIPTION_REGEX),
102+
),
103+
)
104+
105+
INITTAB_SYSINIT_REGEX = re.compile(r'^[^#].*(?<=sysinit:)([^#].*)', re.MULTILINE)
106+
INITTAB_RESPAWN_REGEX = re.compile(r'^[^#].*(?<=respawn:)([^#].*)', re.MULTILINE)
107+
108+
def _get_inittab_config(self, file_handle: FileIO) -> Schema:
109+
content = file_handle.read().decode(errors='ignore')
110+
return self.Schema(
111+
is_init=True,
112+
init_type=InitType.init_tab,
113+
data=InitTabData(
114+
sysinit=_match(content, self.INITTAB_SYSINIT_REGEX),
115+
respawn=_match(content, self.INITTAB_RESPAWN_REGEX),
116+
),
117+
)
118+
119+
UPSTART_DESCRIPTION_REGEX = re.compile(r'^[^#].*(?<=description)\s*(.*)', re.MULTILINE)
120+
UPSTART_EXEC_REGEX = re.compile(r'[^#]^exec\s*((?:.*\\\n)*.*)', re.MULTILINE)
121+
UPSTART_PRESTART_REGEX = re.compile(r'(?<=pre-start script\n)[\S\s]*?\n*(?=\nend script)', re.MULTILINE)
122+
123+
def _get_upstart_config(self, file_handle: FileIO) -> Schema:
124+
content = file_handle.read().decode(errors='ignore')
125+
return self.Schema(
126+
is_init=True,
127+
init_type=InitType.upstart,
128+
data=UpstartData(
129+
description=_match(content, self.UPSTART_DESCRIPTION_REGEX),
130+
exec=_match(content, self.UPSTART_EXEC_REGEX),
131+
pre_start=_match(content, self.UPSTART_PRESTART_REGEX),
132+
),
133+
)
134+
135+
SYSVINIT_SHORT_DESC_REGEX = re.compile(r'Short-Description:\s*(.*)', re.MULTILINE)
136+
SYSVINIT_DESC_REGEX = re.compile(r'DESC=\"*([^\"|\n]*)', re.MULTILINE)
137+
138+
def _get_sysvinit_config(self, file_handle: FileIO) -> Schema:
139+
content = file_handle.read().decode(errors='ignore')
140+
return self.Schema(
141+
is_init=True,
142+
init_type=InitType.sys_v_init,
143+
data=SysVInitData(
144+
description=_match(content, self.SYSVINIT_DESC_REGEX),
145+
short_description=_match(content, self.SYSVINIT_SHORT_DESC_REGEX),
146+
),
147+
)
148+
149+
def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel | dict]) -> Schema:
150+
del analyses
151+
file_path = list(virtual_file_path.values())[0][0]
152+
if Path(file_path).name not in FILE_IGNORES:
153+
result = self._get_script_type_from_path(file_path, file_handle)
154+
if result.is_init and not self._has_no_content(file_handle):
155+
return result
156+
return self.Schema(is_init=False)
157+
158+
def _get_script_type_from_path(self, file_path: str, file_handle: FileIO) -> Schema: # noqa: PLR0911
159+
if '/inittab' in file_path:
160+
return self._get_inittab_config(file_handle)
161+
if 'systemd/system/' in file_path:
162+
return self._get_systemd_config(file_handle)
163+
if file_path.endswith(('etc/rc', 'etc/rc.local', 'etc/rc.firsttime', 'etc/rc.securelevel')):
164+
return self.Schema(is_init=True, init_type=InitType.rc)
165+
if file_path.endswith('etc/initscript'):
166+
return self.Schema(is_init=True, init_type=InitType.initscript)
167+
if 'etc/init/' in file_path or 'etc/event.d/' in file_path:
168+
return self._get_upstart_config(file_handle)
169+
if 'etc/service/' in file_path or 'etc/sv/' in file_path:
170+
return self.Schema(is_init=True, init_type=InitType.runit)
171+
if 'etc/init.d/' in file_path or 'etc/rc.d/' in file_path:
172+
return self._get_sysvinit_config(file_handle)
173+
return self.Schema(is_init=False)
174+
175+
def summarize(self, result: Schema) -> list[str]:
176+
if result.is_init and result.init_type:
177+
return [result.init_type]
178+
return []
152179

153180
@staticmethod
154-
def _add_quotes(unquoted_list):
155-
return [f'"{element}"' for element in unquoted_list]
181+
def _has_no_content(file_handle: FileIO) -> bool:
182+
file_handle.seek(0)
183+
content = file_handle.read().decode(errors='ignore')
184+
return all(line.startswith('#') for line in content.splitlines() if line)
185+
186+
187+
def _match(content: str, regex: re.Pattern) -> str | None:
188+
if match := regex.findall(content):
189+
return '\n'.join(match)
190+
return None

0 commit comments

Comments
 (0)