1
1
# Copyright (C) 2015 Brent Baude <[email protected] >
2
+ # Copyright (C) 2019 Dominique Blaze <[email protected] >
2
3
#
3
4
# This library is free software; you can redistribute it and/or
4
5
# modify it under the terms of the GNU Lesser General Public
28
29
import sys
29
30
import docker
30
31
import collections
32
+ from oscap_docker_python .oscap_docker_util_noatomic import OscapDockerScan
33
+ from oscap_docker_python .oscap_docker_common import oscap_chroot , get_dist , \
34
+ OscapResult , OscapError
35
+
36
+ atomic_loaded = False
37
+
38
+
39
+ class AtomicError (Exception ):
40
+ """Exception raised when an error happens in atomic import
41
+ """
42
+ def __init__ (self , message ):
43
+ self .message = message
44
+
31
45
32
46
try :
33
47
from Atomic .mount import DockerMount
34
48
from Atomic .mount import MountError
35
49
import inspect
36
50
37
51
if "mnt_mkdir" not in inspect .getargspec (DockerMount .__init__ ).args :
38
- sys . stderr . write (
52
+ raise AtomicError (
39
53
"\" Atomic.mount.DockerMount\" has been successfully imported but "
40
54
"it doesn't support the mnt_mkdir argument. Please upgrade your "
41
55
"Atomic installation to 1.4 or higher.\n "
42
56
)
43
- sys .exit (1 )
44
57
45
58
# we only care about method names
46
59
member_methods = [
47
60
x [0 ] for x in
48
61
inspect .getmembers (
49
- DockerMount , predicate = lambda member : inspect .isfunction (member ) or inspect .ismethod (member )
62
+ DockerMount , predicate = lambda member :
63
+ inspect .isfunction (member ) or inspect .ismethod (member )
50
64
)
51
65
]
52
66
53
67
if "_clean_temp_container_by_path" not in member_methods :
54
- sys . stderr . write (
68
+ raise AtomicError (
55
69
"\" Atomic.mount.DockerMount\" has been successfully imported but "
56
70
"it doesn't have the _clean_temp_container_by_path method. Please "
57
71
"upgrade your Atomic installation to 1.4 or higher.\n "
58
72
)
59
- sys .exit (1 )
73
+
74
+ # if all imports are ok we can use atomic
75
+ atomic_loaded = True
60
76
61
77
except ImportError :
62
78
sys .stderr .write (
63
79
"Failed to import \" Atomic.mount.DockerMount\" . It seems Atomic has "
64
80
"not been installed.\n "
65
81
)
66
- sys .exit (1 )
67
82
83
+ except AtomicError as err :
84
+ sys .stderr .write (err .message )
68
85
69
- class OscapError (Exception ):
70
- ''' oscap Error'''
71
- pass
72
86
73
-
74
- OscapResult = collections . namedtuple ( "OscapResult" , ( "returncode" , "stdout" , "stderr" ))
87
+ def isAtomicLoaded ():
88
+ return atomic_loaded
75
89
76
90
77
91
class OscapHelpers (object ):
78
92
''' oscap class full of helpers for scanning '''
79
93
CPE = 'oval:org.open-scap.cpe.rhel:def:'
80
- DISTS = ["7" , "6" , "5" ]
94
+ DISTS = ["8" , " 7" , "6" , "5" ]
81
95
82
96
def __init__ (self , cve_input_dir , oscap_binary ):
83
97
self .cve_input_dir = cve_input_dir
@@ -100,21 +114,6 @@ def _rm_tmp_dir(tmp_dir):
100
114
'''
101
115
shutil .rmtree (tmp_dir )
102
116
103
- def _get_dist (self , chroot , target ):
104
- '''
105
- Test the chroot and determine what RHEL dist it is; returns
106
- an integer representing the dist
107
- '''
108
- cpe_dict = '/usr/share/openscap/cpe/openscap-cpe-oval.xml'
109
- if not os .path .exists (cpe_dict ):
110
- raise OscapError ()
111
- for dist in self .DISTS :
112
- result = self .oscap_chroot (chroot , target , 'oval' , 'eval' ,
113
- '--id' , self .CPE + dist , cpe_dict ,
114
- '2>&1' , '>' , '/dev/null' )
115
- if "{0}{1}: true" .format (self .CPE , dist ) in result .stdout :
116
- return dist
117
-
118
117
def _get_target_name_and_config (self , target ):
119
118
'''
120
119
Determines if target is image or container. For images returns full
@@ -143,40 +142,30 @@ def _get_target_name_and_config(self, target):
143
142
except docker .errors .NotFound :
144
143
return "unknown" , {}
145
144
146
- def oscap_chroot (self , chroot_path , target , * oscap_args ):
147
- '''
148
- Wrapper function for executing oscap in a subprocess
149
- '''
150
- os .environ ["OSCAP_PROBE_ARCHITECTURE" ] = platform .processor ()
151
- os .environ ["OSCAP_PROBE_ROOT" ] = os .path .join (chroot_path )
152
- os .environ ["OSCAP_PROBE_OS_NAME" ] = platform .system ()
153
- os .environ ["OSCAP_PROBE_OS_VERSION" ] = platform .release ()
154
- name , conf = self ._get_target_name_and_config (target )
155
- os .environ ["OSCAP_EVALUATION_TARGET" ] = name
156
- for var in config .get ("Env" , []):
157
- vname , val = var .split ("=" , 1 )
158
- os .environ ["OSCAP_OFFLINE_" + vname ] = val
159
- cmd = [self .oscap_binary ] + [x for x in oscap_args ]
160
- oscap_process = subprocess .Popen (cmd , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
161
- oscap_stdout , oscap_stderr = oscap_process .communicate ()
162
- return OscapResult (oscap_process .returncode ,
163
- oscap_stdout .decode ("utf-8" ), oscap_stderr .decode ("utf-8" ))
164
-
165
145
def _scan_cve (self , chroot , target , dist , scan_args ):
166
146
'''
167
147
Scan a chroot for cves
168
148
'''
169
149
cve_input = getInputCVE .dist_cve_name .format (dist )
170
- tmp_tuple = ('oval' , 'eval' ) + tuple (scan_args ) + \
171
- (os .path .join (self .cve_input_dir , cve_input ),)
172
- return self .oscap_chroot (chroot , target , * tmp_tuple )
150
+
151
+ args = ("oval" , "eval" )
152
+ for a in scan_args :
153
+ args += (a ,)
154
+ args += (os .path .join (self .cve_input_dir , cve_input ),)
155
+
156
+ name , conf = self ._get_target_name_and_config (target )
157
+
158
+ return oscap_chroot (chroot , self .oscap_binary , args , name ,
159
+ conf .get ("Env" , []) or [])
173
160
174
161
def _scan (self , chroot , target , scan_args ):
175
162
'''
176
163
Scan a container or image
177
164
'''
178
- tmp_tuple = tuple (scan_args )
179
- return self .oscap_chroot (chroot , target , * tmp_tuple )
165
+
166
+ name , conf = self ._get_target_name_and_config (target )
167
+ return oscap_chroot (chroot , self .oscap_binary , scan_args , name ,
168
+ conf .get ("Env" , []) or [])
180
169
181
170
def resolve_image (self , image ):
182
171
'''
@@ -209,7 +198,7 @@ def mount_image_filesystem():
209
198
_tmp_mnt_dir = DM .mount (image )
210
199
211
200
212
- class OscapScan (object ):
201
+ class OscapAtomicScan (object ):
213
202
def __init__ (self , tmp_dir = tempfile .gettempdir (), mnt_dir = None ,
214
203
hours_old = 2 , oscap_binary = '' ):
215
204
self .tmp_dir = tmp_dir
@@ -264,7 +253,8 @@ def scan_cve(self, image, scan_args):
264
253
chroot = self ._find_chroot_path (_tmp_mnt_dir )
265
254
266
255
# Figure out which RHEL dist is in the chroot
267
- dist = self .helper ._get_dist (chroot , image )
256
+ name , conf = self .helper ._get_target_name_and_config (image )
257
+ dist = get_dist (chroot , self .helper .oscap_binary , conf .get ("Env" , []) or [])
268
258
269
259
if dist is None :
270
260
sys .stderr .write ("{0} is not based on RHEL\n " .format (image ))
0 commit comments