Skip to content

Skip base layer #289

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions examples/client/multi-image/manage_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
Sub-projects could be specified in a text file with -ssf --subproject-spec-file parameter
Each line will have to contain full image specification.
Specification will be parsed and image name prefixed by -str parameter will be used as a sub-project name
In this mode any image that is not residing on ciena.com repository will be skipped.
In this mode any image that is not residing on //your domain for example abc.com// repository will be skipped.
If -str parameter is empty, Project Name will be used instead.

Container image name scanned will be written into project version nickname field
Expand Down Expand Up @@ -84,6 +84,7 @@ def __init__(self, args):
else:
self.init_project_data(args)
self.serialize = args.serialize
self.skip_group=args.skip_group

def connect(self):
self.client = Client(base_url=self.base_url, token=self.access_token, verify=self.no_verify, timeout=60.0, retries=4)
Expand Down Expand Up @@ -314,15 +315,17 @@ def process_text_spec_file(self,args):
prefix = args.string_to_put_in_front_of_subproject_name
if not prefix:
prefix = args.project_name
with open(args.subproject_spec_file, "r") as f:
lines = f.read().splitlines()
for line in lines:
image_name = line.split('/')[-1].split(':')[0] # Don't look at me, you wrote it!
sub_project_name = "_".join((prefix, image_name))
spec_line = ":".join((sub_project_name, line))
# if "ciena.com" in spec_line:
project_list.append(spec_line)
return (project_list)
if args.subproject_spec_file is not None :
with open(args.subproject_spec_file, "r") as f:
lines = f.read().splitlines()
for line in lines:
#print(line)
image_name = line.split('/')[-1].split(':')[0] # Don't look at me, you wrote it!
sub_project_name = "_".join((prefix, image_name))
spec_line = ":".join((sub_project_name, line))
# if "//your domain ex. abc.com //" in spec_line:
project_list.append(spec_line)
return (project_list)

def get_child_spec_list(self,args):
if args.subproject_list:
Expand Down Expand Up @@ -459,7 +462,8 @@ def scan_container_images(self):
parent_version,
detect_options,
hub=hub,
binary=self.binary
binary=self.binary,
skip_group = self.skip_group
)
child['scan_results'] = results
except Exception as e:
Expand Down Expand Up @@ -537,6 +541,7 @@ def parse_command_args():
parser.add_argument("-ifm", "--individual-file-matching", action='store_true', help="Turn Individual file matching on")
parser.add_argument("--reprocess-run-file", help="Reprocess Failures from previous run report.")
parser.add_argument("--serialize", action='store_true', help="Serialize scan submissions by adding --detect.wait.for.results=true to scan invocations")
parser.add_argument("--skip-group", required=False, help="exclude layers belog to specific groups, ex. 'base' ")
args = parser.parse_args()
if not args.reprocess_run_file and not (args.project_name and args.version_name):
parser.error("[ -p/--project-name and -pv/--version-name ] or --reprocess-run-file are required")
Expand Down Expand Up @@ -565,3 +570,5 @@ def main():

if __name__ == "__main__":
sys.exit(main())


73 changes: 42 additions & 31 deletions examples/client/multi-image/scan_docker_image_lite.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class ContainerImageScanner():

def __init__(
self, hub, container_image_name, workdir='/tmp/workdir',
grouping=None, base_image=None, dockerfile=None, detect_options=None):
grouping=None, base_image=None, dockerfile=None, detect_options=None, skip_group=None):
self.hub = hub
self.hub_detect = Detector(hub)
self.docker = DockerWrapper(workdir)
Expand All @@ -251,6 +251,11 @@ def __init__(
if detect_options:
self.extra_options = detect_options.split(" ")
self.binary = False
if skip_group:
self.skip_group=skip_group.split(",")
else:
self.skip_group=[]


def prepare_container_image(self):
self.docker.initdir()
Expand Down Expand Up @@ -344,7 +349,7 @@ def process_container_image_by_base_image_info(self):
layer['name'] = self.project_name + "_" + self.project_version + "_layer_" + str(num)
self.layers.append(layer)
num = num + 1
# print (json.dumps(self.layers, indent=4))
#print (json.dumps(self.layers, indent=4))

def process_oci_container_image_by_user_defined_groups(self):
self.manifest = self.docker.read_manifest()
Expand Down Expand Up @@ -373,7 +378,7 @@ def process_oci_container_image_by_user_defined_groups(self):
layer['name'] = self.project_name + "_" + self.project_version + "_layer_" + str(layer['index'])
if not layer.get('empty_layer', False):
layer['path'] = layer_paths.pop(0)
# print (json.dumps(self.layers, indent=4))
#print (json.dumps(self.layers, indent=4))

def get_group_name(self, groups, index):
group_name = 'undefined'
Expand Down Expand Up @@ -408,34 +413,39 @@ def process_oci_container_image(self):

def submit_layer_scans(self):
for layer in self.layers:
if not layer.get('empty_layer', False):
options = []
options.append('--detect.project.name={}'.format(layer['project_name']))
options.append('--detect.project.version.name="{}"'.format(layer['project_version']))
options.append('--detect.code.location.name={}_{}_code_{}'.format(layer['name'],self.image_version,layer['path']))
if self.binary:
options.append('--detect.tools=BINARY_SCAN')
options.append('--detect.binary.scan.file.path={}/{}'.format(self.docker.imagedir, layer['path']))
else:
options.append('--detect.tools=SIGNATURE_SCAN')
if self.oci_layout:
options.append('--detect.source.path={}/{}'.format(self.docker.imagedir, layer['path']))

#print(f"layer group name={layer['group_name']} skip_group ={self.skip_group}")

if layer['group_name'] not in self.skip_group:

if not layer.get('empty_layer', False):
options = []
options.append('--detect.project.name={}'.format(layer['project_name']))
options.append('--detect.project.version.name="{}"'.format(layer['project_version']))
options.append('--detect.code.location.name={}_{}_code_{}'.format(layer['name'],self.image_version,layer['path']))
if self.binary:
options.append('--detect.tools=BINARY_SCAN')
options.append('--detect.binary.scan.file.path={}/{}'.format(self.docker.imagedir, layer['path']))
else:
options.append('--detect.source.path={}/{}'.format(self.docker.imagedir, layer['path'].split('/')[0]))
if self.base_image or self.grouping or self.dockerfile:
options.extend(self.adorn_extra_options(layer))
else:
options.extend(self.extra_options)
logging.debug(f"Submitting scan for {layer['name']}")
completed = self.hub_detect.detect_run(options)
scan_results = dict()
for key, value in vars(completed).items():
if type(value) is bytes:
scan_results[key] = value.decode('utf-8')
options.append('--detect.tools=SIGNATURE_SCAN')
if self.oci_layout:
options.append('--detect.source.path={}/{}'.format(self.docker.imagedir, layer['path']))
else:
options.append('--detect.source.path={}/{}'.format(self.docker.imagedir, layer['path'].split('/')[0]))
if self.base_image or self.grouping or self.dockerfile:
options.extend(self.adorn_extra_options(layer))
else:
scan_results[key] = value
layer['scan_results'] = scan_results
logging.debug(f"Detect run for {layer['name']} completed with returncode {completed.returncode}")
options.extend(self.extra_options)
logging.debug(f"Submitting scan for {layer['name']}")
completed = self.hub_detect.detect_run(options)
scan_results = dict()
for key, value in vars(completed).items():
if type(value) is bytes:
scan_results[key] = value.decode('utf-8')
else:
scan_results[key] = value
layer['scan_results'] = scan_results
logging.debug(f"Detect run for {layer['name']} completed with returncode {completed.returncode}")

def adorn_extra_options(self, layer):
result = list()
Expand Down Expand Up @@ -486,15 +496,15 @@ def get_base_layers(self):

def scan_container_image(
imagespec, grouping=None, base_image=None, dockerfile=None,
project_name=None, project_version=None, detect_options=None, hub=None, binary=False):
project_name=None, project_version=None, detect_options=None, hub=None, binary=False, skip_group=None ):

if hub:
hub = hub
else:
hub = HubInstance()
scanner = ContainerImageScanner(
hub, imagespec, grouping=grouping, base_image=base_image,
dockerfile=dockerfile, detect_options=detect_options)
dockerfile=dockerfile, detect_options=detect_options, skip_group=skip_group)
if project_name:
scanner.project_name = project_name
if project_version:
Expand All @@ -507,6 +517,7 @@ def scan_container_image(
if binary:
scanner.binary = True
logging.info(f"Scanning image {imagespec}")

scanner.prepare_container_image()
scanner.process_container_image()
scanner.submit_layer_scans()
Expand Down
Loading