|
| 1 | +import argparse |
| 2 | +import asyncio |
| 3 | +import copy |
| 4 | +import logging |
| 5 | +import os |
| 6 | +import pprint |
| 7 | +import subprocess |
| 8 | +import sys |
| 9 | +import tempfile |
| 10 | + |
| 11 | +from fastapi_sqla import open_async_session |
| 12 | +from immudb_wrapper import ImmudbWrapper |
| 13 | +from sqlalchemy import select |
| 14 | +from sqlalchemy.orm import selectinload |
| 15 | + |
| 16 | +sys.path.append(os.path.dirname(os.path.dirname(__file__))) |
| 17 | + |
| 18 | +from alws import models |
| 19 | +from alws.dependencies import get_async_db_key |
| 20 | +from alws.utils.fastapi_sqla_setup import setup_all |
| 21 | +from alws.utils.pulp_client import PulpClient |
| 22 | + |
| 23 | +logging.basicConfig( |
| 24 | + format="%(asctime)s %(levelname)-8s %(message)s", |
| 25 | + level=logging.INFO, |
| 26 | + datefmt="%Y-%m-%d %H:%M:%S", |
| 27 | + handlers=[ |
| 28 | + logging.StreamHandler(), |
| 29 | + ], |
| 30 | +) |
| 31 | + |
| 32 | + |
| 33 | +def parse_args(): |
| 34 | + parser = argparse.ArgumentParser( |
| 35 | + "build-notarizer", |
| 36 | + description="Creates records in immudb for the further usage in generating SBOM data", |
| 37 | + formatter_class=argparse.ArgumentDefaultsHelpFormatter, |
| 38 | + ) |
| 39 | + parser.add_argument( |
| 40 | + '--build-id', |
| 41 | + type=int, |
| 42 | + help='Build ID to process', |
| 43 | + required=True, |
| 44 | + ) |
| 45 | + parser.add_argument( |
| 46 | + '--pulp-host', |
| 47 | + type=str, |
| 48 | + help='Pulp host', |
| 49 | + default=os.getenv('PULP_HOST'), |
| 50 | + required='PULP_HOST' not in os.environ, |
| 51 | + ) |
| 52 | + parser.add_argument( |
| 53 | + '--pulp-username', |
| 54 | + type=str, |
| 55 | + help='Pulp username', |
| 56 | + default=os.getenv('PULP_USER'), |
| 57 | + required='PULP_USER' not in os.environ, |
| 58 | + ) |
| 59 | + parser.add_argument( |
| 60 | + '--pulp-password', |
| 61 | + type=str, |
| 62 | + help='Pulp password', |
| 63 | + default=os.getenv('PULP_PASSWORD'), |
| 64 | + required='PULP_PASSWORD' not in os.environ, |
| 65 | + ) |
| 66 | + parser.add_argument( |
| 67 | + '--immudb-address', |
| 68 | + type=str, |
| 69 | + help='Immudb address', |
| 70 | + default=os.getenv('IMMUDB_ADDRESS'), |
| 71 | + required='IMMUDB_ADDRESS' not in os.environ, |
| 72 | + ) |
| 73 | + parser.add_argument( |
| 74 | + '--immudb-username', |
| 75 | + type=str, |
| 76 | + help='Immudb username', |
| 77 | + required=True, |
| 78 | + ) |
| 79 | + parser.add_argument( |
| 80 | + '--immudb-password', |
| 81 | + type=str, |
| 82 | + help='Immudb password', |
| 83 | + required=True, |
| 84 | + ) |
| 85 | + parser.add_argument( |
| 86 | + '--immudb-database', |
| 87 | + type=str, |
| 88 | + help='Immudb database', |
| 89 | + default=os.getenv('IMMUDB_DATABASE'), |
| 90 | + required='IMMUDB_DATABASE' not in os.environ, |
| 91 | + ) |
| 92 | + return parser.parse_args() |
| 93 | + |
| 94 | + |
| 95 | +def extract_git_metadata( |
| 96 | + task: models.BuildTask, |
| 97 | + immudb_wrapper: ImmudbWrapper, |
| 98 | +) -> dict: |
| 99 | + metadata = { |
| 100 | + 'source_type': 'git', |
| 101 | + 'git_url': task.ref.url, |
| 102 | + 'git_ref': task.ref.git_ref, |
| 103 | + 'git_commit': task.ref.git_commit_hash, |
| 104 | + } |
| 105 | + if not task.ref.git_commit_hash: |
| 106 | + return metadata |
| 107 | + with tempfile.TemporaryDirectory() as tmpdir: |
| 108 | + subprocess.run( |
| 109 | + args=('git', 'clone', task.ref.url, '.'), |
| 110 | + cwd=tmpdir, |
| 111 | + check=True, |
| 112 | + capture_output=True, |
| 113 | + encoding='utf-8', |
| 114 | + ) |
| 115 | + subprocess.run( |
| 116 | + args=('git', 'checkout', task.ref.git_commit_hash), |
| 117 | + cwd=tmpdir, |
| 118 | + check=True, |
| 119 | + capture_output=True, |
| 120 | + encoding='utf-8', |
| 121 | + ) |
| 122 | + result = immudb_wrapper.authenticate_git_repo(tmpdir) |
| 123 | + if result: |
| 124 | + metadata['alma_commit_sbom_hash'] = ( |
| 125 | + result.get('value', {}) |
| 126 | + .get('Metadata', {}) |
| 127 | + .get('git', {}) |
| 128 | + .get('Commit') |
| 129 | + ) |
| 130 | + return metadata |
| 131 | + |
| 132 | + |
| 133 | +async def main( |
| 134 | + need_to_re_notarize: bool = False, |
| 135 | +): |
| 136 | + args = parse_args() |
| 137 | + pulp_client = PulpClient( |
| 138 | + host=args.pulp_host, |
| 139 | + username=args.pulp_username, |
| 140 | + password=args.pulp_password, |
| 141 | + ) |
| 142 | + immudb_wrapper = ImmudbWrapper( |
| 143 | + username=args.immudb_username, |
| 144 | + password=args.immudb_password, |
| 145 | + database=args.immudb_database, |
| 146 | + immudb_address=args.immudb_address, |
| 147 | + ) |
| 148 | + await setup_all() |
| 149 | + async with open_async_session(get_async_db_key()) as session: |
| 150 | + build = await session.execute( |
| 151 | + select(models.Build) |
| 152 | + .where(models.Build.id == args.build_id) |
| 153 | + .options( |
| 154 | + selectinload(models.Build.owner), |
| 155 | + selectinload(models.Build.tasks).options( |
| 156 | + selectinload(models.BuildTask.artifacts), |
| 157 | + selectinload(models.BuildTask.ref), |
| 158 | + ), |
| 159 | + ), |
| 160 | + ) |
| 161 | + build = build.scalars().first() |
| 162 | + for task in build.tasks: |
| 163 | + cas_metadata = { |
| 164 | + 'build_id': task.build_id, |
| 165 | + 'build_arch': task.arch, |
| 166 | + 'built_by': f'{build.owner.username} <{build.owner.email}>', |
| 167 | + 'sbom_api_ver': '0.2', |
| 168 | + } |
| 169 | + if task.ref.git_ref: |
| 170 | + cas_metadata.update( |
| 171 | + **extract_git_metadata(task, immudb_wrapper), |
| 172 | + ) |
| 173 | + # TODO: implement that later |
| 174 | + else: |
| 175 | + logging.warning( |
| 176 | + 'notarizing for projects from SRPMs is not implemented yet, skipping' |
| 177 | + ) |
| 178 | + continue |
| 179 | + for artifact in task.artifacts: |
| 180 | + if ( |
| 181 | + artifact.type != 'rpm' |
| 182 | + or artifact.cas_hash |
| 183 | + and not need_to_re_notarize |
| 184 | + ): |
| 185 | + continue |
| 186 | + try: |
| 187 | + artifact_info = await pulp_client.get_artifact( |
| 188 | + artifact.href, |
| 189 | + exclude_fields=['changelogs'], |
| 190 | + ) |
| 191 | + except Exception: |
| 192 | + # TODO: we probably should actualize pulp_hrefs in such cases |
| 193 | + logging.warning( |
| 194 | + 'cannot get pkg %s info, skipping', artifact.name |
| 195 | + ) |
| 196 | + continue |
| 197 | + if not artifact_info: |
| 198 | + raise ValueError('Cannot get artifact info') |
| 199 | + metadata = { |
| 200 | + 'Name': artifact_info['location_href'], |
| 201 | + 'Kind': 'file', |
| 202 | + 'Size': immudb_wrapper.get_size_format( |
| 203 | + artifact_info['size_package'], |
| 204 | + ), |
| 205 | + 'Hash': artifact_info['sha256'], |
| 206 | + 'Signer': immudb_wrapper.username, |
| 207 | + 'Metadata': { |
| 208 | + **copy.deepcopy(cas_metadata), |
| 209 | + 'build_host': artifact_info['rpm_buildhost'], |
| 210 | + 'name': artifact_info['name'], |
| 211 | + 'epoch': artifact_info['epoch'], |
| 212 | + 'version': artifact_info['version'], |
| 213 | + 'release': artifact_info['release'], |
| 214 | + 'arch': artifact_info['arch'], |
| 215 | + 'sourcerpm': ( |
| 216 | + None |
| 217 | + if artifact_info['arch'] == 'src' |
| 218 | + else artifact_info['rpm_sourcerpm'] |
| 219 | + ), |
| 220 | + }, |
| 221 | + } |
| 222 | + logging.debug('immudb payload:\n%s', pprint.pformat(metadata)) |
| 223 | + immudb_wrapper.notarize(metadata['Hash'], metadata) |
| 224 | + artifact.cas_hash = metadata['Hash'] |
| 225 | + logging.info('artifact %s processed', artifact.name) |
| 226 | + await session.commit() |
| 227 | + |
| 228 | + |
| 229 | +asyncio.run(main()) |
0 commit comments