Skip to content
27 changes: 27 additions & 0 deletions src/sentry/hybridcloud/outbox/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ class ControlOutboxProducingModel(Model):

default_flush: bool | None = None
replication_version: int = 1
enqueue_after_flush: bool = False

class Meta:
abstract = True
Expand All @@ -238,13 +239,21 @@ def _maybe_prepare_outboxes(self, *, outbox_before_super: bool) -> Generator[Non
transaction.atomic(router.db_for_write(type(self))),
flush=self.default_flush,
):
saved_outboxes = []
if not outbox_before_super:
yield
for outbox in self.outboxes_for_update():
outbox.save()
saved_outboxes.append(outbox.id)
if outbox_before_super:
yield

if not self.default_flush and self.enqueue_after_flush:
transaction.on_commit(
lambda: self._schedule_async_replication(saved_outboxes),
using=router.db_for_write(type(self)),
)

def save(self, *args: Any, **kwds: Any) -> None:
with self._maybe_prepare_outboxes(outbox_before_super=False):
super().save(*args, **kwds)
Expand All @@ -260,6 +269,24 @@ def delete(self, *args: Any, **kwds: Any) -> tuple[int, dict[str, Any]]:
def outboxes_for_update(self, shard_identifier: int | None = None) -> list[ControlOutboxBase]:
raise NotImplementedError

def _schedule_async_replication(self, saved_outboxes: list[int]) -> None:
from sentry.hybridcloud.tasks.deliver_from_outbox import drain_outbox_shards_control

if not saved_outboxes:
logger.error(
"missing-outboxes.async-replication",
extra={
"model": self.__class__.__name__,
},
)
return

drain_outbox_shards_control.delay(
outbox_identifier_low=min(saved_outboxes),
outbox_identifier_hi=max(saved_outboxes) + 1,
outbox_name="sentry.ControlOutbox",
)


_CM = TypeVar("_CM", bound=ControlOutboxProducingModel)

Expand Down
23 changes: 23 additions & 0 deletions src/sentry/models/apitoken.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import base64
import hashlib
import logging
import re
import secrets
from collections.abc import Collection, Mapping
Expand Down Expand Up @@ -34,6 +35,9 @@
DEFAULT_EXPIRATION = timedelta(days=30)
TOKEN_REDACTED = "***REDACTED***"

logger = logging.getLogger("sentry.apitoken")


# RFC 7636 §4.1: code_verifier is 43-128 unreserved characters
# ABNF: code-verifier = 43*128unreserved
# unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
Expand Down Expand Up @@ -172,6 +176,10 @@ class ApiToken(ReplicatedControlModel, HasApiScopes):
__relocation_scope__ = {RelocationScope.Global, RelocationScope.Config}
category = OutboxCategory.API_TOKEN_UPDATE

# Outbox settings
enqueue_after_flush = True
_default_flush: bool | None = None

# users can generate tokens without being application-bound
application = FlexibleForeignKey("sentry.ApiApplication", null=True)
user = FlexibleForeignKey("sentry.User")
Expand Down Expand Up @@ -554,6 +562,21 @@ def organization_id(self) -> int | None:

return installation.organization_id

@property
def default_flush(self) -> bool:
from sentry import options

has_async_flush = options.get("api-token-async-flush")

if self._default_flush is not None:
return self._default_flush

return not has_async_flush

@default_flush.setter
def default_flush(self, value: bool) -> None:
self._default_flush = value


def is_api_token_auth(auth: object) -> bool:
""":returns True when an API token is hitting the API."""
Expand Down
8 changes: 8 additions & 0 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -3850,3 +3850,11 @@
default=[],
flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE,
)

# Global flag to enable API token async flush
register(
"api-token-async-flush",
default=False,
type=Bool,
flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE,
)
135 changes: 125 additions & 10 deletions tests/sentry/models/test_apitoken.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@

from sentry.conf.server import SENTRY_SCOPE_HIERARCHY_MAPPING, SENTRY_SCOPES
from sentry.hybridcloud.models import ApiTokenReplica
from sentry.hybridcloud.models.outbox import ControlOutbox
from sentry.hybridcloud.outbox.category import OutboxCategory, OutboxScope
from sentry.models.apitoken import ApiToken, NotSupported, PlaintextSecretAlreadyRead
from sentry.sentry_apps.models.sentry_app_installation import SentryAppInstallation
from sentry.sentry_apps.models.sentry_app_installation_token import SentryAppInstallationToken
from sentry.silo.base import SiloMode
from sentry.testutils.cases import TestCase
from sentry.testutils.helpers.options import override_options
from sentry.testutils.outbox import outbox_runner
from sentry.testutils.silo import assume_test_silo_mode, control_silo_test
from sentry.types.token import AuthTokenType
Expand Down Expand Up @@ -51,9 +54,10 @@ def test_enforces_scope_hierarchy(self) -> None:
assert set(token.get_scopes()) == SENTRY_SCOPE_HIERARCHY_MAPPING[scope]

def test_organization_id_for_non_internal(self) -> None:
install = self.create_sentry_app_installation()
token = install.api_token
org_id = token.organization_id
with outbox_runner():
install = self.create_sentry_app_installation()
token = install.api_token
org_id = token.organization_id

with assume_test_silo_mode(SiloMode.REGION):
assert ApiTokenReplica.objects.get(apitoken_id=token.id).organization_id == org_id
Expand All @@ -63,6 +67,7 @@ def test_organization_id_for_non_internal(self) -> None:

with assume_test_silo_mode(SiloMode.REGION):
assert ApiTokenReplica.objects.get(apitoken_id=token.id).organization_id is None

assert token.organization_id is None

def test_last_chars_are_set(self) -> None:
Expand Down Expand Up @@ -143,13 +148,15 @@ def test_default_string_serialization(self) -> None:

def test_replica_string_serialization(self) -> None:
user = self.create_user()
token = ApiToken.objects.create(user_id=user.id)
with assume_test_silo_mode(SiloMode.REGION):
replica = ApiTokenReplica.objects.get(apitoken_id=token.id)
assert (
f"{replica} is swug"
== f"replica_token_id={replica.id}, token_id={token.id} is swug"
)
with outbox_runner():
token = ApiToken.objects.create(user_id=user.id)

with assume_test_silo_mode(SiloMode.REGION):
replica = ApiTokenReplica.objects.get(apitoken_id=token.id)
assert (
f"{replica} is swug"
== f"replica_token_id={replica.id}, token_id={token.id} is swug"
)

def test_delete_token_removes_replica(self) -> None:
user = self.create_user()
Expand Down Expand Up @@ -186,6 +193,90 @@ def test_handle_async_deletion_called(self, mock_delete_replica: mock.MagicMock)
region_name=mock.ANY,
)

@override_options({"api-token-async-flush": True})
def test_outboxes_created_with_default_flush_false(self) -> None:
user = self.create_user()

token = ApiToken.objects.create(user_id=user.id)

outboxes = ControlOutbox.objects.filter(
shard_scope=OutboxScope.USER_SCOPE,
shard_identifier=user.id,
category=OutboxCategory.API_TOKEN_UPDATE,
object_identifier=token.id,
)
assert outboxes.exists()
assert outboxes.count() > 0

with assume_test_silo_mode(SiloMode.REGION):
assert not ApiTokenReplica.objects.filter(apitoken_id=token.id).exists()

@override_options({"api-token-async-flush": True})
def test_outboxes_created_on_update_with_async_flush(self) -> None:
user = self.create_user()

with outbox_runner():
token = ApiToken.objects.create(user_id=user.id)

updated_expires_at = timezone.now() + timedelta(days=30)
token.update(expires_at=updated_expires_at)

outboxes = ControlOutbox.objects.filter(
shard_scope=OutboxScope.USER_SCOPE,
shard_identifier=user.id,
category=OutboxCategory.API_TOKEN_UPDATE,
object_identifier=token.id,
)
assert outboxes.exists()
assert outboxes.count() > 0

with assume_test_silo_mode(SiloMode.REGION):
replica = ApiTokenReplica.objects.get(apitoken_id=token.id)
assert replica.expires_at != updated_expires_at

@override_options({"api-token-async-flush": True})
def test_async_replication_creates_replica_after_processing(self) -> None:
user = self.create_user()

with self.tasks():
token = ApiToken.objects.create(user_id=user.id)

# Verify outboxes were processed (should be deleted after processing)
remaining_outboxes = ControlOutbox.objects.filter(
shard_scope=OutboxScope.USER_SCOPE,
shard_identifier=user.id,
category=OutboxCategory.API_TOKEN_UPDATE,
object_identifier=token.id,
)
assert not remaining_outboxes.exists()

with assume_test_silo_mode(SiloMode.REGION):
replica = ApiTokenReplica.objects.get(apitoken_id=token.id)
assert replica.hashed_token == token.hashed_token
assert replica.user_id == user.id

@override_options({"api-token-async-flush": True})
def test_async_replication_updates_existing_replica(self) -> None:
user = self.create_user()
initial_expires_at = timezone.now() + timedelta(days=1)
updated_expires_at = timezone.now() + timedelta(days=30)

with self.tasks():
token = ApiToken.objects.create(user_id=user.id, expires_at=initial_expires_at)

with assume_test_silo_mode(SiloMode.REGION):
replica = ApiTokenReplica.objects.get(apitoken_id=token.id)
assert replica.expires_at is not None
assert abs((replica.expires_at - initial_expires_at).total_seconds()) < 1

with self.tasks():
token.update(expires_at=updated_expires_at)

with assume_test_silo_mode(SiloMode.REGION):
replica = ApiTokenReplica.objects.get(apitoken_id=token.id)
assert replica.expires_at is not None
assert abs((replica.expires_at - updated_expires_at).total_seconds()) < 1


@control_silo_test
class ApiTokenInternalIntegrationTest(TestCase):
Expand Down Expand Up @@ -223,3 +314,27 @@ def test_multiple_tokens_have_correct_organization_id(self) -> None:
with assume_test_silo_mode(SiloMode.REGION):
assert ApiTokenReplica.objects.get(apitoken_id=token_1.id).organization_id is None
assert ApiTokenReplica.objects.get(apitoken_id=token_2.id).organization_id is None

@override_options({"api-token-async-flush": True})
@mock.patch("sentry.hybridcloud.tasks.deliver_from_outbox.drain_outbox_shards_control.delay")
def test_async_replication_schedules_drain_task(self, mock_drain_task) -> None:
user = self.create_user()

token = ApiToken.objects.create(user_id=user.id)

assert mock_drain_task.called
call_args = mock_drain_task.call_args
assert call_args.kwargs["outbox_name"] == "sentry.ControlOutbox"

outboxes = ControlOutbox.objects.filter(
shard_scope=OutboxScope.USER_SCOPE,
shard_identifier=user.id,
category=OutboxCategory.API_TOKEN_UPDATE,
object_identifier=token.id,
)
assert outboxes.exists()

# Verify the task was called with the correct ID range
outbox_ids = list(outboxes.values_list("id", flat=True))
assert call_args.kwargs["outbox_identifier_low"] == min(outbox_ids)
assert call_args.kwargs["outbox_identifier_hi"] == max(outbox_ids) + 1
Loading