Skip to content
This repository was archived by the owner on Jun 13, 2025. It is now read-only.

Commit 98c6acc

Browse files
committed
add a helper class for publishing to shelter
1 parent df236e8 commit 98c6acc

File tree

3 files changed

+87
-100
lines changed

3 files changed

+87
-100
lines changed

codecov_auth/signals.py

Lines changed: 56 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import logging
3+
from typing import Any, Dict, Optional, Type
34

45
from django.conf import settings
56
from django.db.models.signals import post_save
@@ -13,73 +14,81 @@
1314

1415
@receiver(post_save, sender=Owner)
1516
def create_owner_profile_when_owner_is_created(
16-
sender, instance: Owner, created, **kwargs
17-
):
17+
sender: Type[Owner], instance: Owner, created: bool, **kwargs: Dict[str, Any]
18+
) -> Optional[OwnerProfile]:
1819
if created:
1920
return OwnerProfile.objects.create(owner_id=instance.ownerid)
2021

2122

22-
_pubsub_publisher = None
23+
class ShelterPubsub:
24+
pubsub_publisher = None
25+
_instance = None
2326

27+
@classmethod
28+
def get_instance(cls) -> "ShelterPubsub":
29+
"""
30+
This class needs the Django settings to be fully loaded before it can be instantiated,
31+
therefore use this method to get an instance rather than instantiating directly.
32+
"""
33+
if cls._instance is None:
34+
cls._instance = cls()
35+
return cls._instance
2436

25-
def _get_pubsub_publisher():
26-
global _pubsub_publisher
27-
if not _pubsub_publisher:
28-
_pubsub_publisher = pubsub_v1.PublisherClient()
29-
return _pubsub_publisher
37+
def __init__(self) -> None:
38+
if not self.pubsub_publisher:
39+
self.pubsub_publisher = pubsub_v1.PublisherClient()
40+
pubsub_project_id: str = settings.SHELTER_PUBSUB_PROJECT_ID
41+
42+
# topic_id has REPO in the name but it is used for all types of objects
43+
topic_id: str = settings.SHELTER_PUBSUB_SYNC_REPO_TOPIC_ID
44+
self.topic_path = self.pubsub_publisher.topic_path(pubsub_project_id, topic_id)
45+
46+
def publish(self, data: Dict[str, Any]) -> None:
47+
try:
48+
self.pubsub_publisher.publish(
49+
self.topic_path,
50+
json.dumps(data).encode("utf-8"),
51+
)
52+
except Exception as e:
53+
log.warning(
54+
"Failed to publish a message",
55+
extra=dict(data_to_publish=data, error=e),
56+
)
3057

3158

3259
@receiver(
3360
post_save, sender=OrganizationLevelToken, dispatch_uid="shelter_sync_org_token"
3461
)
35-
def update_org_token(sender, instance: OrganizationLevelToken, **kwargs):
36-
pubsub_project_id = settings.SHELTER_PUBSUB_PROJECT_ID
37-
topic_id = settings.SHELTER_PUBSUB_SYNC_REPO_TOPIC_ID
38-
if pubsub_project_id and topic_id:
39-
publisher = _get_pubsub_publisher()
40-
topic_path = publisher.topic_path(pubsub_project_id, topic_id)
41-
publisher.publish(
42-
topic_path,
43-
json.dumps(
44-
{
45-
"type": "org_token",
46-
"sync": "one",
47-
"id": instance.id,
48-
}
49-
).encode("utf-8"),
50-
)
62+
def update_org_token(
63+
sender: Type[OrganizationLevelToken],
64+
instance: OrganizationLevelToken,
65+
**kwargs: Dict[str, Any],
66+
) -> None:
67+
data = {
68+
"type": "org_token",
69+
"sync": "one",
70+
"id": instance.id,
71+
}
72+
ShelterPubsub.get_instance().publish(data)
5173

5274

5375
@receiver(post_save, sender=Owner, dispatch_uid="shelter_sync_owner")
54-
def update_owner(sender, instance: Owner, **kwargs):
76+
def update_owner(
77+
sender: Type[Owner], instance: Owner, **kwargs: Dict[str, Any]
78+
) -> None:
5579
"""
5680
Shelter tracks a limited set of Owner fields - only update if those fields have changed.
5781
"""
58-
created = kwargs["created"]
82+
created: bool = kwargs["created"]
5983
tracked_fields = [
6084
"upload_token_required_for_public_repos",
6185
"username",
6286
"service",
6387
]
6488
if created or any(instance.tracker.has_changed(field) for field in tracked_fields):
65-
pubsub_project_id = settings.SHELTER_PUBSUB_PROJECT_ID
66-
topic_id = settings.SHELTER_PUBSUB_SYNC_REPO_TOPIC_ID
67-
if pubsub_project_id and topic_id:
68-
try:
69-
publisher = _get_pubsub_publisher()
70-
topic_path = publisher.topic_path(pubsub_project_id, topic_id)
71-
publisher.publish(
72-
topic_path,
73-
json.dumps(
74-
{
75-
"type": "owner",
76-
"sync": "one",
77-
"id": instance.ownerid,
78-
}
79-
).encode("utf-8"),
80-
)
81-
except Exception as e:
82-
log.warning(
83-
"Failed to publish message for owner",
84-
extra=dict(owner_id=instance.pk, error=e),
85-
)
89+
data = {
90+
"type": "owner",
91+
"sync": "one",
92+
"id": instance.ownerid,
93+
}
94+
ShelterPubsub.get_instance().publish(data)

codecov_auth/tests/test_signals.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ def test_sync_error(self, mock_log, mock_publish):
128128
)
129129

130130
mock_log.assert_called_once_with(
131-
"Failed to publish message for owner",
132-
extra=dict(owner_id=12345, error=mock_publish.side_effect),
131+
"Failed to publish a message",
132+
extra=dict(
133+
data_to_publish={"type": "owner", "sync": "one", "id": 12345},
134+
error=mock_publish.side_effect,
135+
),
133136
)

core/signals.py

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,43 @@
1-
import json
21
import logging
2+
from typing import Any, Dict, List, Type
33

4-
from django.conf import settings
54
from django.db.models.signals import post_save
65
from django.dispatch import receiver
7-
from google.cloud import pubsub_v1
86
from shared.django_apps.core.models import Commit
97

8+
from codecov_auth.signals import ShelterPubsub
109
from core.models import Repository
1110

12-
_pubsub_publisher = None
1311
log = logging.getLogger(__name__)
1412

1513

16-
def _get_pubsub_publisher():
17-
global _pubsub_publisher
18-
if not _pubsub_publisher:
19-
_pubsub_publisher = pubsub_v1.PublisherClient()
20-
return _pubsub_publisher
21-
22-
2314
@receiver(post_save, sender=Repository, dispatch_uid="shelter_sync_repo")
24-
def update_repository(sender, instance: Repository, **kwargs):
15+
def update_repository(
16+
sender: Type[Repository], instance: Repository, **kwargs: Dict[str, Any]
17+
) -> None:
2518
log.info(f"Signal triggered for repository {instance.repoid}")
26-
created = kwargs["created"]
27-
changes = instance.tracker.changed()
28-
if created or any([field in changes for field in ["name", "upload_token"]]):
29-
try:
30-
pubsub_project_id = settings.SHELTER_PUBSUB_PROJECT_ID
31-
topic_id = settings.SHELTER_PUBSUB_SYNC_REPO_TOPIC_ID
32-
if pubsub_project_id and topic_id:
33-
publisher = _get_pubsub_publisher()
34-
topic_path = publisher.topic_path(pubsub_project_id, topic_id)
35-
publisher.publish(
36-
topic_path,
37-
json.dumps(
38-
{
39-
"type": "repo",
40-
"sync": "one",
41-
"id": instance.repoid,
42-
}
43-
).encode("utf-8"),
44-
)
45-
log.info(f"Message published for repository {instance.repoid}")
46-
except Exception as e:
47-
log.warning(f"Failed to publish message for repo {instance.repoid}: {e}")
19+
created: bool = kwargs["created"]
20+
changes: Dict[str, Any] = instance.tracker.changed()
21+
tracked_fields: List[str] = ["name", "upload_token"]
22+
23+
if created or any([field in changes for field in tracked_fields]):
24+
data = {
25+
"type": "repo",
26+
"sync": "one",
27+
"id": instance.repoid,
28+
}
29+
ShelterPubsub.get_instance().publish(data)
4830

4931

5032
@receiver(post_save, sender=Commit, dispatch_uid="shelter_sync_commit")
51-
def update_commit(sender, instance: Commit, **kwargs):
52-
branch = instance.branch
33+
def update_commit(
34+
sender: Type[Commit], instance: Commit, **kwargs: Dict[str, Any]
35+
) -> None:
36+
branch: str = instance.branch
5337
if branch and ":" in branch:
54-
pubsub_project_id = settings.SHELTER_PUBSUB_PROJECT_ID
55-
topic_id = settings.SHELTER_PUBSUB_SYNC_REPO_TOPIC_ID
56-
if pubsub_project_id and topic_id:
57-
publisher = _get_pubsub_publisher()
58-
topic_path = publisher.topic_path(pubsub_project_id, topic_id)
59-
publisher.publish(
60-
topic_path,
61-
json.dumps(
62-
{
63-
"type": "commit",
64-
"sync": "one",
65-
"id": instance.id,
66-
}
67-
).encode("utf-8"),
68-
)
38+
data = {
39+
"type": "commit",
40+
"sync": "one",
41+
"id": instance.id,
42+
}
43+
ShelterPubsub.get_instance().publish(data)

0 commit comments

Comments
 (0)