This repository was archived by the owner on Jul 10, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 54
Expand file tree
/
Copy pathimage.py
More file actions
424 lines (350 loc) · 17.5 KB
/
image.py
File metadata and controls
424 lines (350 loc) · 17.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
import sys
import json
import click
import logging
import time
import base64
import anchorecli.clients.apiexternal
import anchorecli.cli.utils
from collections import OrderedDict
config = {}
_logger = logging.getLogger(__name__)
@click.group(name='image', short_help='Image operations')
@click.pass_obj
def image(ctx_config):
global config
config = ctx_config
try:
anchorecli.cli.utils.check_access(config)
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'image', {}, err))
sys.exit(2)
@image.command(short_help="Wait for an image to analyze")
@click.argument('input_image')
@click.option('--timeout', type=float, default=-1.0, help="Time to wait, in seconds. If < 0, wait forever, if 0, do not wait (default=-1)")
@click.option('--interval', type=float, default=5.0, help="Interval between checks, in seconds (default=5)")
def wait(input_image, timeout, interval):
"""
Wait for an image to go to analyzed or analysis_failed status with a specific timeout
:param input_image:
:param timeout:
:return:
"""
ecode = 0
try:
itype = anchorecli.cli.utils.discover_inputimage_format(config, input_image)
image = input_image
#timeout = float(timeout)
t1 = time.time()
while timeout < 0 or time.time() - t1 < timeout:
_logger.debug("discovery from input: %s : %s", str(itype), str(image))
if itype == 'tag':
ret = anchorecli.clients.apiexternal.get_image(config, tag=image, history=False)
elif itype == 'imageid':
ret = anchorecli.clients.apiexternal.get_image(config, image_id=image, history=False)
elif itype == 'imageDigest':
ret = anchorecli.clients.apiexternal.get_image(config, imageDigest=image, history=False)
else:
ecode = 1
raise Exception("cannot use input image string: no discovered imageDigest")
if ret['payload'] and ret['payload'][0]['analysis_status'] in ['analyzed', 'analysis_failed']:
break
else:
if not ret['payload']:
raise Exception('Requested image not found in system')
print('Status: {}'.format(ret['payload'][0]['analysis_status']))
if timeout > 0:
print('Waiting {} seconds for next retry. Total timeout remaining: {}'.format(interval, int(timeout - (time.time() - t1))))
else:
print('Waiting {} seconds for next retry.'.format(interval))
time.sleep(interval)
else:
raise Exception('Timed-out waiting for analyis status to reach terminal state (analyzed or analysis_failed)')
if ret:
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret['success']:
print(anchorecli.cli.utils.format_output(config, 'image_get', {}, ret['payload']))
else:
raise Exception(json.dumps(ret['error'], indent=4))
else:
raise Exception("operation failed with empty response")
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'image_get', {}, err))
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)
@image.command(name='add', short_help="Add an image")
@click.argument('input_image', nargs=1)
@click.option('--force', is_flag=True, help="Force reanalysis of image")
@click.option('--dockerfile', type=click.Path(exists=True), metavar='<Dockerfile>', help="Submit image's dockerfile for analysis")
@click.option('--annotation', nargs=1, multiple=True)
@click.option('--noautosubscribe', is_flag=True, help="If set, instruct the engine to disable tag_update subscription for the added tag.")
def add(input_image, force, dockerfile, annotation, noautosubscribe):
"""
INPUT_IMAGE: Input image can be in the following formats: registry/repo:tag
"""
ecode = 0
try:
itype = anchorecli.cli.utils.discover_inputimage_format(config, input_image)
dockerfile_contents = None
if dockerfile:
with open(dockerfile, 'r') as FH:
dockerfile_contents = base64.b64encode(FH.read().encode('utf-8')).decode('utf-8')
autosubscribe = not noautosubscribe
if itype == 'tag':
annotations = {}
if annotation:
for a in annotation:
try:
(k,v) = a.split('=', 1)
if k and v:
annotations[k] = v
else:
raise Exception("found null in key or value")
except Exception:
raise Exception("annotation format error - annotations must be of the form (--annotation key=value), found: {}".format(a))
ret = anchorecli.clients.apiexternal.add_image(config, tag=input_image, force=force, dockerfile=dockerfile_contents, annotations=annotations, autosubscribe=autosubscribe)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret['success']:
print(anchorecli.cli.utils.format_output(config, 'image_add', {}, ret['payload']))
else:
raise Exception( json.dumps(ret['error'], indent=4))
else:
raise Exception("can only add a tag")
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'image_add', {}, err))
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)
@image.command(name='import', short_help="Import an image from anchore scanner export")
@click.option('--infile', required=True, type=click.Path(exists=True), metavar='<file.json>')
def import_image(infile):
ecode = 0
try:
with open(infile, 'r') as FH:
anchore_data = json.loads(FH.read())
ret = anchorecli.clients.apiexternal.import_image(config, anchore_data=anchore_data)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret['success']:
print(anchorecli.cli.utils.format_output(config, 'image_import', {}, ret['payload']))
else:
raise Exception(json.dumps(ret['error'], indent=4))
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'image_import', {}, err))
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)
@image.command(name='get', short_help="Get an image")
@click.argument('input_image', nargs=1)
@click.option('--show-history', is_flag=True, help="Show history of images that match the input image, if input image is of the form registry/repo:tag")
def get(input_image, show_history):
"""
INPUT_IMAGE: Input image can be in the following formats: Image Digest, ImageID or registry/repo:tag
"""
ecode = 0
try:
itype = anchorecli.cli.utils.discover_inputimage_format(config, input_image)
image = input_image
_logger.debug("discovery from input: %s : %s", str(itype), str(image))
if itype == 'tag':
ret = anchorecli.clients.apiexternal.get_image(config, tag=image, history=show_history)
elif itype == 'imageid':
ret = anchorecli.clients.apiexternal.get_image(config, image_id=image, history=False)
elif itype == 'imageDigest':
ret = anchorecli.clients.apiexternal.get_image(config, imageDigest=image, history=False)
else:
ecode = 1
raise Exception("cannot use input image string: no discovered imageDigest")
if ret:
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret['success']:
print(anchorecli.cli.utils.format_output(config, 'image_get', {}, ret['payload']))
else:
raise Exception(json.dumps(ret['error'], indent=4))
else:
raise Exception("operation failed with empty response")
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'image_get', {}, err))
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)
@image.command(name='list', short_help="List all images")
@click.option('--full', is_flag=True, help="Show full row output for each image")
@click.option('--show-all', is_flag=True, help="Show all images in the system instead of just the latest for a given tag")
def imagelist(full, show_all):
ecode = 0
try:
ret = anchorecli.clients.apiexternal.get_images(config)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret['success']:
print(anchorecli.cli.utils.format_output(config, 'image_list', {'full':full, 'show_all':show_all}, ret['payload']))
else:
raise Exception(json.dumps(ret['error'], indent=4))
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'image_list', {}, err))
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)
@image.command(name='content', short_help="Get contents of image")
@click.argument('input_image', nargs=1)
@click.argument('content_type', nargs=1, required=False)
def query_content(input_image, content_type):
"""
INPUT_IMAGE: Input image can be in the following formats: Image Digest, ImageID or registry/repo:tag
CONTENT_TYPE: The content type can be one of the following types:
- os: Operating System Packages
- npm: Node.JS NPM Module
- gem: Ruby GEM
- files: Files
"""
ecode = 0
try:
itype, image, imageDigest = anchorecli.cli.utils.discover_inputimage(config, input_image)
_logger.debug("discovery from input: %s : %s : %s", str(itype), str(image), str(imageDigest))
if not imageDigest:
ecode = 1
raise Exception("cannot use input image string (no discovered imageDigest)")
else:
ret = anchorecli.clients.apiexternal.query_image(config, imageDigest=imageDigest, query_group='content', query_type=content_type)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret:
if ret['success']:
output = anchorecli.cli.utils.format_output(config, 'image_content', {'query_type':content_type}, ret['payload'])
if not output:
raise Exception('There is no image content (%s) to provide for %s' % (content_type, input_image))
print(output)
else:
raise Exception (json.dumps(ret['error'], indent=4))
else:
raise Exception("operation failed with empty response")
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'image_content', {}, err))
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)
@image.command(name='metadata', short_help="Get metadata about an image")
@click.argument('input_image', nargs=1)
@click.argument('metadata_type', nargs=1, required=False)
def query_metadata(input_image, metadata_type):
"""
INPUT_IMAGE: Input image can be in the following formats: Image Digest, ImageID or registry/repo:tag
METADATA_TYPE: The metadata type can be one of the types returned by running without a type specified
"""
ecode = 0
try:
itype, image, imageDigest = anchorecli.cli.utils.discover_inputimage(config, input_image)
_logger.debug("discovery from input: %s : %s : %s", str(itype), str(image), str(imageDigest))
if not imageDigest:
ecode = 1
raise Exception("cannot use input image string (no discovered imageDigest)")
else:
ret = anchorecli.clients.apiexternal.query_image(config, imageDigest=imageDigest, query_group='metadata', query_type=metadata_type)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret:
if ret['success']:
if metadata_type in ['manifest', 'docker_history']:
o = json.loads(anchorecli.cli.utils.format_output(config, 'image_metadata', {'query_type':metadata_type}, ret['payload']))
print(json.dumps(o, indent=4))
else:
print(anchorecli.cli.utils.format_output(config, 'image_metadata', {'query_type':metadata_type}, ret['payload']))
else:
raise Exception (json.dumps(ret['error'], indent=4))
else:
raise Exception("operation failed with empty response")
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'image_metadata', {}, err))
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)
@image.command(name='vuln', short_help="Get image vulnerabilities")
@click.argument('input_image', nargs=1)
@click.argument('vuln_type', nargs=1, required=False)
@click.option('--vendor-only', default=True, type=bool, help="Show only vulnerabilities marked by upstream vendor as applicable (default=True)")
def query_vuln(input_image, vuln_type, vendor_only):
"""
INPUT_IMAGE: Input image can be in the following formats: Image Digest, ImageID or registry/repo:tag
VULN_TYPE: VULN_TYPE: Vulnerability type can be one of the following types:
- os: CVE/distro vulnerabilities against operating system packages
"""
ecode = 0
try:
itype, image, imageDigest = anchorecli.cli.utils.discover_inputimage(config, input_image)
if not imageDigest:
ecode = 1
raise Exception("cannot use input image string (no discovered imageDigest)")
else:
ret = anchorecli.clients.apiexternal.query_image(config, imageDigest=imageDigest, query_group='vuln', query_type=vuln_type, vendor_only=vendor_only)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret:
if ret['success']:
print(anchorecli.cli.utils.format_output(config, 'image_vuln', {'query_type': vuln_type}, ret['payload']))
else:
if 'analysis_status: analyzing' in ret['error']['message']:
ecode = 100
elif 'analysis_status: not_analyzed' in ret['error']['message']:
ecode = 101
raise Exception(json.dumps(ret['error'], indent=4))
else:
raise Exception("operation failed with empty response")
except Exception as err:
print(anchorecli.cli.utils.format_error_output(config, 'image_vuln', {}, err))
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)
@image.command(name='del', short_help="Delete one or more images")
@click.argument('input_images', required=False, nargs=-1)
@click.option('--force', is_flag=True, help="Force deletion of image by cancelling any subscription/notification settings prior to image delete")
@click.option('--all', is_flag=True, help="Delete all images")
def delete(input_images, force, all):
"""
INPUT_IMAGE: Input image can be in the following formats: Image Digest, ImageID or registry/repo:tag
"""
ecode = 0
try:
image_digests = set() # gathering image digests to be bundled into delete request
input_list = list() # for preserving same order as input in the output
if all:
ret = anchorecli.clients.apiexternal.get_images(config)
ecode = anchorecli.cli.utils.get_ecode(ret)
if not ret['success']:
raise Exception(json.dumps(ret['error'], indent=4))
for image in ret['payload']:
if image['imageDigest']:
image_digests.add(image['imageDigest'])
for image_detail in image['image_detail']:
fulltag = image_detail.pop('registry', "None") + "/" + image_detail.pop('repo', "None") + ":" + image_detail.pop('tag', "None")
input_list.append((fulltag, image['imageDigest']))
else:
if not input_images:
raise Exception("Missing argument INPUT_IMAGE")
for input_image in OrderedDict.fromkeys(input_images).keys():
itype, image, imageDigest = anchorecli.cli.utils.discover_inputimage(config, input_image)
if imageDigest:
# ret = anchorecli.clients.apiexternal.delete_image(config, imageDigest=imageDigest, force=force)
input_list.append((image, imageDigest))
image_digests.add(imageDigest)
else:
input_list.append((image, imageDigest))
if image_digests:
# TODO batch them into groups of 100 or so?
ret = anchorecli.clients.apiexternal.delete_images(config, imageDigests=list(image_digests), force=force)
ecode = anchorecli.cli.utils.get_ecode(ret)
if ret:
if ret['success']:
print(anchorecli.cli.utils.format_output(config, 'image_delete', input_list, ret['payload']))
else:
raise Exception(json.dumps(ret['error'], indent=4))
else:
raise Exception("operation failed with empty response")
elif input_list:
print(anchorecli.cli.utils.format_output(config, 'image_delete', input_list, []))
else:
raise Exception("operation failed")
except Exception as err:
if all:
print(anchorecli.cli.utils.format_error_output(config, 'image_delete_all', {}, err))
else:
print(anchorecli.cli.utils.format_error_output(config, 'image_delete', {}, err))
if not ecode:
ecode = 2
anchorecli.cli.utils.doexit(ecode)