|
| 1 | +import argparse |
| 2 | +import base64 |
| 3 | +import json |
| 4 | +import logging |
| 5 | +import os |
| 6 | +import sys |
| 7 | +import threading |
| 8 | +import time |
| 9 | +from typing import Any |
| 10 | + |
| 11 | +import stomp |
| 12 | + |
| 13 | +from operatorcert.logger import setup_logger |
| 14 | +from operatorcert.umb import start_umb_client |
| 15 | + |
| 16 | +LOGGER = logging.getLogger("operator-cert") |
| 17 | + |
| 18 | + |
| 19 | +def setup_argparser() -> Any: # pragma: no cover |
| 20 | + """ |
| 21 | + Setup argument parser |
| 22 | +
|
| 23 | + Returns: |
| 24 | + Any: Initialized argument parser |
| 25 | + """ |
| 26 | + parser = argparse.ArgumentParser( |
| 27 | + description="Cli tool to request signature from RADAS" |
| 28 | + ) |
| 29 | + parser.add_argument( |
| 30 | + "--manifest-digest", |
| 31 | + help="Manifest digest for the signed content, usually in the format sha256:xxx", |
| 32 | + required=True, |
| 33 | + ) |
| 34 | + parser.add_argument( |
| 35 | + "--output", |
| 36 | + help="Path to an output file.", |
| 37 | + default="signing_response.json", |
| 38 | + ) |
| 39 | + parser.add_argument( |
| 40 | + "--reference", |
| 41 | + help="Docker reference for the signed content, e.g. registry.redhat.io/redhat/community-operator-index:v4.9", |
| 42 | + required=True, |
| 43 | + ) |
| 44 | + parser.add_argument( |
| 45 | + "--requester", |
| 46 | + |
| 47 | + help="Name of the user that requested the signing, for auditing purposes", |
| 48 | + required=True, |
| 49 | + ) |
| 50 | + parser.add_argument( |
| 51 | + "--sig-key-id", |
| 52 | + default="4096R/55A34A82 SHA-256", |
| 53 | + help="The signing key id that the content was signed with", |
| 54 | + required=True, |
| 55 | + ) |
| 56 | + parser.add_argument( |
| 57 | + "--sig-key-name", |
| 58 | + default="containerisvsign", |
| 59 | + help="The signing key name that the content was signed with", |
| 60 | + required=True, |
| 61 | + ) |
| 62 | + parser.add_argument( |
| 63 | + "--umb-client-name", |
| 64 | + default="operatorpipelines", |
| 65 | + help="Client name to connect to umb, usually a service account name", |
| 66 | + required=True, |
| 67 | + ) |
| 68 | + parser.add_argument( |
| 69 | + "--umb-listen-topic", |
| 70 | + default="VirtualTopic.eng.robosignatory.isv.sign", |
| 71 | + help="umb topic to listen to for responses with signed content", |
| 72 | + required=True, |
| 73 | + ) |
| 74 | + parser.add_argument( |
| 75 | + "--umb-publish-topic", |
| 76 | + default="VirtualTopic.eng.operatorpipelines.isv.sign", |
| 77 | + help="umb topic to publish to for requesting signing", |
| 78 | + required=True, |
| 79 | + ) |
| 80 | + parser.add_argument( |
| 81 | + "--umb-url", |
| 82 | + default="umb.api.redhat.com", |
| 83 | + help="umb host to connect to for messaging", |
| 84 | + required=True, |
| 85 | + ) |
| 86 | + parser.add_argument("--verbose", action="store_true", help="Verbose output") |
| 87 | + return parser |
| 88 | + |
| 89 | + |
| 90 | +umb = None |
| 91 | +result_file = None |
| 92 | + |
| 93 | +# wait for signing response for a total of 5 min, at 5 second intervals |
| 94 | +TIMEOUT_COUNT = 60 |
| 95 | +WAIT_INTERVAL_SEC = 5 |
| 96 | + |
| 97 | + |
| 98 | +class UmbHandler(stomp.ConnectionListener): # pragma: no cover |
| 99 | + def on_error(self, frame: Any) -> None: |
| 100 | + LOGGER.error("Received an error frame:\n{}".format(frame.body)) |
| 101 | + |
| 102 | + def on_message(self, frame: Any) -> None: |
| 103 | + # handle response from radas in a thread |
| 104 | + t = threading.Thread(target=process_message, args=[frame.body]) |
| 105 | + t.start() |
| 106 | + |
| 107 | + def on_disconnected(self: Any) -> None: |
| 108 | + LOGGER.error("Disconnected from umb.") |
| 109 | + |
| 110 | + |
| 111 | +def process_message(msg: Any) -> None: |
| 112 | + """ |
| 113 | + Process a message received from UMB. |
| 114 | + Args: |
| 115 | + msg: The message body received. |
| 116 | + """ |
| 117 | + msg = json.loads(msg)["msg"] |
| 118 | + |
| 119 | + global umb |
| 120 | + if msg.get("request_id") == umb.id: |
| 121 | + LOGGER.info(f"Received radas response: {msg}") |
| 122 | + |
| 123 | + global result_file |
| 124 | + with open(result_file, "w") as f: |
| 125 | + json.dump(msg, f) |
| 126 | + LOGGER.info(f"Response from radas successfully received for request {umb.id}") |
| 127 | + sys.exit(0) |
| 128 | + else: |
| 129 | + LOGGER.info(f"Ignored message from another request ({msg.get('request_id')})") |
| 130 | + |
| 131 | + |
| 132 | +def gen_sig_claim_file(reference: str, digest: str, requested_by: str) -> str: |
| 133 | + """ |
| 134 | + Generated a claim file to be signed based on given data. |
| 135 | + Args: |
| 136 | + reference: Docker reference for the signed content, |
| 137 | + e.g. registry.redhat.io/redhat/community-operator-index:v4.9 |
| 138 | + digest: Manifest digest for the signed content, usually in the format sha256:xxx |
| 139 | + requested_by: Name of the user that requested the signing, for auditing purposes |
| 140 | + """ |
| 141 | + claim = { |
| 142 | + "critical": { |
| 143 | + "image": {"docker-manifest-digest": digest}, |
| 144 | + "type": "atomic container signature", |
| 145 | + "identity": {"docker-reference": reference}, |
| 146 | + }, |
| 147 | + "optional": {"creator": requested_by}, |
| 148 | + } |
| 149 | + |
| 150 | + claim = base64.b64encode(json.dumps(claim).encode("utf-8")) |
| 151 | + return claim.decode("utf-8") |
| 152 | + |
| 153 | + |
| 154 | +def gen_image_name(reference: str) -> str: |
| 155 | + """ |
| 156 | + Generate the image name as a signing input, based on the docker reference. |
| 157 | + Args: |
| 158 | + reference: Docker reference for the signed content, |
| 159 | + e.g. registry.redhat.io/redhat/community-operator-index:v4.9 |
| 160 | + """ |
| 161 | + no_tag = reference.split(":")[0] |
| 162 | + image_parts = no_tag.split("/") |
| 163 | + return "/".join(image_parts[1:]) |
| 164 | + |
| 165 | + |
| 166 | +def gen_request_msg(args, request_id): |
| 167 | + """ |
| 168 | + Generate the request message to send to RADAS. |
| 169 | + Args: |
| 170 | + args: Args from script input. |
| 171 | + request_id: UUID to identify match the request with RADAS's response. |
| 172 | +
|
| 173 | + Returns: |
| 174 | +
|
| 175 | + """ |
| 176 | + claim = gen_sig_claim_file(args.reference, args.manifest_digest, args.requester) |
| 177 | + image_name = gen_image_name(args.reference) |
| 178 | + request_msg = { |
| 179 | + "claim_file": claim, |
| 180 | + "docker_reference": args.reference, |
| 181 | + "image_name": image_name, |
| 182 | + "manifest_digest": args.manifest_digest, |
| 183 | + "request_id": request_id, |
| 184 | + "requested_by": args.requester, |
| 185 | + "sig_keyname": args.sig_key_name, |
| 186 | + "sig_key_id": args.sig_key_id, |
| 187 | + } |
| 188 | + return request_msg |
| 189 | + |
| 190 | + |
| 191 | +def request_signature(args: Any) -> None: |
| 192 | + """ |
| 193 | + Format and send out a UMB message to request signing, and retry as needed. |
| 194 | + """ |
| 195 | + global umb |
| 196 | + umb = start_umb_client( |
| 197 | + hosts=[args.umb_url], client_name=args.umb_client_name, handler=UmbHandler() |
| 198 | + ) |
| 199 | + global result_file |
| 200 | + result_file = args.output |
| 201 | + |
| 202 | + request_msg = gen_request_msg(args, umb.id) |
| 203 | + |
| 204 | + umb.connect_and_subscribe(args.umb_listen_topic) |
| 205 | + |
| 206 | + try: |
| 207 | + retry_count = 3 |
| 208 | + for i in range(retry_count + 1): |
| 209 | + LOGGER.info(f"Sending signing request message...attempt #{i+1}") |
| 210 | + umb.send(args.umb_publish_topic, json.dumps(request_msg)) |
| 211 | + |
| 212 | + wait_count = 0 |
| 213 | + LOGGER.debug(f"Checking for signing response result file {result_file}...") |
| 214 | + while not os.path.exists(result_file): |
| 215 | + time.sleep(WAIT_INTERVAL_SEC) |
| 216 | + wait_count += 1 |
| 217 | + if wait_count > TIMEOUT_COUNT: |
| 218 | + LOGGER.warning("Timeout from waiting for signing response.") |
| 219 | + break |
| 220 | + else: |
| 221 | + # exit retry loop if response file detected |
| 222 | + break |
| 223 | + |
| 224 | + LOGGER.info(f"No signing response received. Retrying.") |
| 225 | + finally: |
| 226 | + # unsubscribe to free up the queue |
| 227 | + LOGGER.info("Unsubscribing from queue and disconnecting from UMB...") |
| 228 | + umb.unsubscribe(args.umb_listen_topic) |
| 229 | + umb.stop() |
| 230 | + if not os.path.exists(result_file): |
| 231 | + LOGGER.error("No signing response received after all 3 retries.") |
| 232 | + sys.exit(1) |
| 233 | + |
| 234 | + |
| 235 | +def main(): # pragma: no cover |
| 236 | + """ |
| 237 | + Main func |
| 238 | + """ |
| 239 | + |
| 240 | + parser = setup_argparser() |
| 241 | + args = parser.parse_args() |
| 242 | + log_level = "DEBUG" if args.verbose else "INFO" |
| 243 | + setup_logger(log_level) |
| 244 | + |
| 245 | + request_signature(args) |
| 246 | + |
| 247 | + |
| 248 | +if __name__ == "__main__": # pragma: no cover |
| 249 | + main() |
0 commit comments