Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 110 additions & 1 deletion scripts/exporters/base_exporter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import asyncio
import logging
import os
import re
import shutil
import sys
import urllib.parse
from pathlib import Path
from typing import List, Literal, Optional, Tuple
from typing import List, Literal, Optional, Tuple, Union

import aiohttp
from plumbum import local
from sqlalchemy import select

Expand Down Expand Up @@ -54,6 +56,9 @@ def __init__(
logging.StreamHandler(stream=sys.stdout),
],
)
self.web_server_headers = {
"Authorization": f"Bearer {settings.albs_jwt_token}",
}

def regenerate_repo_metadata(self, repo_path: str):
partial_path = re.sub(
Expand Down Expand Up @@ -198,3 +203,107 @@ async def export_repositories(self, repo_ids: List[int]) -> List[str]:
*(self._export_repository(e) for e in exporters)
)
return [path for path in results if path]

async def get_sign_keys(self):
endpoint = "sign-keys/"
return await self.make_request("GET", endpoint)

async def get_sign_server_token(self) -> str:
body = {
'email': settings.sign_server_username,
'password': settings.sign_server_password,
}
endpoint = 'token'
method = 'POST'
response = await self.make_request(
method=method,
endpoint=endpoint,
body=body,
send_to='sign_server',
)
return response['token']

async def sign_repomd_xml(self, path_to_file: str, key_id: str, token: str):
endpoint = "sign"
result = {"asc_content": None, "error": None}
try:
response = await self.make_request(
"POST",
endpoint,
params={"keyid": key_id},
data={"file": Path(path_to_file).read_bytes()},
user_headers={"Authorization": f"Bearer {token}"},
send_to="sign_server",
)
result["asc_content"] = response
except Exception as err:
result['error'] = err
return result

async def repomd_signer(self, repodata_path, key_id, token):
string_repodata_path = str(repodata_path)
if key_id is None:
self.logger.info(
"Cannot sign repomd.xml in %s, missing GPG key",
string_repodata_path,
)
return

file_path = os.path.join(repodata_path, "repomd.xml")
result = await self.sign_repomd_xml(file_path, key_id, token)
self.logger.info('PGP key id: %s', key_id)
result_data = result.get("asc_content")
if result_data is None:
self.logger.error(
"repomd.xml in %s is failed to sign:\n%s",
string_repodata_path,
result["error"],
)
return

repodata_path = os.path.join(repodata_path, "repomd.xml.asc")
with open(repodata_path, "w") as file:
file.writelines(result_data)
self.logger.info("repomd.xml in %s is signed", string_repodata_path)

async def make_request(
self,
method: str,
endpoint: str,
params: Optional[dict] = None,
body: Optional[dict] = None,
user_headers: Optional[dict] = None,
data: Optional[list] = None,
send_to: Literal['web_server', 'sign_server'] = 'web_server',
) -> Union[dict, str]:
if send_to == 'web_server':
headers = {**self.web_server_headers}
full_url = urllib.parse.urljoin(settings.albs_api_url, endpoint)
elif send_to == 'sign_server':
headers = {}
full_url = urllib.parse.urljoin(
settings.sign_server_api_url,
endpoint,
)
else:
raise ValueError(
"'send_to' param must be either 'web_server' or 'sign_server'"
)

if user_headers:
headers.update(user_headers)

async with aiohttp.ClientSession(
headers=headers,
raise_for_status=True,
) as session:
async with session.request(
method,
full_url,
json=body,
params=params,
data=data,
) as response:
if response.headers['Content-Type'] == 'application/json':
return await response.json()
return await response.text()
114 changes: 1 addition & 113 deletions scripts/exporters/packages_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
import pwd
import re
import sys
import urllib.parse
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from time import time
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
from typing import Any, Dict, List, Literal, Optional, Tuple

import aiohttp
import jmespath
Expand Down Expand Up @@ -157,9 +156,6 @@ def __init__(
export_path=export_path,
)

self.web_server_headers = {
"Authorization": f"Bearer {settings.albs_jwt_token}",
}
self.osv_dir = osv_dir
self.current_user = self.get_current_username()
self.export_error_file = os.path.abspath(
Expand Down Expand Up @@ -200,69 +196,6 @@ def process_osv_data(
)
self.logger.debug("OSV data are generated")

async def make_request(
self,
method: str,
endpoint: str,
params: Optional[dict] = None,
body: Optional[dict] = None,
user_headers: Optional[dict] = None,
data: Optional[list] = None,
send_to: Literal['web_server', 'sign_server'] = 'web_server',
) -> Union[dict, str]:
if send_to == 'web_server':
headers = {**self.web_server_headers}
full_url = urllib.parse.urljoin(settings.albs_api_url, endpoint)
elif send_to == 'sign_server':
headers = {}
full_url = urllib.parse.urljoin(
settings.sign_server_api_url,
endpoint,
)
else:
raise ValueError(
"send_to parameter must be either web_server or sign_server"
)

if user_headers:
headers.update(user_headers)

async with aiohttp.ClientSession(
headers=headers,
raise_for_status=True,
) as session:
async with session.request(
method,
full_url,
json=body,
params=params,
data=data,
) as response:
if response.headers['Content-Type'] == 'application/json':
return await response.json()
return await response.text()

async def sign_repomd_xml(self, path_to_file: str, key_id: str, token: str):
endpoint = "sign"
result = {"asc_content": None, "error": None}
try:
response = await self.make_request(
"POST",
endpoint,
params={"keyid": key_id},
data={"file": Path(path_to_file).read_bytes()},
user_headers={"Authorization": f"Bearer {token}"},
send_to="sign_server",
)
result["asc_content"] = response
except Exception as err:
result['error'] = err
return result

async def get_sign_keys(self):
endpoint = "sign-keys/"
return await self.make_request("GET", endpoint)

# TODO: Use direct function call to alws.crud.errata_get_oval_xml
async def get_oval_xml(
self,
Expand Down Expand Up @@ -315,32 +248,6 @@ async def generate_rss(self, platform, modern_cache):

return feed.rss_str(pretty=True).decode('utf-8')

async def repomd_signer(self, repodata_path, key_id, token):
string_repodata_path = str(repodata_path)
if key_id is None:
self.logger.info(
"Cannot sign repomd.xml in %s, missing GPG key",
string_repodata_path,
)
return

file_path = os.path.join(repodata_path, "repomd.xml")
result = await self.sign_repomd_xml(file_path, key_id, token)
self.logger.info('PGP key id: %s', key_id)
result_data = result.get("asc_content")
if result_data is None:
self.logger.error(
"repomd.xml in %s is failed to sign:\n%s",
string_repodata_path,
result["error"],
)
return

repodata_path = os.path.join(repodata_path, "repomd.xml.asc")
with open(repodata_path, "w") as file:
file.writelines(result_data)
self.logger.info("repomd.xml in %s is signed", string_repodata_path)

def check_rpms_signature(self, repository_path: str, sign_keys: list):
self.logger.info("Checking signature for %s repo", repository_path)
key_ids_lower = [i.keyid.lower() for i in sign_keys]
Expand Down Expand Up @@ -525,21 +432,6 @@ async def export_repos_from_release(
exported_paths = await self.export_repositories(repo_ids)
return exported_paths, db_release.platform_id

async def get_sign_server_token(self) -> str:
body = {
'email': settings.sign_server_username,
'password': settings.sign_server_password,
}
endpoint = 'token'
method = 'POST'
response = await self.make_request(
method=method,
endpoint=endpoint,
body=body,
send_to='sign_server',
)
return response['token']


async def sign_repodata(
exporter: PackagesExporter,
Expand All @@ -548,8 +440,6 @@ async def sign_repodata(
db_sign_keys: list,
key_id_by_platform: Optional[str] = None,
):
repodata_paths = []

tasks = []
token = await exporter.get_sign_server_token()

Expand All @@ -560,8 +450,6 @@ async def sign_repodata(
if not os.path.exists(repo_path):
continue

repodata_paths.append(repodata)

key_id = key_id_by_platform or None
for platform_id, platform_repos in platforms_dict.items():
for repo_export_path in platform_repos:
Expand Down
51 changes: 51 additions & 0 deletions scripts/exporters/products_exporter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import argparse
import asyncio
import os
import sys
from pathlib import Path
from typing import List, Literal
Expand Down Expand Up @@ -66,6 +67,13 @@ def parse_args():
required=False,
help="Method of exporting (choices: write, hardlink, symlink)",
)
parser.add_argument(
"-sw",
"--sign-with",
type=str,
required=True,
help="GPG key name to use when signing repodata.",
)
parser.add_argument(
"-l",
"--log",
Expand Down Expand Up @@ -137,6 +145,17 @@ async def export_product_repos(
list({repo.id for repo in product.repositories})
)

async def get_sign_key_id(self, key_id):
sign_keys = await self.get_sign_keys()
return next(
(
sign_key['keyid']
for sign_key in sign_keys
if sign_key['keyid'] == key_id
),
None,
)


async def repo_post_processing(
exporter: ProductExporter,
Expand All @@ -152,6 +171,27 @@ async def repo_post_processing(
return result


async def sign_repodata(
exporter: ProductExporter,
exported_paths: List[str],
key_id: str,
):
tasks = []
token = await exporter.get_sign_server_token()

for repo_path in exported_paths:
path = Path(repo_path)
parent_dir = path.parent
repodata = parent_dir / "repodata"
if not os.path.exists(repo_path):
continue

exporter.logger.info('Key ID: %s', str(key_id))
tasks.append(exporter.repomd_signer(repodata, key_id, token))

await asyncio.gather(*tasks)


async def main():
args = parse_args()
await setup_all()
Expand All @@ -161,6 +201,14 @@ async def main():
export_method=args.export_method,
log_file_path=args.log,
)

sign_key_id = None
if args.sign_with:
sign_key_id = await exporter.get_sign_key_id(args.sign_with)
if not sign_key_id:
err = "Couldn't retrieve the '{args.sign_with}' sign key"
raise Exception(f'Aborting product export, error was: {err}')

pulp_client.PULP_SEMAPHORE = asyncio.Semaphore(10)
exported_paths = await exporter.export_product_repos(
product_name=args.product,
Expand All @@ -171,6 +219,9 @@ async def main():
*(repo_post_processing(exporter, path) for path in exported_paths)
)

if sign_key_id:
await sign_repodata(exporter, exported_paths, sign_key_id)


if __name__ == '__main__':
asyncio.run(main())
Loading