diff --git a/scripts/exporters/base_exporter.py b/scripts/exporters/base_exporter.py index 1929eb0a..92cf1fb9 100644 --- a/scripts/exporters/base_exporter.py +++ b/scripts/exporters/base_exporter.py @@ -1,12 +1,14 @@ import asyncio import logging +import os import re import shutil import sys import urllib.parse from pathlib import Path -from typing import List, Literal, Optional, Tuple +from typing import List, Literal, Optional, Tuple, Union +import aiohttp from plumbum import local from sqlalchemy import select @@ -54,6 +56,9 @@ def __init__( logging.StreamHandler(stream=sys.stdout), ], ) + self.web_server_headers = { + "Authorization": f"Bearer {settings.albs_jwt_token}", + } def regenerate_repo_metadata(self, repo_path: str): partial_path = re.sub( @@ -198,3 +203,107 @@ async def export_repositories(self, repo_ids: List[int]) -> List[str]: *(self._export_repository(e) for e in exporters) ) return [path for path in results if path] + + async def get_sign_keys(self): + endpoint = "sign-keys/" + return await self.make_request("GET", endpoint) + + async def get_sign_server_token(self) -> str: + body = { + 'email': settings.sign_server_username, + 'password': settings.sign_server_password, + } + endpoint = 'token' + method = 'POST' + response = await self.make_request( + method=method, + endpoint=endpoint, + body=body, + send_to='sign_server', + ) + return response['token'] + + async def sign_repomd_xml(self, path_to_file: str, key_id: str, token: str): + endpoint = "sign" + result = {"asc_content": None, "error": None} + try: + response = await self.make_request( + "POST", + endpoint, + params={"keyid": key_id}, + data={"file": Path(path_to_file).read_bytes()}, + user_headers={"Authorization": f"Bearer {token}"}, + send_to="sign_server", + ) + result["asc_content"] = response + except Exception as err: + result['error'] = err + return result + + async def repomd_signer(self, repodata_path, key_id, token): + string_repodata_path = str(repodata_path) + if key_id is None: + self.logger.info( + "Cannot sign repomd.xml in %s, missing GPG key", + string_repodata_path, + ) + return + + file_path = os.path.join(repodata_path, "repomd.xml") + result = await self.sign_repomd_xml(file_path, key_id, token) + self.logger.info('PGP key id: %s', key_id) + result_data = result.get("asc_content") + if result_data is None: + self.logger.error( + "repomd.xml in %s is failed to sign:\n%s", + string_repodata_path, + result["error"], + ) + return + + repodata_path = os.path.join(repodata_path, "repomd.xml.asc") + with open(repodata_path, "w") as file: + file.writelines(result_data) + self.logger.info("repomd.xml in %s is signed", string_repodata_path) + + async def make_request( + self, + method: str, + endpoint: str, + params: Optional[dict] = None, + body: Optional[dict] = None, + user_headers: Optional[dict] = None, + data: Optional[list] = None, + send_to: Literal['web_server', 'sign_server'] = 'web_server', + ) -> Union[dict, str]: + if send_to == 'web_server': + headers = {**self.web_server_headers} + full_url = urllib.parse.urljoin(settings.albs_api_url, endpoint) + elif send_to == 'sign_server': + headers = {} + full_url = urllib.parse.urljoin( + settings.sign_server_api_url, + endpoint, + ) + else: + raise ValueError( + "'send_to' param must be either 'web_server' or 'sign_server'" + ) + + if user_headers: + headers.update(user_headers) + + async with aiohttp.ClientSession( + headers=headers, + raise_for_status=True, + ) as session: + async with session.request( + method, + full_url, + json=body, + params=params, + data=data, + ) as response: + if response.headers['Content-Type'] == 'application/json': + return await response.json() + return await response.text() diff --git a/scripts/exporters/packages_exporter.py b/scripts/exporters/packages_exporter.py index f87ae46a..51c69e83 100644 --- a/scripts/exporters/packages_exporter.py +++ b/scripts/exporters/packages_exporter.py @@ -6,12 +6,11 @@ import pwd import re import sys -import urllib.parse from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone from pathlib import Path from time import time -from typing import Any, Dict, List, Literal, Optional, Tuple, Union +from typing import Any, Dict, List, Literal, Optional, Tuple import aiohttp import jmespath @@ -157,9 +156,6 @@ def __init__( export_path=export_path, ) - self.web_server_headers = { - "Authorization": f"Bearer {settings.albs_jwt_token}", - } self.osv_dir = osv_dir self.current_user = self.get_current_username() self.export_error_file = os.path.abspath( @@ -200,69 +196,6 @@ def process_osv_data( ) self.logger.debug("OSV data are generated") - async def make_request( - self, - method: str, - endpoint: str, - params: Optional[dict] = None, - body: Optional[dict] = None, - user_headers: Optional[dict] = None, - data: Optional[list] = None, - send_to: Literal['web_server', 'sign_server'] = 'web_server', - ) -> Union[dict, str]: - if send_to == 'web_server': - headers = {**self.web_server_headers} - full_url = urllib.parse.urljoin(settings.albs_api_url, endpoint) - elif send_to == 'sign_server': - headers = {} - full_url = urllib.parse.urljoin( - settings.sign_server_api_url, - endpoint, - ) - else: - raise ValueError( - "send_to parameter must be either web_server or sign_server" - ) - - if user_headers: - headers.update(user_headers) - - async with aiohttp.ClientSession( - headers=headers, - raise_for_status=True, - ) as session: - async with session.request( - method, - full_url, - json=body, - params=params, - data=data, - ) as response: - if response.headers['Content-Type'] == 'application/json': - return await response.json() - return await response.text() - - async def sign_repomd_xml(self, path_to_file: str, key_id: str, token: str): - endpoint = "sign" - result = {"asc_content": None, "error": None} - try: - response = await self.make_request( - "POST", - endpoint, - params={"keyid": key_id}, - data={"file": Path(path_to_file).read_bytes()}, - user_headers={"Authorization": f"Bearer {token}"}, - send_to="sign_server", - ) - result["asc_content"] = response - except Exception as err: - result['error'] = err - return result - - async def get_sign_keys(self): - endpoint = "sign-keys/" - return await self.make_request("GET", endpoint) - # TODO: Use direct function call to alws.crud.errata_get_oval_xml async def get_oval_xml( self, @@ -315,32 +248,6 @@ async def generate_rss(self, platform, modern_cache): return feed.rss_str(pretty=True).decode('utf-8') - async def repomd_signer(self, repodata_path, key_id, token): - string_repodata_path = str(repodata_path) - if key_id is None: - self.logger.info( - "Cannot sign repomd.xml in %s, missing GPG key", - string_repodata_path, - ) - return - - file_path = os.path.join(repodata_path, "repomd.xml") - result = await self.sign_repomd_xml(file_path, key_id, token) - self.logger.info('PGP key id: %s', key_id) - result_data = result.get("asc_content") - if result_data is None: - self.logger.error( - "repomd.xml in %s is failed to sign:\n%s", - string_repodata_path, - result["error"], - ) - return - - repodata_path = os.path.join(repodata_path, "repomd.xml.asc") - with open(repodata_path, "w") as file: - file.writelines(result_data) - self.logger.info("repomd.xml in %s is signed", string_repodata_path) - def check_rpms_signature(self, repository_path: str, sign_keys: list): self.logger.info("Checking signature for %s repo", repository_path) key_ids_lower = [i.keyid.lower() for i in sign_keys] @@ -525,21 +432,6 @@ async def export_repos_from_release( exported_paths = await self.export_repositories(repo_ids) return exported_paths, db_release.platform_id - async def get_sign_server_token(self) -> str: - body = { - 'email': settings.sign_server_username, - 'password': settings.sign_server_password, - } - endpoint = 'token' - method = 'POST' - response = await self.make_request( - method=method, - endpoint=endpoint, - body=body, - send_to='sign_server', - ) - return response['token'] - async def sign_repodata( exporter: PackagesExporter, @@ -548,8 +440,6 @@ async def sign_repodata( db_sign_keys: list, key_id_by_platform: Optional[str] = None, ): - repodata_paths = [] - tasks = [] token = await exporter.get_sign_server_token() @@ -560,8 +450,6 @@ async def sign_repodata( if not os.path.exists(repo_path): continue - repodata_paths.append(repodata) - key_id = key_id_by_platform or None for platform_id, platform_repos in platforms_dict.items(): for repo_export_path in platform_repos: diff --git a/scripts/exporters/products_exporter.py b/scripts/exporters/products_exporter.py index 7bd9de06..adab4e3c 100644 --- a/scripts/exporters/products_exporter.py +++ b/scripts/exporters/products_exporter.py @@ -1,5 +1,6 @@ import argparse import asyncio +import os import sys from pathlib import Path from typing import List, Literal @@ -66,6 +67,13 @@ def parse_args(): required=False, help="Method of exporting (choices: write, hardlink, symlink)", ) + parser.add_argument( + "-sw", + "--sign-with", + type=str, + required=True, + help="GPG key name to use when signing repodata.", + ) parser.add_argument( "-l", "--log", @@ -137,6 +145,17 @@ async def export_product_repos( list({repo.id for repo in product.repositories}) ) + async def get_sign_key_id(self, key_id): + sign_keys = await self.get_sign_keys() + return next( + ( + sign_key['keyid'] + for sign_key in sign_keys + if sign_key['keyid'] == key_id + ), + None, + ) + async def repo_post_processing( exporter: ProductExporter, @@ -152,6 +171,27 @@ async def repo_post_processing( return result +async def sign_repodata( + exporter: ProductExporter, + exported_paths: List[str], + key_id: str, +): + tasks = [] + token = await exporter.get_sign_server_token() + + for repo_path in exported_paths: + path = Path(repo_path) + parent_dir = path.parent + repodata = parent_dir / "repodata" + if not os.path.exists(repo_path): + continue + + exporter.logger.info('Key ID: %s', str(key_id)) + tasks.append(exporter.repomd_signer(repodata, key_id, token)) + + await asyncio.gather(*tasks) + + async def main(): args = parse_args() await setup_all() @@ -161,6 +201,14 @@ async def main(): export_method=args.export_method, log_file_path=args.log, ) + + sign_key_id = None + if args.sign_with: + sign_key_id = await exporter.get_sign_key_id(args.sign_with) + if not sign_key_id: + err = "Couldn't retrieve the '{args.sign_with}' sign key" + raise Exception(f'Aborting product export, error was: {err}') + pulp_client.PULP_SEMAPHORE = asyncio.Semaphore(10) exported_paths = await exporter.export_product_repos( product_name=args.product, @@ -171,6 +219,9 @@ async def main(): *(repo_post_processing(exporter, path) for path in exported_paths) ) + if sign_key_id: + await sign_repodata(exporter, exported_paths, sign_key_id) + if __name__ == '__main__': asyncio.run(main())