Skip to content

Commit 39adb91

Browse files
fix(tokens): Add async flush outboxes (#105264)
1 parent 5c32e22 commit 39adb91

File tree

4 files changed

+183
-10
lines changed

4 files changed

+183
-10
lines changed

src/sentry/hybridcloud/outbox/base.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ class ControlOutboxProducingModel(Model):
226226

227227
default_flush: bool | None = None
228228
replication_version: int = 1
229+
enqueue_after_flush: bool = False
229230

230231
class Meta:
231232
abstract = True
@@ -238,13 +239,21 @@ def _maybe_prepare_outboxes(self, *, outbox_before_super: bool) -> Generator[Non
238239
transaction.atomic(router.db_for_write(type(self))),
239240
flush=self.default_flush,
240241
):
242+
saved_outboxes = []
241243
if not outbox_before_super:
242244
yield
243245
for outbox in self.outboxes_for_update():
244246
outbox.save()
247+
saved_outboxes.append(outbox.id)
245248
if outbox_before_super:
246249
yield
247250

251+
if not self.default_flush and self.enqueue_after_flush:
252+
transaction.on_commit(
253+
lambda: self._schedule_async_replication(saved_outboxes),
254+
using=router.db_for_write(type(self)),
255+
)
256+
248257
def save(self, *args: Any, **kwds: Any) -> None:
249258
with self._maybe_prepare_outboxes(outbox_before_super=False):
250259
super().save(*args, **kwds)
@@ -260,6 +269,24 @@ def delete(self, *args: Any, **kwds: Any) -> tuple[int, dict[str, Any]]:
260269
def outboxes_for_update(self, shard_identifier: int | None = None) -> list[ControlOutboxBase]:
261270
raise NotImplementedError
262271

272+
def _schedule_async_replication(self, saved_outboxes: list[int]) -> None:
273+
from sentry.hybridcloud.tasks.deliver_from_outbox import drain_outbox_shards_control
274+
275+
if not saved_outboxes:
276+
logger.error(
277+
"missing-outboxes.async-replication",
278+
extra={
279+
"model": self.__class__.__name__,
280+
},
281+
)
282+
return
283+
284+
drain_outbox_shards_control.delay(
285+
outbox_identifier_low=min(saved_outboxes),
286+
outbox_identifier_hi=max(saved_outboxes) + 1,
287+
outbox_name="sentry.ControlOutbox",
288+
)
289+
263290

264291
_CM = TypeVar("_CM", bound=ControlOutboxProducingModel)
265292

src/sentry/models/apitoken.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import base64
44
import hashlib
5+
import logging
56
import re
67
import secrets
78
from collections.abc import Collection, Mapping
@@ -34,6 +35,9 @@
3435
DEFAULT_EXPIRATION = timedelta(days=30)
3536
TOKEN_REDACTED = "***REDACTED***"
3637

38+
logger = logging.getLogger("sentry.apitoken")
39+
40+
3741
# RFC 7636 §4.1: code_verifier is 43-128 unreserved characters
3842
# ABNF: code-verifier = 43*128unreserved
3943
# unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
@@ -172,6 +176,10 @@ class ApiToken(ReplicatedControlModel, HasApiScopes):
172176
__relocation_scope__ = {RelocationScope.Global, RelocationScope.Config}
173177
category = OutboxCategory.API_TOKEN_UPDATE
174178

179+
# Outbox settings
180+
enqueue_after_flush = True
181+
_default_flush: bool | None = None
182+
175183
# users can generate tokens without being application-bound
176184
application = FlexibleForeignKey("sentry.ApiApplication", null=True)
177185
user = FlexibleForeignKey("sentry.User")
@@ -554,6 +562,21 @@ def organization_id(self) -> int | None:
554562

555563
return installation.organization_id
556564

565+
@property
566+
def default_flush(self) -> bool:
567+
from sentry import options
568+
569+
has_async_flush = options.get("api-token-async-flush")
570+
571+
if self._default_flush is not None:
572+
return self._default_flush
573+
574+
return not has_async_flush
575+
576+
@default_flush.setter
577+
def default_flush(self, value: bool) -> None:
578+
self._default_flush = value
579+
557580

558581
def is_api_token_auth(auth: object) -> bool:
559582
""":returns True when an API token is hitting the API."""

src/sentry/options/defaults.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3832,3 +3832,11 @@
38323832
default=[],
38333833
flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE,
38343834
)
3835+
3836+
# Global flag to enable API token async flush
3837+
register(
3838+
"api-token-async-flush",
3839+
default=False,
3840+
type=Bool,
3841+
flags=FLAG_ALLOW_EMPTY | FLAG_AUTOMATOR_MODIFIABLE,
3842+
)

tests/sentry/models/test_apitoken.py

Lines changed: 125 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@
77

88
from sentry.conf.server import SENTRY_SCOPE_HIERARCHY_MAPPING, SENTRY_SCOPES
99
from sentry.hybridcloud.models import ApiTokenReplica
10+
from sentry.hybridcloud.models.outbox import ControlOutbox
11+
from sentry.hybridcloud.outbox.category import OutboxCategory, OutboxScope
1012
from sentry.models.apitoken import ApiToken, NotSupported, PlaintextSecretAlreadyRead
1113
from sentry.sentry_apps.models.sentry_app_installation import SentryAppInstallation
1214
from sentry.sentry_apps.models.sentry_app_installation_token import SentryAppInstallationToken
1315
from sentry.silo.base import SiloMode
1416
from sentry.testutils.cases import TestCase
17+
from sentry.testutils.helpers.options import override_options
1518
from sentry.testutils.outbox import outbox_runner
1619
from sentry.testutils.silo import assume_test_silo_mode, control_silo_test
1720
from sentry.types.token import AuthTokenType
@@ -51,9 +54,10 @@ def test_enforces_scope_hierarchy(self) -> None:
5154
assert set(token.get_scopes()) == SENTRY_SCOPE_HIERARCHY_MAPPING[scope]
5255

5356
def test_organization_id_for_non_internal(self) -> None:
54-
install = self.create_sentry_app_installation()
55-
token = install.api_token
56-
org_id = token.organization_id
57+
with outbox_runner():
58+
install = self.create_sentry_app_installation()
59+
token = install.api_token
60+
org_id = token.organization_id
5761

5862
with assume_test_silo_mode(SiloMode.REGION):
5963
assert ApiTokenReplica.objects.get(apitoken_id=token.id).organization_id == org_id
@@ -63,6 +67,7 @@ def test_organization_id_for_non_internal(self) -> None:
6367

6468
with assume_test_silo_mode(SiloMode.REGION):
6569
assert ApiTokenReplica.objects.get(apitoken_id=token.id).organization_id is None
70+
6671
assert token.organization_id is None
6772

6873
def test_last_chars_are_set(self) -> None:
@@ -143,13 +148,15 @@ def test_default_string_serialization(self) -> None:
143148

144149
def test_replica_string_serialization(self) -> None:
145150
user = self.create_user()
146-
token = ApiToken.objects.create(user_id=user.id)
147-
with assume_test_silo_mode(SiloMode.REGION):
148-
replica = ApiTokenReplica.objects.get(apitoken_id=token.id)
149-
assert (
150-
f"{replica} is swug"
151-
== f"replica_token_id={replica.id}, token_id={token.id} is swug"
152-
)
151+
with outbox_runner():
152+
token = ApiToken.objects.create(user_id=user.id)
153+
154+
with assume_test_silo_mode(SiloMode.REGION):
155+
replica = ApiTokenReplica.objects.get(apitoken_id=token.id)
156+
assert (
157+
f"{replica} is swug"
158+
== f"replica_token_id={replica.id}, token_id={token.id} is swug"
159+
)
153160

154161
def test_delete_token_removes_replica(self) -> None:
155162
user = self.create_user()
@@ -186,6 +193,90 @@ def test_handle_async_deletion_called(self, mock_delete_replica: mock.MagicMock)
186193
region_name=mock.ANY,
187194
)
188195

196+
@override_options({"api-token-async-flush": True})
197+
def test_outboxes_created_with_default_flush_false(self) -> None:
198+
user = self.create_user()
199+
200+
token = ApiToken.objects.create(user_id=user.id)
201+
202+
outboxes = ControlOutbox.objects.filter(
203+
shard_scope=OutboxScope.USER_SCOPE,
204+
shard_identifier=user.id,
205+
category=OutboxCategory.API_TOKEN_UPDATE,
206+
object_identifier=token.id,
207+
)
208+
assert outboxes.exists()
209+
assert outboxes.count() > 0
210+
211+
with assume_test_silo_mode(SiloMode.REGION):
212+
assert not ApiTokenReplica.objects.filter(apitoken_id=token.id).exists()
213+
214+
@override_options({"api-token-async-flush": True})
215+
def test_outboxes_created_on_update_with_async_flush(self) -> None:
216+
user = self.create_user()
217+
218+
with outbox_runner():
219+
token = ApiToken.objects.create(user_id=user.id)
220+
221+
updated_expires_at = timezone.now() + timedelta(days=30)
222+
token.update(expires_at=updated_expires_at)
223+
224+
outboxes = ControlOutbox.objects.filter(
225+
shard_scope=OutboxScope.USER_SCOPE,
226+
shard_identifier=user.id,
227+
category=OutboxCategory.API_TOKEN_UPDATE,
228+
object_identifier=token.id,
229+
)
230+
assert outboxes.exists()
231+
assert outboxes.count() > 0
232+
233+
with assume_test_silo_mode(SiloMode.REGION):
234+
replica = ApiTokenReplica.objects.get(apitoken_id=token.id)
235+
assert replica.expires_at != updated_expires_at
236+
237+
@override_options({"api-token-async-flush": True})
238+
def test_async_replication_creates_replica_after_processing(self) -> None:
239+
user = self.create_user()
240+
241+
with self.tasks():
242+
token = ApiToken.objects.create(user_id=user.id)
243+
244+
# Verify outboxes were processed (should be deleted after processing)
245+
remaining_outboxes = ControlOutbox.objects.filter(
246+
shard_scope=OutboxScope.USER_SCOPE,
247+
shard_identifier=user.id,
248+
category=OutboxCategory.API_TOKEN_UPDATE,
249+
object_identifier=token.id,
250+
)
251+
assert not remaining_outboxes.exists()
252+
253+
with assume_test_silo_mode(SiloMode.REGION):
254+
replica = ApiTokenReplica.objects.get(apitoken_id=token.id)
255+
assert replica.hashed_token == token.hashed_token
256+
assert replica.user_id == user.id
257+
258+
@override_options({"api-token-async-flush": True})
259+
def test_async_replication_updates_existing_replica(self) -> None:
260+
user = self.create_user()
261+
initial_expires_at = timezone.now() + timedelta(days=1)
262+
updated_expires_at = timezone.now() + timedelta(days=30)
263+
264+
with self.tasks():
265+
token = ApiToken.objects.create(user_id=user.id, expires_at=initial_expires_at)
266+
267+
with assume_test_silo_mode(SiloMode.REGION):
268+
replica = ApiTokenReplica.objects.get(apitoken_id=token.id)
269+
assert replica.expires_at is not None
270+
assert abs((replica.expires_at - initial_expires_at).total_seconds()) < 1
271+
272+
with self.tasks():
273+
token.update(expires_at=updated_expires_at)
274+
275+
with assume_test_silo_mode(SiloMode.REGION):
276+
replica = ApiTokenReplica.objects.get(apitoken_id=token.id)
277+
assert replica.expires_at is not None
278+
assert abs((replica.expires_at - updated_expires_at).total_seconds()) < 1
279+
189280

190281
@control_silo_test
191282
class ApiTokenInternalIntegrationTest(TestCase):
@@ -223,3 +314,27 @@ def test_multiple_tokens_have_correct_organization_id(self) -> None:
223314
with assume_test_silo_mode(SiloMode.REGION):
224315
assert ApiTokenReplica.objects.get(apitoken_id=token_1.id).organization_id is None
225316
assert ApiTokenReplica.objects.get(apitoken_id=token_2.id).organization_id is None
317+
318+
@override_options({"api-token-async-flush": True})
319+
@mock.patch("sentry.hybridcloud.tasks.deliver_from_outbox.drain_outbox_shards_control.delay")
320+
def test_async_replication_schedules_drain_task(self, mock_drain_task) -> None:
321+
user = self.create_user()
322+
323+
token = ApiToken.objects.create(user_id=user.id)
324+
325+
assert mock_drain_task.called
326+
call_args = mock_drain_task.call_args
327+
assert call_args.kwargs["outbox_name"] == "sentry.ControlOutbox"
328+
329+
outboxes = ControlOutbox.objects.filter(
330+
shard_scope=OutboxScope.USER_SCOPE,
331+
shard_identifier=user.id,
332+
category=OutboxCategory.API_TOKEN_UPDATE,
333+
object_identifier=token.id,
334+
)
335+
assert outboxes.exists()
336+
337+
# Verify the task was called with the correct ID range
338+
outbox_ids = list(outboxes.values_list("id", flat=True))
339+
assert call_args.kwargs["outbox_identifier_low"] == min(outbox_ids)
340+
assert call_args.kwargs["outbox_identifier_hi"] == max(outbox_ids) + 1

0 commit comments

Comments
 (0)