5 changes: 5 additions & 0 deletions apps/common/tasks.py
@@ -18,3 +18,8 @@ def clear_expired_django_sessions():
logger.warning("Clear expired django sessions")
return
management.call_command("clearsessions", verbosity=0)


@shared_task
def celery_queue_uptime_check(queue: str):
logger.info("Celery Queue %s is taking task...", queue)
1 change: 1 addition & 0 deletions apps/contributor/graphql/types.py
@@ -82,6 +82,7 @@ class ContributorUserGroupType(UserResourceTypeMixin, ArchivableResourceTypeMixi
models.Subquery(
ContributorUserGroupMembership.objects.filter(
user_group_id=models.OuterRef("id"),
is_active=True,
)
.order_by()
.values("user_group_id")
@@ -506,7 +506,7 @@ def create_project(
existing_project: existing_db_models.Project,
requesting_organization: str,
bot_user: User,
):
) -> tuple[Project, bool]:
try:
assert existing_project.project_type is not None, "Project type should be defined"

@@ -544,7 +544,7 @@ def create_project(
# 2022-10-13 00:00:00 -> 2022-10-13
project_metadata["last_contribution_date"] = day_.split(" ")[0]

return Project.objects.update_or_create(
project, project_created = Project.objects.update_or_create(
old_id=existing_project.project_id,
create_defaults=dict(
client_id=client_id,
@@ -553,6 +553,15 @@
),
defaults=project_metadata,
)

# NOTE: Django doesn't allow custom values for auto_now_add/auto_now fields, so we use objects.update() to set them
if project_creation_date := parse_datetime(existing_project.created):
Project.objects.filter(id=project.pk).update(
created_at=project_creation_date,
modified_at=project_creation_date,
)

return project, project_created
except IntegrityError as e:
if not str(e).startswith('duplicate key value violates unique constraint "unique_project_name"'):
raise
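
Note on the created_at/modified_at override above: QuerySet.update() issues a plain SQL UPDATE that bypasses Model.save(), so auto_now_add/auto_now never fire and the migrated timestamps actually stick. A minimal sketch of the pattern (the Article model and backfill helper below are illustrative, not part of this PR):

from django.db import models
from django.utils.dateparse import parse_datetime


class Article(models.Model):
    created_at = models.DateTimeField(auto_now_add=True)  # set on INSERT, ignores any passed value
    modified_at = models.DateTimeField(auto_now=True)  # refreshed on every save()


def backfill_created(article: Article, raw_created: str) -> None:
    # update() runs a bare SQL UPDATE, skipping save() and the auto_now* hooks,
    # which is why the migrated timestamp is preserved.
    if created := parse_datetime(raw_created):
        Article.objects.filter(pk=article.pk).update(created_at=created, modified_at=created)
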
9 changes: 6 additions & 3 deletions apps/mapping/firebase/pull.py
@@ -4,6 +4,7 @@
from main.bulk_managers import BulkCreateManager
from main.config import Config
from main.logging import log_extra
from main.sentry import SentryTag

from .utils import (
FirebaseCleanup,
@@ -22,12 +23,11 @@ def _transfer_results_for_project(
firebase_cleanup: FirebaseCleanup,
project: Project,
):
SentryTag.set_tags({SentryTag.Tag.PROJECT: project.pk})
group_results = FH.ref(Config.FirebaseKeys.results_project_groups(project.firebase_id)).get()
assert type(group_results) is dict
results_to_temp_table(bulk_create_manager, firebase_cleanup, project, group_results)

# generate user_group mapping session


def pull_results_from_firebase():
# TODO: Add a lock to prevent concurrent execution. We have lock within celery
@@ -59,6 +59,7 @@ def pull_results_from_firebase():
{
"tutorial_firebase_id": project_firebase_id,
},
sentry_tags={SentryTag.Tag.PROJECT: project_firebase_id},
),
)
firebase_cleanup.add_project(project_firebase_id=project_firebase_id)
@@ -73,9 +74,11 @@ def pull_results_from_firebase():
{
"project_firebase_id": project_firebase_id,
},
sentry_tags={SentryTag.Tag.PROJECT: project_firebase_id},
),
)
firebase_cleanup.add_project(project_firebase_id=project_firebase_id)
# TODO: Clean this up. For now, let's do this manually
# firebase_cleanup.add_project(project_firebase_id=project_firebase_id)
continue

_transfer_results_for_project(
3 changes: 2 additions & 1 deletion apps/project/exports/overall_stats.py
@@ -92,7 +92,7 @@ def regenerate_projects_csv(temp_projects_csv: typing.IO): # type: ignore[repor
"area_sqkm": models.F("aoi_geometry__total_area"),
"centroid": None, # TODO: use this after removing from model models.F("aoi_geometry__centroid"),
"geom": models.F("aoi_geometry__geometry"),
"progress": None,
"progress": None, # NOTE: This is changed to float later
"number_of_contributor_users": None,
"number_of_results": None,
"number_of_results_for_progress": None,
@@ -116,6 +116,7 @@ def regenerate_projects_csv(temp_projects_csv: typing.IO): # type: ignore[repor
data["image_url"] = image_file_url
data["status_display"] = ProjectStatusEnum(data["status"]).label
data["project_type_display"] = ProjectTypeEnum(data["project_type"]).label
data["progress"] = data["progress"] / 100

writer.writerow(data)

7 changes: 6 additions & 1 deletion apps/project/graphql/types/types.py
@@ -176,12 +176,17 @@ class ProjectType(UserResourceTypeMixin, ProjectExportAssetTypeMixin, FirebasePu
aoi_geometry: GeometryType | None

progress_status: strawberry.auto
progress: strawberry.auto
number_of_contributor_users: strawberry.auto
number_of_results: strawberry.auto
number_of_results_for_progress: strawberry.auto
last_contribution_date: strawberry.auto

@strawberry_django.field(
description=str(Project._meta.get_field("progress").help_text), # type: ignore[reportAttributeAccessIssue]
)
def progress(self, project: strawberry.Parent[Project]) -> float:
return project.progress / 100

@strawberry_django.field(
description="No. of unique contributors in this project",
)
2 changes: 1 addition & 1 deletion assets
10 changes: 1 addition & 9 deletions main/celery.py
@@ -8,9 +8,8 @@
from celery import signals
from django.conf import settings
from django.db import models
from kombu import Queue

from .cronjobs import BEAT_SCHEDULES
from .cronjobs import BEAT_SCHEDULES, CeleryQueue

if TYPE_CHECKING:
from celery.app.task import Task
@@ -24,13 +23,6 @@
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "main.settings")


class CeleryQueue:
# NOTE: Make sure all queue names are lowercase (They are in k8s)
default = Queue("default")

ALL_QUEUE = (default,)


class Celery(celery.Celery):
def on_configure(self): # type: ignore[reportIncompatibleVariableOverride]
if settings.SENTRY_ENABLED:
26 changes: 26 additions & 0 deletions main/cronjobs.py
@@ -2,6 +2,7 @@
from datetime import datetime

from celery.schedules import crontab
from kombu import Queue
from sentry_sdk.integrations.celery import beat as sentry_celery_beat

if typing.TYPE_CHECKING:
@@ -20,6 +21,13 @@ class TimeConstants:
EVERY_1_MINUTES = crontab(minute="*/1")


class CeleryQueue:
# NOTE: Make sure all queue names are lowercase (They are in k8s)
default = Queue("default")

ALL_QUEUE = (default,)


class CronJobOption(typing.TypedDict, total=False):
# https://docs.celeryq.dev/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async

@@ -37,6 +45,7 @@ class CeleryBeatSchedule(typing.TypedDict):
task: str
schedule: crontab
options: CronJobOption
args: tuple[typing.Any, ...] | None


class CronJobSentryConfig(typing.NamedTuple):
@@ -67,6 +76,7 @@ def as_dict(self) -> "MonitorConfig":
class CronJob(typing.NamedTuple):
task: str
schedule: crontab
args: tuple[typing.Any, ...] | None = None
sentry_config: CronJobSentryConfig = CronJobSentryConfig()
options: CronJobOption = {}

@@ -118,11 +128,27 @@ class CronJob(typing.NamedTuple):
max_runtime=2,
),
),
# Queue uptime
**{
f"celery_queue_uptime_{celery_queue.name}": CronJob(
task="apps.common.tasks.celery_queue_uptime_check",
args=(celery_queue.name,),
schedule=TimeConstants.EVERY_HOUR,
options=CronJobOption(expires=TimeConstants.SECONDS_IN_A_HOUR),
sentry_config=CronJobSentryConfig(
failure_issue_threshold=2,
checkin_margin=2,
max_runtime=2,
),
)
for celery_queue in CeleryQueue.ALL_QUEUE
},
}

BEAT_SCHEDULES: dict[str, CeleryBeatSchedule] = {
name: {
"task": config.task,
"args": config.args,
"schedule": config.schedule,
"options": config.options,
}
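
For the single default queue, the dict comprehension above expands BEAT_SCHEDULES with roughly one extra entry, sketched below. The concrete crontab and expiry values are assumptions based on the TimeConstants names used in the PR, not literal code from it:

from celery.schedules import crontab

# Roughly what "celery_queue_uptime_default" resolves to in BEAT_SCHEDULES.
EXAMPLE_ENTRY = {
    "celery_queue_uptime_default": {
        "task": "apps.common.tasks.celery_queue_uptime_check",
        "args": ("default",),
        "schedule": crontab(minute="0"),  # assumed hourly schedule behind EVERY_HOUR
        "options": {"expires": 60 * 60},  # assumed SECONDS_IN_A_HOUR, so stale heartbeats are dropped
    },
}
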
8 changes: 7 additions & 1 deletion main/logging.py
@@ -3,6 +3,8 @@

import requests

from main.sentry import SentryTag


def log_render_extra_context(record: logging.LogRecord):
"""Append extra->context to logs.
@@ -20,10 +22,14 @@ def log_render_extra_context(record: logging.LogRecord):
return True


def log_extra(extra: dict[typing.Any, typing.Any]):
def log_extra(
extra: dict[typing.Any, typing.Any],
sentry_tags: dict[SentryTag.Tag, typing.Any] | None = None,
):
"""Basic helper function to view extra argument in logs using log_render_extra_context."""
return {
"context": extra,
"tags": sentry_tags, # https://forum.sentry.io/t/how-to-add-tags-to-python-logging/323/4
}


14 changes: 14 additions & 0 deletions main/sentry.py
@@ -1,6 +1,7 @@
import dataclasses
import json
import typing
from enum import Enum

import sentry_sdk
from asgiref.sync import sync_to_async
@@ -106,3 +107,16 @@ def track_transaction(graphql_urls: set[str], request: typing.Any):
"is_superuser": user.is_superuser,
},
)


class SentryTag:
"""https://docs.sentry.io/platforms/python/enriching-events/tags/."""

class Tag(Enum):
_BASE = "mapswipe."
PROJECT = _BASE + "project"

@staticmethod
def set_tags(kwargs: dict[Tag, int | str]):
for key, value in kwargs.items():
sentry_sdk.set_tag(key.value, value)
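
Combined with the log_extra() change in main/logging.py, the tagging added to pull.py works roughly as sketched below. The logger name, the example function, and the extra= keyword are assumptions about the surrounding call sites, not code from this PR:

import logging

from main.logging import log_extra
from main.sentry import SentryTag

logger = logging.getLogger(__name__)


def tag_project_events(project_firebase_id: str) -> None:
    # Tags every subsequent Sentry event in the current scope with "mapswipe.project".
    SentryTag.set_tags({SentryTag.Tag.PROJECT: project_firebase_id})
    # Attaches the same tag to a single log record via the extra dict
    # (picked up per the forum approach linked in main/logging.py).
    logger.warning(
        "Results skipped for project",
        extra=log_extra(
            {"project_firebase_id": project_firebase_id},
            sentry_tags={SentryTag.Tag.PROJECT: project_firebase_id},
        ),
    )
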
7 changes: 5 additions & 2 deletions pyproject.toml
@@ -242,6 +242,9 @@ reportPrivateImportUsage = "error"
[tool.coverage.report]
# https://coverage.readthedocs.io/en/7.10.6/excluding.html#advanced-exclusion
exclude_also = [
'raise NotImplementedError',
'if typing.TYPE_CHECKING:',
"raise NotImplementedError",
"if typing.TYPE_CHECKING:",
]
partial_branches = [
"if typing.TYPE_CHECKING:",
]
2 changes: 1 addition & 1 deletion schema.graphql
@@ -1802,7 +1802,7 @@ type ProjectType implements UserResourceTypeMixin & ProjectExportAssetTypeMixin
processingStatus: ProjectProcessingStatusEnum

"""Percentage of the required contribution that has been completed"""
progress: Int!
progress: Float!
progressStatus: ProjectProgressStatusEnum!

"""Provide project instruction"""