Skip to content

Conversation

@Christinarlong
Copy link
Contributor

oh man this one's a doozy

@github-actions github-actions bot added the Scope: Backend Automatically applied to PRs that change backend components label Dec 18, 2025

# Schedule async replication if using async mode
if not self._should_flush_outbox():
transaction.on_commit(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I'm not actually sure if this does anything since we've already completed all the prev save steps? But I saw this being used for drain_shard and we'd want to enqueue the replication task after the token has committed 🤷‍♀️

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because you're scheduling a task, it is best to do that after the transaction has commit as you can ensure that all the records are saved. Without this it is possible for the task to be processed while the transaction has not complete if postgres is slow and kafka is fast.

cursor[bot]

This comment was marked as outdated.

cursor[bot]

This comment was marked as duplicate.

@codecov
Copy link

codecov bot commented Dec 18, 2025

❌ 1 Tests Failed:

Tests completed Failed Passed Skipped
30761 1 30760 246
View the top 1 failed test(s) by shortest run time
tests.sentry.models.test_apitoken.ApiTokenTest::test_organization_id_for_non_internal
Stack Traces | 4.04s run time
#x1B[1m#x1B[.../sentry/models/test_apitoken.py#x1B[0m:69: in test_organization_id_for_non_internal
    assert ApiTokenReplica.objects.get(apitoken_id=token.id).organization_id is None
#x1B[1m#x1B[31mE   AssertionError: assert 4557306177650688 is None#x1B[0m
#x1B[1m#x1B[31mE    +  where 4557306177650688 = <ApiTokenReplica at 0x7f9ff8cfa0d0: id=28, user_id=535, token='4ea336dad930b4dcd424978cd06a3ae33c96e186039d73ab844623da67c11f9b', application_id=20>.organization_id#x1B[0m
#x1B[1m#x1B[31mE    +    where <ApiTokenReplica at 0x7f9ff8cfa0d0: id=28, user_id=535, token='4ea336dad930b4dcd424978cd06a3ae33c96e186039d73ab844623da67c11f9b', application_id=20> = <bound method QuerySet.get of <sentry.db.models.manager.base.BaseManager object at 0x7fa09dc6cc20>>(apitoken_id=28)#x1B[0m
#x1B[1m#x1B[31mE    +      where <bound method QuerySet.get of <sentry.db.models.manager.base.BaseManager object at 0x7fa09dc6cc20>> = <sentry.db.models.manager.base.BaseManager object at 0x7fa09dc6cc20>.get#x1B[0m
#x1B[1m#x1B[31mE    +        where <sentry.db.models.manager.base.BaseManager object at 0x7fa09dc6cc20> = ApiTokenReplica.objects#x1B[0m
#x1B[1m#x1B[31mE    +      and   28 = <ApiToken at 0x7f9ff8cef020: id=28, user_id=535, token='4ea336dad930b4dcd424978cd06a3ae33c96e186039d73ab844623da67c11f9b', application_id=20>.id#x1B[0m

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

Comment on lines 282 to 294
has_async_flush = self.user_id in options.get("users:api-token-async-flush")
logger.info(
"async_flush_check",
extra={
"has_async_flush": has_async_flush,
"user_id": self.user_id,
"token_id": self.id,
},
)
if has_async_flush:
return False

return True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed in person, but I don't think we need a user-level flag for this. I think a global option that sets the value of the class's default_flush property should be sufficient for us. Having an all-or-nothing toggle is probably okay for this change, and I don't expect the majority of users to even notice the impact of this change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting class properties based on options will be tricky as classes are imported during django's startup before the ORM is ready. The current approach of overriding _maybe_prepare_outboxes will work though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ended up doing some property decorator shenanigans since that should be loaded after the startup 🤔

Comment on lines 331 to 335
drain_outbox_shards_control.delay(
outbox_identifier_low=first_row.id,
outbox_identifier_hi=last_row.id + 1,
outbox_name="sentry.ControlOutbox",
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should move this entire function to the parent ControlOutbox or even Outbox class to trigger instead. Being able to enqueue a task to more quickly drain a shard seems like a useful feature in other contexts, beyond just this one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add a new task that accepts the shard_scope, shard_identifier, category, and object_identifier as parameters, and that task could do the range query + call process_outbox_batch() 🤷

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slapped it into the parent ControlOutboxProducingModel


# Schedule async replication if using async mode
if not self._should_flush_outbox():
transaction.on_commit(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because you're scheduling a task, it is best to do that after the transaction has commit as you can ensure that all the records are saved. Without this it is possible for the task to be processed while the transaction has not complete if postgres is slow and kafka is fast.

Comment on lines 282 to 294
has_async_flush = self.user_id in options.get("users:api-token-async-flush")
logger.info(
"async_flush_check",
extra={
"has_async_flush": has_async_flush,
"user_id": self.user_id,
"token_id": self.id,
},
)
if has_async_flush:
return False

return True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting class properties based on options will be tricky as classes are imported during django's startup before the ORM is ready. The current approach of overriding _maybe_prepare_outboxes will work though.

Comment on lines 331 to 335
drain_outbox_shards_control.delay(
outbox_identifier_low=first_row.id,
outbox_identifier_hi=last_row.id + 1,
outbox_name="sentry.ControlOutbox",
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add a new task that accepts the shard_scope, shard_identifier, category, and object_identifier as parameters, and that task could do the range query + call process_outbox_batch() 🤷

@Christinarlong Christinarlong requested a review from a team as a code owner January 5, 2026 22:28
Copy link
Member

@markstory markstory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks correct to me.

@Christinarlong Christinarlong merged commit 39adb91 into master Jan 6, 2026
65 checks passed
@Christinarlong Christinarlong deleted the crl/async-outbox branch January 6, 2026 22:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Scope: Backend Automatically applied to PRs that change backend components

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants