diff --git a/src/sentry/hybridcloud/outbox/base.py b/src/sentry/hybridcloud/outbox/base.py index abc4b3144bf9cb..d899c1f4d89074 100644 --- a/src/sentry/hybridcloud/outbox/base.py +++ b/src/sentry/hybridcloud/outbox/base.py @@ -226,6 +226,7 @@ class ControlOutboxProducingModel(Model): default_flush: bool | None = None replication_version: int = 1 + enqueue_after_flush: bool = False class Meta: abstract = True @@ -238,13 +239,21 @@ def _maybe_prepare_outboxes(self, *, outbox_before_super: bool) -> Generator[Non transaction.atomic(router.db_for_write(type(self))), flush=self.default_flush, ): + saved_outboxes = [] if not outbox_before_super: yield for outbox in self.outboxes_for_update(): outbox.save() + saved_outboxes.append(outbox.id) if outbox_before_super: yield + if not self.default_flush and self.enqueue_after_flush: + transaction.on_commit( + lambda: self._schedule_async_replication(saved_outboxes), + using=router.db_for_write(type(self)), + ) + def save(self, *args: Any, **kwds: Any) -> None: with self._maybe_prepare_outboxes(outbox_before_super=False): super().save(*args, **kwds) @@ -260,6 +269,24 @@ def delete(self, *args: Any, **kwds: Any) -> tuple[int, dict[str, Any]]: def outboxes_for_update(self, shard_identifier: int | None = None) -> list[ControlOutboxBase]: raise NotImplementedError + def _schedule_async_replication(self, saved_outboxes: list[int]) -> None: + from sentry.hybridcloud.tasks.deliver_from_outbox import drain_outbox_shards_control + + if not saved_outboxes: + logger.error( + "missing-outboxes.async-replication", + extra={ + "model": self.__class__.__name__, + }, + ) + return + + drain_outbox_shards_control.delay( + outbox_identifier_low=min(saved_outboxes), + outbox_identifier_hi=max(saved_outboxes) + 1, + outbox_name="sentry.ControlOutbox", + ) + _CM = TypeVar("_CM", bound=ControlOutboxProducingModel) diff --git a/src/sentry/models/apitoken.py b/src/sentry/models/apitoken.py index 9feccb1627217c..79913ba0d378b1 100644 --- a/src/sentry/models/apitoken.py +++ b/src/sentry/models/apitoken.py @@ -2,6 +2,7 @@ import base64 import hashlib +import logging import re import secrets from collections.abc import Collection, Mapping @@ -34,6 +35,9 @@ DEFAULT_EXPIRATION = timedelta(days=30) TOKEN_REDACTED = "***REDACTED***" +logger = logging.getLogger("sentry.apitoken") + + # RFC 7636 ยง4.1: code_verifier is 43-128 unreserved characters # ABNF: code-verifier = 43*128unreserved # unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~" @@ -172,6 +176,10 @@ class ApiToken(ReplicatedControlModel, HasApiScopes): __relocation_scope__ = {RelocationScope.Global, RelocationScope.Config} category = OutboxCategory.API_TOKEN_UPDATE + # Outbox settings + enqueue_after_flush = True + _default_flush: bool | None = None + # users can generate tokens without being application-bound application = FlexibleForeignKey("sentry.ApiApplication", null=True) user = FlexibleForeignKey("sentry.User") @@ -554,6 +562,21 @@ def organization_id(self) -> int | None: return installation.organization_id + @property + def default_flush(self) -> bool: + from sentry import options + + has_async_flush = options.get("api-token-async-flush") + + if self._default_flush is not None: + return self._default_flush + + return not has_async_flush + + @default_flush.setter + def default_flush(self, value: bool) -> None: + self._default_flush = value + def is_api_token_auth(auth: object) -> bool: """:returns True when an API token is hitting the API.""" diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index 3223d19422f53a..80bd6c3d06ba9e 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3850,3 +3850,11 @@ default=[], flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE, ) + +# Global flag to enable API token async flush +register( + "api-token-async-flush", + default=False, + type=Bool, + flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE, +) diff --git a/tests/sentry/models/test_apitoken.py b/tests/sentry/models/test_apitoken.py index 07d08c19339d44..c11d070de99b1b 100644 --- a/tests/sentry/models/test_apitoken.py +++ b/tests/sentry/models/test_apitoken.py @@ -7,11 +7,14 @@ from sentry.conf.server import SENTRY_SCOPE_HIERARCHY_MAPPING, SENTRY_SCOPES from sentry.hybridcloud.models import ApiTokenReplica +from sentry.hybridcloud.models.outbox import ControlOutbox +from sentry.hybridcloud.outbox.category import OutboxCategory, OutboxScope from sentry.models.apitoken import ApiToken, NotSupported, PlaintextSecretAlreadyRead from sentry.sentry_apps.models.sentry_app_installation import SentryAppInstallation from sentry.sentry_apps.models.sentry_app_installation_token import SentryAppInstallationToken from sentry.silo.base import SiloMode from sentry.testutils.cases import TestCase +from sentry.testutils.helpers.options import override_options from sentry.testutils.outbox import outbox_runner from sentry.testutils.silo import assume_test_silo_mode, control_silo_test from sentry.types.token import AuthTokenType @@ -51,9 +54,10 @@ def test_enforces_scope_hierarchy(self) -> None: assert set(token.get_scopes()) == SENTRY_SCOPE_HIERARCHY_MAPPING[scope] def test_organization_id_for_non_internal(self) -> None: - install = self.create_sentry_app_installation() - token = install.api_token - org_id = token.organization_id + with outbox_runner(): + install = self.create_sentry_app_installation() + token = install.api_token + org_id = token.organization_id with assume_test_silo_mode(SiloMode.REGION): assert ApiTokenReplica.objects.get(apitoken_id=token.id).organization_id == org_id @@ -63,6 +67,7 @@ def test_organization_id_for_non_internal(self) -> None: with assume_test_silo_mode(SiloMode.REGION): assert ApiTokenReplica.objects.get(apitoken_id=token.id).organization_id is None + assert token.organization_id is None def test_last_chars_are_set(self) -> None: @@ -143,13 +148,15 @@ def test_default_string_serialization(self) -> None: def test_replica_string_serialization(self) -> None: user = self.create_user() - token = ApiToken.objects.create(user_id=user.id) - with assume_test_silo_mode(SiloMode.REGION): - replica = ApiTokenReplica.objects.get(apitoken_id=token.id) - assert ( - f"{replica} is swug" - == f"replica_token_id={replica.id}, token_id={token.id} is swug" - ) + with outbox_runner(): + token = ApiToken.objects.create(user_id=user.id) + + with assume_test_silo_mode(SiloMode.REGION): + replica = ApiTokenReplica.objects.get(apitoken_id=token.id) + assert ( + f"{replica} is swug" + == f"replica_token_id={replica.id}, token_id={token.id} is swug" + ) def test_delete_token_removes_replica(self) -> None: user = self.create_user() @@ -186,6 +193,90 @@ def test_handle_async_deletion_called(self, mock_delete_replica: mock.MagicMock) region_name=mock.ANY, ) + @override_options({"api-token-async-flush": True}) + def test_outboxes_created_with_default_flush_false(self) -> None: + user = self.create_user() + + token = ApiToken.objects.create(user_id=user.id) + + outboxes = ControlOutbox.objects.filter( + shard_scope=OutboxScope.USER_SCOPE, + shard_identifier=user.id, + category=OutboxCategory.API_TOKEN_UPDATE, + object_identifier=token.id, + ) + assert outboxes.exists() + assert outboxes.count() > 0 + + with assume_test_silo_mode(SiloMode.REGION): + assert not ApiTokenReplica.objects.filter(apitoken_id=token.id).exists() + + @override_options({"api-token-async-flush": True}) + def test_outboxes_created_on_update_with_async_flush(self) -> None: + user = self.create_user() + + with outbox_runner(): + token = ApiToken.objects.create(user_id=user.id) + + updated_expires_at = timezone.now() + timedelta(days=30) + token.update(expires_at=updated_expires_at) + + outboxes = ControlOutbox.objects.filter( + shard_scope=OutboxScope.USER_SCOPE, + shard_identifier=user.id, + category=OutboxCategory.API_TOKEN_UPDATE, + object_identifier=token.id, + ) + assert outboxes.exists() + assert outboxes.count() > 0 + + with assume_test_silo_mode(SiloMode.REGION): + replica = ApiTokenReplica.objects.get(apitoken_id=token.id) + assert replica.expires_at != updated_expires_at + + @override_options({"api-token-async-flush": True}) + def test_async_replication_creates_replica_after_processing(self) -> None: + user = self.create_user() + + with self.tasks(): + token = ApiToken.objects.create(user_id=user.id) + + # Verify outboxes were processed (should be deleted after processing) + remaining_outboxes = ControlOutbox.objects.filter( + shard_scope=OutboxScope.USER_SCOPE, + shard_identifier=user.id, + category=OutboxCategory.API_TOKEN_UPDATE, + object_identifier=token.id, + ) + assert not remaining_outboxes.exists() + + with assume_test_silo_mode(SiloMode.REGION): + replica = ApiTokenReplica.objects.get(apitoken_id=token.id) + assert replica.hashed_token == token.hashed_token + assert replica.user_id == user.id + + @override_options({"api-token-async-flush": True}) + def test_async_replication_updates_existing_replica(self) -> None: + user = self.create_user() + initial_expires_at = timezone.now() + timedelta(days=1) + updated_expires_at = timezone.now() + timedelta(days=30) + + with self.tasks(): + token = ApiToken.objects.create(user_id=user.id, expires_at=initial_expires_at) + + with assume_test_silo_mode(SiloMode.REGION): + replica = ApiTokenReplica.objects.get(apitoken_id=token.id) + assert replica.expires_at is not None + assert abs((replica.expires_at - initial_expires_at).total_seconds()) < 1 + + with self.tasks(): + token.update(expires_at=updated_expires_at) + + with assume_test_silo_mode(SiloMode.REGION): + replica = ApiTokenReplica.objects.get(apitoken_id=token.id) + assert replica.expires_at is not None + assert abs((replica.expires_at - updated_expires_at).total_seconds()) < 1 + @control_silo_test class ApiTokenInternalIntegrationTest(TestCase): @@ -223,3 +314,27 @@ def test_multiple_tokens_have_correct_organization_id(self) -> None: with assume_test_silo_mode(SiloMode.REGION): assert ApiTokenReplica.objects.get(apitoken_id=token_1.id).organization_id is None assert ApiTokenReplica.objects.get(apitoken_id=token_2.id).organization_id is None + + @override_options({"api-token-async-flush": True}) + @mock.patch("sentry.hybridcloud.tasks.deliver_from_outbox.drain_outbox_shards_control.delay") + def test_async_replication_schedules_drain_task(self, mock_drain_task) -> None: + user = self.create_user() + + token = ApiToken.objects.create(user_id=user.id) + + assert mock_drain_task.called + call_args = mock_drain_task.call_args + assert call_args.kwargs["outbox_name"] == "sentry.ControlOutbox" + + outboxes = ControlOutbox.objects.filter( + shard_scope=OutboxScope.USER_SCOPE, + shard_identifier=user.id, + category=OutboxCategory.API_TOKEN_UPDATE, + object_identifier=token.id, + ) + assert outboxes.exists() + + # Verify the task was called with the correct ID range + outbox_ids = list(outboxes.values_list("id", flat=True)) + assert call_args.kwargs["outbox_identifier_low"] == min(outbox_ids) + assert call_args.kwargs["outbox_identifier_hi"] == max(outbox_ids) + 1