|
| 1 | +import base64 |
1 | 2 | import os |
2 | 3 | import pathlib |
3 | 4 | import urllib.request |
@@ -29,7 +30,8 @@ def __init__(self, system_config_root: pathlib.Path, cmdline_cache: Optional[str |
29 | 30 | self.build_cache_mirror = ([mirror for mirror in self.mirrors if mirror.get('buildcache', False)] |
30 | 31 | + [None]).pop(0) |
31 | 32 | self.bootstrap_mirrors = [mirror for mirror in self.mirrors if mirror.get('bootstrap', False)] |
32 | | - self.keys = [mirror['key'] for mirror in self.mirrors if mirror.get('key') is not None] |
| 33 | + # Will hold a list of all the keys |
| 34 | + self.keys = None |
33 | 35 |
|
34 | 36 | def _load_mirrors(self, cmdline_cache: Optional[str]) -> List[Dict]: |
35 | 37 | """Load the mirrors file, if one exists.""" |
@@ -159,39 +161,47 @@ def _key_setup(self, key_store: pathlib.Path): |
159 | 161 | """Validate mirror keys, relocate to key_store, and update mirror config with new key paths.""" |
160 | 162 |
|
161 | 163 | for mirror in self.mirrors: |
162 | | - if mirror["public_key"]: |
163 | | - key = mirror["public_key"] |
164 | | - |
165 | | - # key will be saved under key_store/mirror_name.gpg |
166 | | - dest = (key_store / f"'{mirror["name"]}'.gpg").resolve() |
167 | | - |
168 | | - # if path, check if abs path, if not, append sys config path in front and check again |
169 | | - path = pathlib.Path(os.path.expandvars(key)) |
170 | | - if path.exists(): |
171 | | - if not path.is_absolute(): |
172 | | - #try prepending system config path |
173 | | - path = self._system_config_root/path |
174 | | - if not path.is_file(): |
175 | | - raise MirrorError( |
176 | | - f"The key path '{path}' is not a file. " |
177 | | - f"Check the key listed in mirrors.yaml in system config.") |
178 | | - |
179 | | - file_type = magic.from_file(path) |
180 | | - |
181 | | - if not file_type.startswith("OpenPGP Public Key"): |
| 164 | + if not mirror["public_key"]: |
| 165 | + continue |
| 166 | + |
| 167 | + key = mirror["public_key"] |
| 168 | + |
| 169 | + # key will be saved under key_store/mirror_name.gpg |
| 170 | + dest = (key_store / f"'{mirror["name"]}'.gpg").resolve() |
| 171 | + |
| 172 | + # if path, check if abs path, if not, append sys config path in front and check again |
| 173 | + path = pathlib.Path(os.path.expandvars(key)) |
| 174 | + if path.exists(): |
| 175 | + if not path.is_absolute(): |
| 176 | + #try prepending system config path |
| 177 | + path = self._system_config_root/path |
| 178 | + if not path.is_file(): |
182 | 179 | raise MirrorError( |
183 | | - f"'{path}' is not a valid GPG key. " |
| 180 | + f"The key path '{path}' is not a file. " |
184 | 181 | f"Check the key listed in mirrors.yaml in system config.") |
185 | | - |
186 | | - # copy key to new destination in key store |
187 | | - with open(path, 'r') as reader, open(dest, 'w') as writer: |
188 | | - data = reader.read() |
189 | | - writer.write(data) |
190 | | - |
191 | | - else: |
192 | | - # if PGP key, convert to binary, ???, convert back |
193 | | - with open(dest, "w") as file: |
194 | | - file.write(key) |
| 182 | + |
| 183 | + file_type = magic.from_file(path) |
| 184 | + |
| 185 | + if not file_type.startswith("OpenPGP Public Key"): |
| 186 | + raise MirrorError( |
| 187 | + f"'{path}' is not a valid GPG key. " |
| 188 | + f"Check the key listed in mirrors.yaml in system config.") |
195 | 189 |
|
196 | | - # update mirror with new path |
197 | | - mirror["key"] = dest |
| 190 | + # copy key to new destination in key store |
| 191 | + with open(path, 'r') as reader, open(dest, 'w') as writer: |
| 192 | + data = reader.read() |
| 193 | + writer.write(data) |
| 194 | + |
| 195 | + else: |
| 196 | + try: |
| 197 | + key = base64.b64decode(key) |
| 198 | + except ValueError as err: |
| 199 | + pass |
| 200 | + magic.from_buffer(key) |
| 201 | + |
| 202 | + # if PGP key, convert to binary, ???, convert back |
| 203 | + with open(dest, "wb") as file: |
| 204 | + file.write(key) |
| 205 | + |
| 206 | + # update mirror with new path |
| 207 | + mirror["key"] = dest |
0 commit comments