Skip to content

Commit 21ace23

Browse files
daviddavisdralley
authored andcommitted
Sign packages concurrently when adding to repo
Assisted by: GPT-5.2-Codex fixes #4357
1 parent 8a297a5 commit 21ace23

File tree

5 files changed

+125
-56
lines changed

5 files changed

+125
-56
lines changed

CHANGES/4357.feature

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Added code and a setting (MAX_PACKAGE_SIGNING_WORKERS) to sign packages concurrently when adding
2+
them to a repo. Signing packages concurrently will speed up the time it takes to sign and add
3+
packages to a repo.

docs/admin/reference/settings.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,8 @@ When publishing RPM metadata, if this is true, Pulp will use the timestamp that
6464
added to the repo rather than the timestamp that the package first appeared in Pulp. This timestamp
6565
appears in the "file" field of the time element for each package in primary.xml. Defaults to
6666
`False`.
67+
68+
69+
## MAX_PACKAGE_SIGNING_WORKERS
70+
71+
Sets the number of workers that pulp_rpm uses when concurrently signing packages. Defaults to 5.

pulp_rpm/app/models/content.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,26 @@ def sign(
4545
_env_vars["PULP_SIGNING_KEY_FINGERPRINT"] = pubkey_fingerprint
4646
return super().sign(filename, _env_vars)
4747

48+
async def asign(
49+
self,
50+
filename: str,
51+
env_vars: Optional[dict] = None,
52+
pubkey_fingerprint: Optional[str] = None,
53+
):
54+
"""
55+
Asynchronously sign a package @filename using @pubkey_figerprint.
56+
57+
Args:
58+
filename: The absolute path to the package to be signed.
59+
env_vars: (optional) Dict of env_vars to be passed to the signing script.
60+
pubkey_fingerprint: The V4 fingerprint that correlates with the private key to use.
61+
"""
62+
if not pubkey_fingerprint:
63+
raise ValueError("A pubkey_fingerprint must be provided.")
64+
_env_vars = env_vars or {}
65+
_env_vars["PULP_SIGNING_KEY_FINGERPRINT"] = pubkey_fingerprint
66+
return await super().asign(filename, _env_vars)
67+
4868
def validate(self):
4969
"""
5070
Validate a signing service for a Rpm Package signature.

pulp_rpm/app/settings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@
1919
PRUNE_WORKERS_MAX = 5
2020
# workaround for: https://github.com/pulp/pulp_rpm/issues/4125
2121
SPECTACULAR_SETTINGS__OAS_VERSION = "3.0.1"
22+
MAX_PACKAGE_SIGNING_WORKERS = 5

pulp_rpm/app/tasks/signing.py

Lines changed: 96 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
import asyncio
12
import logging
23
import re
34
import subprocess
45
from pathlib import Path
56
from tempfile import NamedTemporaryFile
67

8+
from django.conf import settings
9+
710
from pulpcore.plugin.models import (
811
Artifact,
912
ContentArtifact,
@@ -65,9 +68,7 @@ def _verify_package_fingerprint(package_file, signing_fingerprint):
6568
return False
6669

6770

68-
def _sign_file(package_file, signing_service, signing_fingerprint):
69-
result = signing_service.sign(package_file.name, pubkey_fingerprint=signing_fingerprint)
70-
signed_package_path = Path(result["rpm_package"])
71+
def _create_signed_artifact(signed_package_path, result):
7172
if not signed_package_path.exists():
7273
raise Exception(f"Signing script did not create the signed package: {result}")
7374
artifact = Artifact.init_and_validate(str(signed_package_path))
@@ -77,6 +78,73 @@ def _sign_file(package_file, signing_service, signing_fingerprint):
7778
return artifact
7879

7980

81+
def _sign_file(package_file, signing_service, signing_fingerprint):
82+
result = signing_service.sign(package_file.name, pubkey_fingerprint=signing_fingerprint)
83+
signed_package_path = Path(result["rpm_package"])
84+
return _create_signed_artifact(signed_package_path, result)
85+
86+
87+
async def _asign_file(package_file, signing_service, signing_fingerprint):
88+
result = await signing_service.asign(package_file.name, pubkey_fingerprint=signing_fingerprint)
89+
signed_package_path = Path(result["rpm_package"])
90+
return await asyncio.to_thread(_create_signed_artifact, signed_package_path, result)
91+
92+
93+
def _sign_package(package, signing_service, signing_fingerprint):
94+
"""
95+
Sign a package or reuse an existing signed result.
96+
97+
Returns None if already signed with the fingerprint, otherwise a
98+
tuple of (original_package_id, new_package_id).
99+
"""
100+
# the viewset is currently already checking (and rejecting) on demand content
101+
# but in the future we could just download it instead
102+
content_artifact = package.contentartifact_set.first()
103+
artifact_obj = content_artifact.artifact
104+
package_id = str(package.pk)
105+
106+
with NamedTemporaryFile(mode="wb", dir=".", delete=False) as final_package:
107+
artifact_file = artifact_obj.file
108+
_save_file(artifact_file, final_package)
109+
110+
# check if the package is already signed with our fingerprint
111+
if _verify_package_fingerprint(final_package, signing_fingerprint):
112+
return None
113+
114+
# check if the package has been signed in the past with our fingerprint and replace
115+
# it with the previously-created signed package if so
116+
if existing_result := RpmPackageSigningResult.objects.filter(
117+
original_package_sha256=content_artifact.artifact.sha256,
118+
package_signing_fingerprint=signing_fingerprint,
119+
).first():
120+
return (package_id, str(existing_result.result_package.pk))
121+
122+
# create a new signed version of the package
123+
log.info(f"Signing package {package.filename}.")
124+
artifact = _sign_file(final_package, signing_service, signing_fingerprint)
125+
signed_package = package
126+
signed_package.pk = None
127+
signed_package.pulp_id = None
128+
signed_package.pkgId = artifact.sha256
129+
signed_package.checksum_type = CHECKSUM_TYPES.SHA256
130+
signed_package.save()
131+
ContentArtifact.objects.create(
132+
artifact=artifact,
133+
content=signed_package,
134+
relative_path=content_artifact.relative_path,
135+
)
136+
RpmPackageSigningResult.objects.create(
137+
original_package_sha256=artifact_obj.sha256,
138+
package_signing_fingerprint=signing_fingerprint,
139+
result_package=signed_package,
140+
)
141+
142+
resource = CreatedResource(content_object=signed_package)
143+
resource.save()
144+
log.info(f"Signed package {package.filename}.")
145+
return (package_id, str(signed_package.pk))
146+
147+
80148
def sign_and_create(
81149
app_label,
82150
serializer_name,
@@ -120,58 +188,30 @@ def signed_add_and_remove(
120188
repo = RpmRepository.objects.get(pk=repository_pk)
121189

122190
if repo.package_signing_service:
123-
# sign each package and replace it in the add_content_units list
124-
for package in Package.objects.filter(pk__in=add_content_units).iterator():
125-
# the viewset is currently already checking (and rejecting) on demand content
126-
# but in the future we could just download it instead
127-
content_artifact = package.contentartifact_set.first()
128-
artifact_obj = content_artifact.artifact
129-
package_id = str(package.pk)
130-
131-
with NamedTemporaryFile(mode="wb", dir=".", delete=False) as final_package:
132-
artifact_file = artifact_obj.file
133-
_save_file(artifact_file, final_package)
134-
135-
# check if the package is already signed with our fingerprint
136-
if _verify_package_fingerprint(final_package, repo.package_signing_fingerprint):
137-
continue
138-
139-
# check if the package has been signed in the past with our fingerprint and replace
140-
# it with the previously-created signed package if so
141-
if existing_result := RpmPackageSigningResult.objects.filter(
142-
original_package_sha256=content_artifact.artifact.sha256,
143-
package_signing_fingerprint=repo.package_signing_fingerprint,
144-
).first():
145-
while package_id in add_content_units:
146-
add_content_units.remove(package_id)
147-
add_content_units.append(str(existing_result.result_package.pk))
148-
continue
149-
150-
# create a new signed version of the package
151-
artifact = _sign_file(
152-
final_package, repo.package_signing_service, repo.package_signing_fingerprint
153-
)
154-
signed_package = package
155-
signed_package.pk = None
156-
signed_package.pulp_id = None
157-
signed_package.pkgId = artifact.sha256
158-
signed_package.checksum_type = CHECKSUM_TYPES.SHA256
159-
signed_package.save()
160-
ContentArtifact.objects.create(
161-
artifact=artifact,
162-
content=signed_package,
163-
relative_path=content_artifact.relative_path,
164-
)
165-
RpmPackageSigningResult.objects.create(
166-
original_package_sha256=artifact_obj.sha256,
167-
package_signing_fingerprint=repo.package_signing_fingerprint,
168-
result_package=signed_package,
169-
)
170-
171-
resource = CreatedResource(content_object=signed_package)
172-
resource.save()
173-
while package_id in add_content_units:
174-
add_content_units.remove(package_id)
175-
add_content_units.append(str(signed_package.pk))
191+
add_content_units = set(add_content_units)
192+
packages = list(Package.objects.filter(pk__in=add_content_units).all())
193+
194+
async def _sign_packages():
195+
semaphore = asyncio.Semaphore(settings.MAX_PACKAGE_SIGNING_WORKERS)
196+
197+
async def _bounded_sign(pkg):
198+
async with semaphore:
199+
return await asyncio.to_thread(
200+
_sign_package,
201+
pkg,
202+
repo.package_signing_service,
203+
repo.package_signing_fingerprint,
204+
)
205+
206+
return await asyncio.gather(*(_bounded_sign(pkg) for pkg in packages))
207+
208+
for result in asyncio.run(_sign_packages()):
209+
if not result:
210+
continue
211+
old_id, new_id = result
212+
add_content_units.discard(old_id)
213+
add_content_units.add(new_id)
214+
215+
add_content_units = list(add_content_units)
176216

177217
return add_and_remove(repository_pk, add_content_units, remove_content_units, base_version_pk)

0 commit comments

Comments
 (0)