diff --git a/migrations_lockfile.txt b/migrations_lockfile.txt
index 1fe6fd448dd8e2..decba53c5b0398 100644
--- a/migrations_lockfile.txt
+++ b/migrations_lockfile.txt
@@ -31,7 +31,7 @@ releases: 0004_cleanup_failed_safe_deletes
replays: 0007_organizationmember_replay_access
-sentry: 1016_remove_on_command_phrase_trigger
+sentry: 1017_add_apidevicecode
social_auth: 0003_social_auth_json_field
diff --git a/src/sentry/migrations/1017_add_apidevicecode.py b/src/sentry/migrations/1017_add_apidevicecode.py
new file mode 100644
index 00000000000000..511fccaf08ddb4
--- /dev/null
+++ b/src/sentry/migrations/1017_add_apidevicecode.py
@@ -0,0 +1,116 @@
+# Generated by Django 5.2.8 on 2026-01-05
+
+import django.contrib.postgres.fields
+import django.db.models.deletion
+import django.utils.timezone
+from django.conf import settings
+from django.db import migrations, models
+
+import sentry.db.models.fields.bounded
+import sentry.db.models.fields.foreignkey
+import sentry.db.models.fields.hybrid_cloud_foreign_key
+import sentry.models.apidevicecode
+from sentry.new_migrations.migrations import CheckedMigration
+
+
+class Migration(CheckedMigration):
+ # This flag is used to mark that a migration shouldn't be automatically run in production.
+ # This should only be used for operations where it's safe to run the migration after your
+ # code has deployed. So this should not be used for most operations that alter the schema
+ # of a table.
+ # Here are some things that make sense to mark as post deployment:
+ # - Large data migrations. Typically we want these to be run manually so that they can be
+ # monitored and not block the deploy for a long period of time while they run.
+ # - Adding indexes to large tables. Since this can take a long time, we'd generally prefer to
+ # run this outside deployments so that we don't block them. Note that while adding an index
+ # is a schema change, it's completely safe to run the operation after the code has deployed.
+ # Once deployed, run these manually via: https://develop.sentry.dev/database-migrations/#migration-deployment
+
+ is_post_deployment = False
+
+ dependencies = [
+ ("sentry", "1016_remove_on_command_phrase_trigger"),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name="ApiDeviceCode",
+ fields=[
+ (
+ "id",
+ sentry.db.models.fields.bounded.BoundedBigAutoField(
+ primary_key=True, serialize=False
+ ),
+ ),
+ (
+ "device_code",
+ models.CharField(
+ default=sentry.models.apidevicecode.generate_device_code,
+ max_length=64,
+ unique=True,
+ ),
+ ),
+ (
+ "user_code",
+ models.CharField(
+ unique=True,
+ default=sentry.models.apidevicecode.generate_user_code,
+ max_length=16,
+ ),
+ ),
+ (
+ "application",
+ sentry.db.models.fields.foreignkey.FlexibleForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ to="sentry.apiapplication",
+ ),
+ ),
+ (
+ "user",
+ sentry.db.models.fields.foreignkey.FlexibleForeignKey(
+ null=True,
+ on_delete=django.db.models.deletion.CASCADE,
+ to=settings.AUTH_USER_MODEL,
+ ),
+ ),
+ (
+ "organization_id",
+ sentry.db.models.fields.hybrid_cloud_foreign_key.HybridCloudForeignKey(
+ "sentry.Organization",
+ db_index=True,
+ null=True,
+ on_delete="CASCADE",
+ ),
+ ),
+ (
+ "scope_list",
+ django.contrib.postgres.fields.ArrayField(
+ base_field=models.TextField(),
+ default=list,
+ size=None,
+ ),
+ ),
+ (
+ "expires_at",
+ models.DateTimeField(
+ db_index=True,
+ default=sentry.models.apidevicecode.default_expiration,
+ ),
+ ),
+ (
+ "status",
+ models.CharField(
+ default="pending",
+ max_length=20,
+ ),
+ ),
+ (
+ "date_added",
+ models.DateTimeField(default=django.utils.timezone.now),
+ ),
+ ],
+ options={
+ "db_table": "sentry_apidevicecode",
+ },
+ ),
+ ]
diff --git a/src/sentry/models/__init__.py b/src/sentry/models/__init__.py
index 9f1cfb384b7d9d..8a66d245524152 100644
--- a/src/sentry/models/__init__.py
+++ b/src/sentry/models/__init__.py
@@ -1,6 +1,7 @@
from .activity import * # NOQA
from .apiapplication import * # NOQA
from .apiauthorization import * # NOQA
+from .apidevicecode import * # NOQA
from .apigrant import * # NOQA
from .apikey import * # NOQA
from .apiscopes import * # NOQA
diff --git a/src/sentry/models/apidevicecode.py b/src/sentry/models/apidevicecode.py
new file mode 100644
index 00000000000000..216087cbbc365f
--- /dev/null
+++ b/src/sentry/models/apidevicecode.py
@@ -0,0 +1,212 @@
+from __future__ import annotations
+
+import secrets
+from datetime import timedelta
+from enum import StrEnum
+from typing import Any
+
+from django.contrib.postgres.fields.array import ArrayField
+from django.db import IntegrityError, models
+from django.utils import timezone
+
+from sentry.backup.dependencies import NormalizedModelName, get_model_name
+from sentry.backup.sanitize import SanitizableField, Sanitizer
+from sentry.backup.scopes import RelocationScope
+from sentry.db.models import FlexibleForeignKey, Model, control_silo_model
+from sentry.db.models.fields.hybrid_cloud_foreign_key import HybridCloudForeignKey
+
+# RFC 8628 recommends short lifetimes for device codes (10-15 minutes)
+DEFAULT_EXPIRATION = timedelta(minutes=10)
+
+# Default polling interval in seconds (RFC 8628 §3.2)
+DEFAULT_INTERVAL = 5
+
+# Base-20 alphabet for user codes: excludes ambiguous characters (0/O, 1/I/L, etc.)
+# This provides ~34 bits of entropy for 8-character codes, sufficient with rate limiting.
+# Reference: RFC 8628 §5.1
+USER_CODE_ALPHABET = "BCDFGHJKLMNPQRSTVWXZ"
+USER_CODE_LENGTH = 8
+USER_CODE_GROUP_LENGTH = USER_CODE_LENGTH // 2 # Characters per group in "XXXX-XXXX" format
+
+# Device code entropy: 32 bytes = 256 bits
+DEVICE_CODE_BYTES = 32
+
+
+def default_expiration():
+ return timezone.now() + DEFAULT_EXPIRATION
+
+
+def generate_device_code():
+ """Generate a cryptographically secure device code (256-bit entropy)."""
+ return secrets.token_hex(nbytes=DEVICE_CODE_BYTES)
+
+
+def generate_user_code():
+ """
+ Generate a human-readable user code in format "XXXX-XXXX".
+
+ Uses base-20 alphabet to avoid ambiguous characters, providing ~34 bits
+ of entropy which is sufficient when combined with rate limiting.
+ Reference: RFC 8628 §5.1
+ """
+ chars = [secrets.choice(USER_CODE_ALPHABET) for _ in range(USER_CODE_LENGTH)]
+ return f"{''.join(chars[:USER_CODE_GROUP_LENGTH])}-{''.join(chars[USER_CODE_GROUP_LENGTH:])}"
+
+
+# Maximum retries for generating unique codes
+MAX_CODE_GENERATION_RETRIES = 10
+
+
+class UserCodeCollisionError(Exception):
+ """Raised when unable to generate a unique user code after maximum retries."""
+
+
+class DeviceCodeStatus(StrEnum):
+ """Status values for device authorization codes."""
+
+ PENDING = "pending"
+ APPROVED = "approved"
+ DENIED = "denied"
+
+
+@control_silo_model
+class ApiDeviceCode(Model):
+ """
+ Device authorization code for OAuth 2.0 Device Flow (RFC 8628).
+
+ This model stores the state of a device authorization request, which allows
+ headless devices (CLIs, Docker containers, CI/CD jobs) to obtain OAuth tokens
+ by having users authorize on a separate device with a browser.
+
+ Flow:
+ 1. Device requests authorization via POST /oauth/device/code
+ 2. Server returns device_code (secret) and user_code (human-readable)
+ 3. Device displays user_code and verification_uri to user
+ 4. Device polls POST /oauth/token with device_code
+ 5. User visits verification_uri, enters user_code, and approves/denies
+ 6. On approval, device receives access token on next poll
+
+ Reference: https://datatracker.ietf.org/doc/html/rfc8628
+ """
+
+ __relocation_scope__ = RelocationScope.Global
+
+ # Device code: secret, high-entropy code used for token polling (RFC 8628 §3.2)
+ device_code = models.CharField(max_length=64, unique=True, default=generate_device_code)
+
+ # User code: human-readable code for user entry (RFC 8628 §3.2)
+ # Format: "XXXX-XXXX" using base-20 alphabet
+ # Must be unique since users look up by this code
+ user_code = models.CharField(max_length=16, unique=True, default=generate_user_code)
+
+ # The OAuth application requesting authorization
+ application = FlexibleForeignKey("sentry.ApiApplication")
+
+ # User who approved the request (set when status changes to APPROVED)
+ user = FlexibleForeignKey("sentry.User", null=True, on_delete=models.CASCADE)
+
+ # Organization selected during approval (for org-level access apps)
+ organization_id = HybridCloudForeignKey(
+ "sentry.Organization",
+ db_index=True,
+ null=True,
+ on_delete="CASCADE",
+ )
+
+ # Requested scopes (space-delimited in requests, stored as array)
+ scope_list = ArrayField(models.TextField(), default=list)
+
+ # When this device code expires (RFC 8628 §3.2 expires_in)
+ expires_at = models.DateTimeField(db_index=True, default=default_expiration)
+
+ # Authorization status: pending -> approved/denied
+ status = models.CharField(max_length=20, default=DeviceCodeStatus.PENDING)
+
+ # Timestamps
+ date_added = models.DateTimeField(default=timezone.now)
+
+ class Meta:
+ app_label = "sentry"
+ db_table = "sentry_apidevicecode"
+
+ def __str__(self) -> str:
+ return f"id={self.id}, user_code={self.user_code}, application={self.application_id}, status={self.status}"
+
+ def get_scopes(self) -> list[str]:
+ """Return the list of requested scopes."""
+ return self.scope_list
+
+ def has_scope(self, scope: str) -> bool:
+ """Check if a specific scope was requested."""
+ return scope in self.scope_list
+
+ def is_expired(self) -> bool:
+ """Check if the device code has expired."""
+ return timezone.now() >= self.expires_at
+
+ def is_pending(self) -> bool:
+ """Check if the device code is still awaiting user action."""
+ return self.status == DeviceCodeStatus.PENDING
+
+ def is_approved(self) -> bool:
+ """Check if the user has approved this device code."""
+ return self.status == DeviceCodeStatus.APPROVED
+
+ def is_denied(self) -> bool:
+ """Check if the user has denied this device code."""
+ return self.status == DeviceCodeStatus.DENIED
+
+ @classmethod
+ def get_lock_key(cls, device_code_id: int) -> str:
+ """Return lock key for preventing race conditions during token exchange."""
+ return f"api_device_code:{device_code_id}"
+
+ @classmethod
+ def sanitize_relocation_json(
+ cls, json: Any, sanitizer: Sanitizer, model_name: NormalizedModelName | None = None
+ ) -> None:
+ model_name = get_model_name(cls) if model_name is None else model_name
+ super().sanitize_relocation_json(json, sanitizer, model_name)
+
+ sanitizer.set_string(
+ json, SanitizableField(model_name, "device_code"), lambda _: generate_device_code()
+ )
+ sanitizer.set_string(
+ json, SanitizableField(model_name, "user_code"), lambda _: generate_user_code()
+ )
+
+ @classmethod
+ def create_with_retry(cls, application, scope_list: list[str] | None = None) -> ApiDeviceCode:
+ """
+ Create a new device code with retry logic for user code collisions.
+
+ Since user codes have ~34 bits of entropy, collisions are rare but possible.
+ This method retries with new codes if a collision occurs.
+
+ Args:
+ application: The ApiApplication requesting authorization
+ scope_list: Optional list of requested scopes
+
+ Returns:
+ A new ApiDeviceCode instance
+
+ Raises:
+ UserCodeCollisionError: If unable to generate a unique code after max retries
+ """
+ if scope_list is None:
+ scope_list = []
+
+ last_error: IntegrityError | None = None
+ for attempt in range(MAX_CODE_GENERATION_RETRIES):
+ try:
+ return cls.objects.create(
+ application=application,
+ scope_list=scope_list,
+ )
+ except IntegrityError as e:
+ # Collision on device_code or user_code, retry with new generated codes
+ last_error = e
+
+ raise UserCodeCollisionError(
+ f"Unable to generate unique device code after {MAX_CODE_GENERATION_RETRIES} attempts"
+ ) from last_error
diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py
index 2e9cb821affc3f..14f932dd885b61 100644
--- a/src/sentry/runner/commands/cleanup.py
+++ b/src/sentry/runner/commands/cleanup.py
@@ -554,6 +554,7 @@ def remove_expired_values_for_org_members(
def delete_api_models(
is_filtered: Callable[[type[BaseModel]], bool], models_attempted: set[str]
) -> None:
+ from sentry.models.apidevicecode import ApiDeviceCode
from sentry.models.apigrant import ApiGrant
from sentry.models.apitoken import ApiToken
@@ -576,6 +577,15 @@ def delete_api_models(
queryset.delete()
+ # Device codes have short expiration times (10 minutes), so clean up
+ # any that have expired immediately without additional TTL buffer.
+ if is_filtered(ApiDeviceCode):
+ debug_output(">> Skipping ApiDeviceCode")
+ else:
+ debug_output("Removing expired values for ApiDeviceCode")
+ models_attempted.add(ApiDeviceCode.__name__.lower())
+ ApiDeviceCode.objects.filter(expires_at__lt=timezone.now()).delete()
+
@continue_on_error("specialized_cleanup_exported_data")
def exported_data(
diff --git a/src/sentry/sentry_apps/token_exchange/util.py b/src/sentry/sentry_apps/token_exchange/util.py
index 632e65b0f0ae0d..511e98403703c4 100644
--- a/src/sentry/sentry_apps/token_exchange/util.py
+++ b/src/sentry/sentry_apps/token_exchange/util.py
@@ -9,12 +9,14 @@
AUTHORIZATION = "authorization_code"
REFRESH = "refresh_token"
CLIENT_SECRET_JWT = "urn:sentry:params:oauth:grant-type:jwt-bearer"
+DEVICE_CODE = "urn:ietf:params:oauth:grant-type:device_code"
class GrantTypes:
AUTHORIZATION = AUTHORIZATION
REFRESH = REFRESH
CLIENT_SECRET_JWT = CLIENT_SECRET_JWT
+ DEVICE_CODE = DEVICE_CODE
def token_expiration() -> datetime:
diff --git a/src/sentry/templates/sentry/oauth-device-authorize.html b/src/sentry/templates/sentry/oauth-device-authorize.html
new file mode 100644
index 00000000000000..999fb4e7119366
--- /dev/null
+++ b/src/sentry/templates/sentry/oauth-device-authorize.html
@@ -0,0 +1,70 @@
+{% extends "sentry/bases/auth.html" %}
+
+{% load crispy_forms_tags %}
+{% load sentry_avatars %}
+
+{% block title %}Authorize Device | {{ block.super }}{% endblock %}
+
+{% block auth_main %}
+
+{% endblock %}
diff --git a/src/sentry/templates/sentry/oauth-device-complete.html b/src/sentry/templates/sentry/oauth-device-complete.html
new file mode 100644
index 00000000000000..87cfa946e25aaa
--- /dev/null
+++ b/src/sentry/templates/sentry/oauth-device-complete.html
@@ -0,0 +1,24 @@
+{% extends "sentry/bases/auth.html" %}
+
+{% block title %}Device Authorization Complete | {{ block.super }}{% endblock %}
+
+{% block auth_main %}
+
+ {% if application %}
+
+
Success!
+
{{ message }}
+
You can now close this tab and return to your device.
+
+ {% else %}
+
+
{{ message }}
+
You can now close this tab.
+
+ {% endif %}
+
+
+
+{% endblock %}
diff --git a/src/sentry/templates/sentry/oauth-device.html b/src/sentry/templates/sentry/oauth-device.html
new file mode 100644
index 00000000000000..d0acf214ae773b
--- /dev/null
+++ b/src/sentry/templates/sentry/oauth-device.html
@@ -0,0 +1,39 @@
+{% extends "sentry/bases/auth.html" %}
+
+{% load crispy_forms_tags %}
+
+{% block title %}Authorize Device | {{ block.super }}{% endblock %}
+
+{% block auth_main %}
+
+{% endblock %}
diff --git a/src/sentry/testutils/helpers/backups.py b/src/sentry/testutils/helpers/backups.py
index 6faf0330207a9d..a74778b3c8485e 100644
--- a/src/sentry/testutils/helpers/backups.py
+++ b/src/sentry/testutils/helpers/backups.py
@@ -55,6 +55,7 @@
from sentry.integrations.models.organization_integration import OrganizationIntegration
from sentry.models.activity import Activity
from sentry.models.apiauthorization import ApiAuthorization
+from sentry.models.apidevicecode import ApiDeviceCode
from sentry.models.apigrant import ApiGrant
from sentry.models.apikey import ApiKey
from sentry.models.apitoken import ApiToken
@@ -872,6 +873,10 @@ def create_exhaustive_sentry_app(self, name: str, owner: User, org: Organization
redirect_uri="https://example.com",
scope_list=["openid", "profile", "email"],
)
+ ApiDeviceCode.objects.create(
+ application=app.application,
+ scope_list=["openid", "profile"],
+ )
# ServiceHook
self.create_service_hook(
diff --git a/src/sentry/web/frontend/oauth_authorize.py b/src/sentry/web/frontend/oauth_authorize.py
index 490080f7116325..f69d4656bca0d5 100644
--- a/src/sentry/web/frontend/oauth_authorize.py
+++ b/src/sentry/web/frontend/oauth_authorize.py
@@ -246,6 +246,13 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase:
"cc": code_challenge,
"ccm": code_challenge_method if code_challenge else None,
}
+ # TODO(dcramer): Using a single "oa2" session key means multiple tabs authorizing
+ # different applications will overwrite each other's session data. If a user has
+ # Tab A (App A) and Tab B (App B) open, whichever tab they opened last will have
+ # its payload in the session. Approving from Tab A would then authorize App B.
+ # Consider using a unique transaction ID per authorization request, stored either
+ # in the URL or as a per-request session key (e.g., oa2:{tx_id}).
+ # See oauth_device.py for an example using user_code as a natural unique key.
request.session["oa2"] = payload
if not request.user.is_authenticated:
diff --git a/src/sentry/web/frontend/oauth_device.py b/src/sentry/web/frontend/oauth_device.py
new file mode 100644
index 00000000000000..0ef0cb3c8b76d6
--- /dev/null
+++ b/src/sentry/web/frontend/oauth_device.py
@@ -0,0 +1,415 @@
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+from django.conf import settings
+from django.db import IntegrityError, router, transaction
+from django.http import HttpRequest
+from django.http.response import HttpResponseBase
+
+from sentry.models.apiapplication import ApiApplicationStatus
+from sentry.models.apiauthorization import ApiAuthorization
+from sentry.models.apidevicecode import (
+ USER_CODE_GROUP_LENGTH,
+ USER_CODE_LENGTH,
+ ApiDeviceCode,
+ DeviceCodeStatus,
+)
+from sentry.ratelimits import backend as ratelimiter
+from sentry.users.services.user.service import user_service
+from sentry.utils import metrics
+from sentry.web.frontend.auth_login import AuthLoginView
+
+logger = logging.getLogger("sentry.oauth")
+
+# Rate limiting for user code verification (RFC 8628 §5.1)
+# Limits brute force attempts on the 8-character user code (~34 bits entropy)
+USER_CODE_RATE_LIMIT_WINDOW = 60 # seconds
+USER_CODE_RATE_LIMIT = 10 # max attempts per window per IP
+
+# Standardized error messages
+ERR_INVALID_CODE = "Invalid or expired code. Please check the code and try again."
+ERR_EXPIRED_CODE = "This code has expired. Please request a new code from your device."
+ERR_RATE_LIMITED = "Too many attempts. Please wait a minute and try again."
+ERR_NO_ORG_ACCESS = (
+ "This application requires organization-level access. "
+ "You must be a member of an organization to authorize this application."
+)
+ERR_SESSION_EXPIRED = "Your session has expired. Please start over."
+ERR_INVALID_REQUEST = "Invalid request. Please start over."
+ERR_INVALID_OP = "Invalid operation."
+ERR_SELECT_ORG = "Please select an organization."
+ERR_INVALID_ORG = "Invalid organization selection."
+ERR_NO_ORG_PERMISSION = "You don't have access to the selected organization."
+
+
+def _normalize_user_code(user_code: str) -> str:
+ """
+ Normalize a user code to the canonical format "XXXX-XXXX".
+
+ Handles case variations, missing dashes, and extra whitespace.
+ """
+ normalized = user_code.replace("-", "").upper().strip()
+ if len(normalized) == USER_CODE_LENGTH:
+ return f"{normalized[:USER_CODE_GROUP_LENGTH]}-{normalized[USER_CODE_GROUP_LENGTH:]}"
+ return user_code.upper().strip()
+
+
+class OAuthDeviceView(AuthLoginView):
+ """
+ Device verification page for OAuth 2.0 Device Flow (RFC 8628 §3.3).
+
+ This view handles the user-facing part of the device authorization flow,
+ where users enter the user_code displayed by their device and approve
+ or deny the authorization request.
+
+ Flow:
+ 1. GET /oauth/device - Show form to enter user_code (or with ?user_code=XXX)
+ 2. POST /oauth/device - Verify code and show approval form
+ 3. POST /oauth/device (op=approve/deny) - Complete verification
+
+ Reference: https://datatracker.ietf.org/doc/html/rfc8628#section-3.3
+ """
+
+ auth_required = False
+
+ def get_next_uri(self, request: HttpRequest) -> str:
+ return request.get_full_path()
+
+ def respond_login(self, request: HttpRequest, context, **kwargs):
+ context["banner"] = "Authorize Device"
+ return self.respond("sentry/login.html", context)
+
+ def _error_response(self, request: HttpRequest, error: str) -> HttpResponseBase:
+ """Return an error response on the device code entry page."""
+ context = self.get_default_context(request) | {
+ "user": request.user,
+ "error": error,
+ }
+ return self.respond("sentry/oauth-device.html", context)
+
+ def _get_validated_device_code(
+ self,
+ request: HttpRequest,
+ *,
+ device_code_id: int | None = None,
+ user_code: str | None = None,
+ ) -> tuple[ApiDeviceCode | None, HttpResponseBase | None]:
+ """
+ Fetch and validate a device code.
+
+ Args:
+ request: The HTTP request
+ device_code_id: Lookup by ID (for approve/deny after session lookup)
+ user_code: Lookup by user code (for initial code entry)
+
+ Returns:
+ (device_code, None) on success
+ (None, error_response) on failure
+ """
+ try:
+ if device_code_id is not None:
+ device_code = ApiDeviceCode.objects.select_related("application").get(
+ id=device_code_id,
+ status=DeviceCodeStatus.PENDING,
+ )
+ elif user_code is not None:
+ formatted_code = _normalize_user_code(user_code)
+ device_code = ApiDeviceCode.objects.select_related("application").get(
+ user_code=formatted_code,
+ status=DeviceCodeStatus.PENDING,
+ )
+ else:
+ return None, self._error_response(request, ERR_INVALID_REQUEST)
+ except ApiDeviceCode.DoesNotExist:
+ return None, self._error_response(request, ERR_INVALID_CODE)
+
+ # Check if expired
+ if device_code.is_expired():
+ device_code.delete()
+ return None, self._error_response(request, ERR_EXPIRED_CODE)
+
+ # Check if application is still active
+ if device_code.application.status != ApiApplicationStatus.active:
+ device_code.delete()
+ return None, self._error_response(request, ERR_INVALID_CODE)
+
+ return device_code, None
+
+ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase:
+ # Check if user_code was provided in query string (verification_uri_complete)
+ user_code = request.GET.get("user_code", "").upper().strip()
+
+ if not request.user.is_authenticated:
+ # Store user_code in session for after login
+ if user_code:
+ request.session["device_user_code"] = user_code
+ return super().get(request, **kwargs)
+
+ # If we have a user_code, try to look it up and show the approval form
+ if user_code:
+ return self._show_approval_form(request, user_code)
+
+ # Check if we stored a user_code in session during login
+ stored_code = request.session.pop("device_user_code", None)
+ if stored_code:
+ return self._show_approval_form(request, stored_code)
+
+ # Otherwise, show the user_code entry form
+ context = self.get_default_context(request) | {
+ "user": request.user,
+ }
+ return self.respond("sentry/oauth-device.html", context)
+
+ def _show_approval_form(self, request: HttpRequest, user_code: str) -> HttpResponseBase:
+ """Show the approval form for a valid user code."""
+ # Rate limit user code verification attempts (RFC 8628 §5.1)
+ # Note: REMOTE_ADDR is set correctly by SetRemoteAddrFromForwardedFor middleware
+ client_ip = request.META.get("REMOTE_ADDR")
+ rate_limit_key = f"oauth:device_verify:{client_ip}"
+ if ratelimiter.is_limited(
+ rate_limit_key, limit=USER_CODE_RATE_LIMIT, window=USER_CODE_RATE_LIMIT_WINDOW
+ ):
+ logger.warning(
+ "oauth.device-verification-rate-limited",
+ extra={"ip": client_ip, "user_code_prefix": user_code[:4] if user_code else None},
+ )
+ return self._error_response(request, ERR_RATE_LIMITED)
+
+ # Validate the device code
+ device_code, error_response = self._get_validated_device_code(request, user_code=user_code)
+ if error_response:
+ return error_response
+
+ assert device_code is not None
+ application = device_code.application
+ scopes = device_code.get_scopes()
+
+ # Build permissions list (same logic as oauth_authorize)
+ permissions = []
+ if scopes:
+ pending_scopes = set(scopes)
+ matched_sets = set()
+ for scope_set in settings.SENTRY_SCOPE_SETS:
+ for scope, description in scope_set:
+ if scope_set in matched_sets and scope in pending_scopes:
+ pending_scopes.remove(scope)
+ elif scope in pending_scopes:
+ permissions.append(description)
+ matched_sets.add(scope_set)
+ pending_scopes.remove(scope)
+
+ if pending_scopes:
+ raise NotImplementedError(f"{pending_scopes} scopes did not have descriptions")
+
+ # Get organization options if needed
+ if application.requires_org_level_access:
+ organization_options = user_service.get_organizations(
+ user_id=request.user.id, only_visible=True
+ )
+ if not organization_options:
+ return self._error_response(request, ERR_NO_ORG_ACCESS)
+ else:
+ organization_options = []
+
+ # Store device code ID in session keyed by user_code
+ # This allows multiple authorization flows in parallel (different user codes)
+ # Unlike oauth_authorize which uses a single "oa2" key, we key by user_code
+ # to avoid the multi-tab overwrite issue
+ session_key = f"oa2:{device_code.user_code}"
+ request.session[session_key] = {
+ "device_code_id": device_code.id,
+ "user_id": request.user.id,
+ }
+
+ context = self.get_default_context(request) | {
+ "user": request.user,
+ "application": application,
+ "scopes": scopes,
+ "permissions": permissions,
+ "organization_options": organization_options,
+ "user_code": device_code.user_code,
+ }
+ return self.respond("sentry/oauth-device-authorize.html", context)
+
+ def _handle_deny(self, request: HttpRequest, device_code: ApiDeviceCode) -> HttpResponseBase:
+ """Handle deny action for a device code."""
+ # Atomically mark as denied only if still pending (prevents race with approve)
+ updated = ApiDeviceCode.objects.filter(
+ id=device_code.id,
+ status=DeviceCodeStatus.PENDING,
+ ).update(status=DeviceCodeStatus.DENIED)
+
+ if not updated:
+ # Another request already processed this device code
+ return self._error_response(request, ERR_INVALID_CODE)
+
+ metrics.incr("oauth_device.deny", sample_rate=1.0)
+ logger.info(
+ "oauth.device-code-denied",
+ extra={
+ "device_code_id": device_code.id,
+ "application_id": device_code.application.id,
+ "user_id": request.user.id,
+ },
+ )
+
+ context = self.get_default_context(request) | {
+ "user": request.user,
+ "message": "Authorization denied. You can close this window.",
+ }
+ return self.respond("sentry/oauth-device-complete.html", context)
+
+ def _handle_approve(self, request: HttpRequest, device_code: ApiDeviceCode) -> HttpResponseBase:
+ """Handle approve action for a device code."""
+ # request.user.id is guaranteed to be int since we only reach here when authenticated
+ assert isinstance(request.user.id, int)
+ user_id: int = request.user.id
+
+ application = device_code.application
+
+ # Handle organization selection for org-level access apps
+ selected_org_id_int: int | None = None
+
+ if application.requires_org_level_access:
+ selected_organization_id = request.POST.get("selected_organization_id")
+ if not selected_organization_id:
+ return self._error_response(request, ERR_SELECT_ORG)
+
+ # Validate user has access to the selected organization
+ user_orgs = user_service.get_organizations(user_id=request.user.id, only_visible=True)
+ org_ids = {org.id for org in user_orgs}
+
+ try:
+ selected_org_id_int = int(selected_organization_id)
+ except (ValueError, TypeError):
+ return self._error_response(request, ERR_INVALID_ORG)
+
+ if selected_org_id_int not in org_ids:
+ return self._error_response(request, ERR_NO_ORG_PERMISSION)
+
+ scopes = device_code.get_scopes()
+
+ # Atomically mark as approved only if still pending (prevents race condition)
+ # This must happen first to claim the device code before creating authorization
+ updated = ApiDeviceCode.objects.filter(
+ id=device_code.id,
+ status=DeviceCodeStatus.PENDING,
+ ).update(
+ status=DeviceCodeStatus.APPROVED,
+ user_id=user_id,
+ organization_id=selected_org_id_int,
+ )
+ if not updated:
+ # Another request already processed this device code
+ return self._error_response(request, ERR_INVALID_CODE)
+
+ # Create or update ApiAuthorization record (same pattern as oauth_authorize)
+ # The try/except must be OUTSIDE the atomic block because PostgreSQL aborts
+ # the transaction on IntegrityError, preventing subsequent DB operations.
+ try:
+ with transaction.atomic(router.db_for_write(ApiAuthorization)):
+ ApiAuthorization.objects.create(
+ application=application,
+ user_id=user_id,
+ scope_list=scopes,
+ organization_id=selected_org_id_int,
+ )
+ except IntegrityError:
+ # Authorization already exists, merge in any new scopes
+ if scopes:
+ auth = ApiAuthorization.objects.get(
+ application=application,
+ user_id=user_id,
+ organization_id=selected_org_id_int,
+ )
+ auth.scope_list = list(set(auth.scope_list) | set(scopes))
+ auth.save(update_fields=["scope_list"])
+
+ metrics.incr(
+ "oauth_device.approve",
+ sample_rate=1.0,
+ tags={"org_level_access": application.requires_org_level_access},
+ )
+ logger.info(
+ "oauth.device-code-approved",
+ extra={
+ "device_code_id": device_code.id,
+ "application_id": device_code.application.id,
+ "user_id": user_id,
+ "organization_id": selected_org_id_int,
+ },
+ )
+
+ context = self.get_default_context(request) | {
+ "user": request.user,
+ "message": f"Authorization approved! Your device should now be connected to {application.name}. You can close this window.",
+ "application": application,
+ }
+ return self.respond("sentry/oauth-device-complete.html", context)
+
+ def _logged_out_post(self, request: HttpRequest, **kwargs: Any) -> HttpResponseBase:
+ """Handle POST when user is not logged in."""
+ response = super().post(request, **kwargs)
+ if request.user.is_authenticated:
+ # Regenerate session to prevent session fixation
+ request.session.cycle_key()
+ return response
+
+ def post(self, request: HttpRequest, **kwargs) -> HttpResponseBase:
+ if not request.user.is_authenticated:
+ return self._logged_out_post(request, **kwargs)
+
+ # Check if this is an approve/deny action (from the approval form)
+ # Must check op FIRST since approval form also includes user_code as hidden field
+ op = request.POST.get("op")
+ user_code = request.POST.get("user_code", "")
+
+ if op:
+ return self._handle_op(request, op, user_code)
+ elif user_code:
+ # User code submission from entry form - show approval form
+ return self._show_approval_form(request, user_code)
+ else:
+ return self._error_response(request, ERR_INVALID_REQUEST)
+
+ def _handle_op(self, request: HttpRequest, op: str, user_code: str) -> HttpResponseBase:
+ """Handle approve/deny operation from the approval form."""
+ if op not in ("approve", "deny"):
+ return self._error_response(request, ERR_INVALID_OP)
+
+ # Normalize and validate user_code
+ user_code = _normalize_user_code(user_code)
+ if not user_code:
+ return self._error_response(request, ERR_INVALID_REQUEST)
+
+ # Look up session data (keyed by user_code for multi-tab support)
+ session_key = f"oa2:{user_code}"
+ session_data = request.session.get(session_key)
+
+ if not session_data:
+ return self._error_response(request, ERR_SESSION_EXPIRED)
+
+ device_code_id = session_data.get("device_code_id")
+ stored_user_id = session_data.get("user_id")
+
+ if not device_code_id or stored_user_id != request.user.id:
+ return self._error_response(request, ERR_SESSION_EXPIRED)
+
+ # Validate the device code
+ device_code, error_response = self._get_validated_device_code(
+ request, device_code_id=device_code_id
+ )
+ if error_response:
+ return error_response
+
+ assert device_code is not None
+
+ # Clear session data before processing
+ request.session.pop(session_key, None)
+
+ if op == "deny":
+ return self._handle_deny(request, device_code)
+ else:
+ return self._handle_approve(request, device_code)
diff --git a/src/sentry/web/frontend/oauth_device_authorization.py b/src/sentry/web/frontend/oauth_device_authorization.py
new file mode 100644
index 00000000000000..bafea7f5ec3364
--- /dev/null
+++ b/src/sentry/web/frontend/oauth_device_authorization.py
@@ -0,0 +1,202 @@
+from __future__ import annotations
+
+import logging
+
+from django.conf import settings
+from django.http import HttpRequest, HttpResponse
+from django.utils.decorators import method_decorator
+from django.views.decorators.cache import never_cache
+from django.views.decorators.csrf import csrf_exempt
+from django.views.generic.base import View
+
+from sentry.models.apiapplication import ApiApplication, ApiApplicationStatus
+from sentry.models.apidevicecode import (
+ DEFAULT_EXPIRATION,
+ DEFAULT_INTERVAL,
+ ApiDeviceCode,
+ UserCodeCollisionError,
+)
+from sentry.utils import json, metrics
+from sentry.utils.http import absolute_uri
+from sentry.web.frontend.base import control_silo_view
+
+logger = logging.getLogger("sentry.oauth")
+
+
+@control_silo_view
+class OAuthDeviceAuthorizationView(View):
+ """
+ OAuth 2.0 Device Authorization Endpoint (RFC 8628 §3.1/§3.2).
+
+ This endpoint initiates the device authorization flow for headless clients
+ (CLIs, Docker containers, CI/CD jobs) that cannot use browser-based OAuth.
+
+ Request (POST /oauth/device/code):
+ client_id (required): The OAuth application's client ID
+ scope (optional): Space-delimited list of requested scopes
+
+ Response (200 OK, application/json):
+ device_code: Secret code for token endpoint polling
+ user_code: Human-readable code for user to enter
+ verification_uri: URL where user should go to authorize
+ verification_uri_complete: URL with embedded user_code (optional)
+ expires_in: Lifetime of codes in seconds
+ interval: Minimum polling interval in seconds
+
+ Errors:
+ invalid_client: Unknown or inactive client_id
+ invalid_scope: Requested scope is invalid or exceeds app permissions
+
+ Reference: https://datatracker.ietf.org/doc/html/rfc8628#section-3.1
+ """
+
+ @csrf_exempt
+ @method_decorator(never_cache)
+ def dispatch(self, request, *args, **kwargs):
+ return super().dispatch(request, *args, **kwargs)
+
+ def error(
+ self,
+ request: HttpRequest,
+ name: str,
+ description: str | None = None,
+ status: int = 400,
+ ) -> HttpResponse:
+ """Return a JSON error response per RFC 8628."""
+ client_id = request.POST.get("client_id")
+
+ logger.error(
+ "oauth.device-authorization-error",
+ extra={
+ "error_name": name,
+ "status": status,
+ "client_id": client_id,
+ "description": description,
+ },
+ )
+
+ response_data = {"error": name}
+ if description:
+ response_data["error_description"] = description
+
+ return HttpResponse(
+ json.dumps(response_data),
+ content_type="application/json",
+ status=status,
+ )
+
+ def post(self, request: HttpRequest) -> HttpResponse:
+ """
+ Handle device authorization request (RFC 8628 §3.1).
+
+ Creates a new device code and user code pair that can be used to
+ complete the device authorization flow.
+ """
+ client_id = request.POST.get("client_id")
+
+ # client_id is required (RFC 8628 §3.1)
+ if not client_id:
+ return self.error(
+ request,
+ name="invalid_client",
+ description="Missing required parameter: client_id",
+ status=401,
+ )
+
+ # Validate the application exists and is active
+ try:
+ application = ApiApplication.objects.get(
+ client_id=client_id,
+ status=ApiApplicationStatus.active,
+ )
+ except ApiApplication.DoesNotExist:
+ return self.error(
+ request,
+ name="invalid_client",
+ description="Invalid client_id",
+ status=401,
+ )
+
+ # Parse and validate scopes
+ scope_param = request.POST.get("scope", "")
+ scopes = scope_param.split() if scope_param else []
+
+ # Validate scopes against global allowed scopes
+ for scope in scopes:
+ if scope not in settings.SENTRY_SCOPES:
+ return self.error(
+ request,
+ name="invalid_scope",
+ description=f"Unknown scope: {scope}",
+ )
+
+ # For org-level access apps, validate scopes against app's max scopes
+ if application.requires_org_level_access:
+ max_scopes = application.scopes
+ for scope in scopes:
+ if scope not in max_scopes:
+ return self.error(
+ request,
+ name="invalid_scope",
+ description=f"Scope '{scope}' exceeds application permissions",
+ )
+
+ # Create the device code with retry logic for collisions
+ # Note: device_code and user_code are auto-generated by model defaults
+ try:
+ device_code = ApiDeviceCode.create_with_retry(
+ application=application,
+ scope_list=scopes,
+ )
+ except UserCodeCollisionError:
+ logger.exception(
+ "oauth.device-authorization-collision",
+ extra={"client_id": client_id, "application_id": application.id},
+ )
+ return self.error(
+ request,
+ name="server_error",
+ description="Unable to generate device code. Please try again.",
+ status=500,
+ )
+
+ # Build the verification URIs
+ verification_uri = absolute_uri("/oauth/device/")
+ verification_uri_complete = f"{verification_uri}?user_code={device_code.user_code}"
+
+ # Calculate expires_in from the expiration time
+ expires_in = int(DEFAULT_EXPIRATION.total_seconds())
+
+ metrics.incr(
+ "oauth_device_authorization.create",
+ sample_rate=1.0,
+ tags={"has_scopes": bool(scopes)},
+ )
+ logger.info(
+ "oauth.device-authorization-created",
+ extra={
+ "client_id": client_id,
+ "application_id": application.id,
+ "device_code_id": device_code.id,
+ "scopes": scopes,
+ },
+ )
+
+ # Return the device authorization response (RFC 8628 §3.2)
+ return HttpResponse(
+ json.dumps(
+ {
+ "device_code": device_code.device_code,
+ "user_code": device_code.user_code,
+ "verification_uri": verification_uri,
+ "verification_uri_complete": verification_uri_complete,
+ "expires_in": expires_in,
+ "interval": DEFAULT_INTERVAL,
+ }
+ ),
+ content_type="application/json",
+ )
+
+ def get(self, request: HttpRequest) -> HttpResponse:
+ """GET is not supported for device authorization (RFC 8628 §3.1)."""
+ return HttpResponse(status=405)
diff --git a/src/sentry/web/frontend/oauth_token.py b/src/sentry/web/frontend/oauth_token.py
index adccfb2e5eb680..685f263751055b 100644
--- a/src/sentry/web/frontend/oauth_token.py
+++ b/src/sentry/web/frontend/oauth_token.py
@@ -5,7 +5,7 @@
from datetime import datetime
from typing import Literal, NotRequired, TypedDict
-from django.db import router
+from django.db import router, transaction
from django.http import HttpRequest, HttpResponse
from django.utils import timezone
from django.utils.decorators import method_decorator
@@ -15,12 +15,16 @@
from rest_framework.request import Request
from sentry import options
+from sentry.locks import locks
from sentry.models.apiapplication import ApiApplication, ApiApplicationStatus
+from sentry.models.apidevicecode import DEFAULT_INTERVAL, ApiDeviceCode, DeviceCodeStatus
from sentry.models.apigrant import ApiGrant, ExpiredGrantError, InvalidGrantError
from sentry.models.apitoken import ApiToken
+from sentry.ratelimits import backend as ratelimiter
from sentry.sentry_apps.token_exchange.util import GrantTypes
from sentry.silo.safety import unguarded_write
from sentry.utils import json, metrics
+from sentry.utils.locking import UnableToAcquireLock
from sentry.web.frontend.base import control_silo_view
from sentry.web.frontend.openidtoken import OpenIDToken
@@ -86,7 +90,7 @@ def post(self, request: Request) -> HttpResponse:
Purpose
- Exchanges an authorization code for tokens, or uses a refresh token to
- obtain a new access token.
+ obtain a new access token, or exchanges a device code for tokens.
Supported grant types
- `authorization_code` (RFC 6749 §4.1): requires `code` and, if bound,
@@ -94,6 +98,8 @@ def post(self, request: Request) -> HttpResponse:
signing is configured, an `id_token` (OIDC Core 1.0) is included.
- `refresh_token` (RFC 6749 §6): requires `refresh_token`. Supplying `scope`
is not supported here and returns `invalid_request`.
+ - `urn:ietf:params:oauth:grant-type:device_code` (RFC 8628 §3.4): requires
+ `device_code`. Used by headless clients to poll for authorization.
Client authentication
- Either Authorization header (Basic) or form fields `client_id`/`client_secret`
@@ -109,6 +115,8 @@ def post(self, request: Request) -> HttpResponse:
- Errors (RFC 6749 §5.2): 400 JSON for `invalid_request`, `invalid_grant`,
`unsupported_grant_type`; 401 JSON for `invalid_client` (with
`WWW-Authenticate: Basic realm="oauth"`).
+ - Device flow errors (RFC 8628 §3.5): `authorization_pending`, `slow_down`,
+ `expired_token`, `access_denied`.
"""
grant_type = request.POST.get("grant_type")
@@ -136,7 +144,7 @@ def post(self, request: Request) -> HttpResponse:
if not grant_type:
return self.error(request=request, name="invalid_request", reason="missing grant_type")
- if grant_type not in [GrantTypes.AUTHORIZATION, GrantTypes.REFRESH]:
+ if grant_type not in [GrantTypes.AUTHORIZATION, GrantTypes.REFRESH, GrantTypes.DEVICE_CODE]:
return self.error(request=request, name="unsupported_grant_type")
try:
@@ -179,6 +187,13 @@ def post(self, request: Request) -> HttpResponse:
# SentryAppInstallation.api_grant, which is a cross-model write
with unguarded_write(using=router.db_for_write(ApiGrant)):
ApiGrant.objects.filter(application=application, code=code).delete()
+ # For device_code, invalidate the device code
+ elif grant_type == GrantTypes.DEVICE_CODE:
+ device_code_value = request.POST.get("device_code")
+ if device_code_value:
+ ApiDeviceCode.objects.filter(
+ application=application, device_code=device_code_value
+ ).delete()
# Use invalid_grant per RFC 6749 §5.2: grants/tokens are effectively "revoked"
# when the application is deactivated. invalid_client would be incorrect here
# since client authentication succeeded (we verified the credentials).
@@ -209,6 +224,8 @@ def post(self, request: Request) -> HttpResponse:
if grant_type == GrantTypes.AUTHORIZATION:
token_data = self.get_access_tokens(request=request, application=application)
+ elif grant_type == GrantTypes.DEVICE_CODE:
+ return self.handle_device_code_grant(request=request, application=application)
else:
token_data = self.get_refresh_token(request=request, application=application)
if "error" in token_data:
@@ -370,6 +387,187 @@ def get_refresh_token(self, request: Request, application: ApiApplication) -> di
return {"token": refresh_token}
+ def handle_device_code_grant(
+ self, request: Request, application: ApiApplication
+ ) -> HttpResponse:
+ """
+ Handle device code grant type (RFC 8628 §3.4).
+
+ This is used by headless clients to poll for authorization status after
+ initiating a device authorization flow.
+
+ Returns:
+ - On success (approved): Access token response
+ - authorization_pending: User hasn't completed authorization yet
+ - slow_down: Client is polling too fast
+ - expired_token: Device code has expired
+ - access_denied: User denied the authorization
+ """
+ device_code_value = request.POST.get("device_code")
+
+ if not device_code_value:
+ return self.error(
+ request=request,
+ name="invalid_request",
+ reason="missing device_code",
+ )
+
+ # Rate limit polling per device_code (RFC 8628 §3.5)
+ # Allow 1 request per interval (default 5 seconds) = 12 requests/minute
+ rate_limit_key = f"oauth:device_poll:{device_code_value}"
+ if ratelimiter.is_limited(rate_limit_key, limit=1, window=DEFAULT_INTERVAL):
+ return self.error(
+ request=request,
+ name="slow_down",
+ reason="polling too fast",
+ )
+
+ # Look up the device code
+ try:
+ device_code = ApiDeviceCode.objects.get(
+ device_code=device_code_value,
+ application=application,
+ )
+ except ApiDeviceCode.DoesNotExist:
+ return self.error(
+ request=request,
+ name="invalid_grant",
+ reason="invalid device_code",
+ )
+
+ # Check if expired (RFC 8628 §3.5)
+ if device_code.is_expired():
+ device_code.delete()
+ return self.error(
+ request=request,
+ name="expired_token",
+ reason="device code expired",
+ )
+
+ # Check authorization status (RFC 8628 §3.5)
+ if device_code.status == DeviceCodeStatus.PENDING:
+ # User hasn't completed authorization yet
+ return self.error(
+ request=request,
+ name="authorization_pending",
+ reason="user authorization pending",
+ )
+ elif device_code.status == DeviceCodeStatus.DENIED:
+ # User denied the authorization
+ device_code.delete()
+ return self.error(
+ request=request,
+ name="access_denied",
+ reason="user denied authorization",
+ )
+ elif device_code.status == DeviceCodeStatus.APPROVED:
+ # Use locking to prevent race condition where multiple requests
+ # could create tokens for the same device code (TOCTOU)
+ lock = locks.get(
+ ApiDeviceCode.get_lock_key(device_code.id),
+ duration=10,
+ name="api_device_code",
+ )
+
+ try:
+ lock_context = lock.acquire()
+ except UnableToAcquireLock:
+ # Another request is currently processing this device code
+ return self.error(
+ request=request,
+ name="invalid_grant",
+ reason="device code already in use",
+ )
+
+ with lock_context:
+ # Re-fetch inside lock to prevent TOCTOU race condition
+ try:
+ device_code = ApiDeviceCode.objects.get(id=device_code.id)
+ except ApiDeviceCode.DoesNotExist:
+ # Another request already processed this device code
+ return self.error(
+ request=request,
+ name="invalid_grant",
+ reason="invalid device_code",
+ )
+
+ # Re-check status inside lock
+ if device_code.status != DeviceCodeStatus.APPROVED:
+ return self.error(
+ request=request,
+ name="invalid_grant",
+ reason="device code in invalid state",
+ )
+
+ # Re-check expiration inside lock (could have expired during lock wait)
+ if device_code.is_expired():
+ device_code.delete()
+ return self.error(
+ request=request,
+ name="expired_token",
+ reason="device code expired",
+ )
+
+ # User approved - issue tokens
+ if device_code.user is None:
+ # This shouldn't happen, but handle it gracefully
+ logger.error(
+ "Device code approved but no user set",
+ extra={
+ "device_code_id": device_code.id,
+ "application_id": application.id,
+ },
+ )
+ device_code.delete()
+ return self.error(
+ request=request,
+ name="invalid_grant",
+ reason="device code in invalid state",
+ )
+
+ # Use a transaction to ensure token creation and device code deletion
+ # are atomic. This prevents duplicate tokens if delete fails after
+ # token creation succeeds.
+ with transaction.atomic(router.db_for_write(ApiToken)):
+ # Create the access token
+ token = ApiToken.objects.create(
+ application=application,
+ user_id=device_code.user.id,
+ scope_list=device_code.scope_list,
+ scoping_organization_id=device_code.organization_id,
+ )
+
+ # Delete the device code (one-time use)
+ device_code.delete()
+
+ metrics.incr("oauth_device.token_exchange", sample_rate=1.0)
+ logger.info(
+ "oauth.device-code-exchanged",
+ extra={
+ "device_code_id": device_code.id,
+ "application_id": application.id,
+ "user_id": device_code.user.id,
+ "token_id": token.id,
+ },
+ )
+
+ return self.process_token_details(token=token)
+
+ # Unknown status - shouldn't happen
+ logger.error(
+ "Device code has unknown status",
+ extra={
+ "device_code_id": device_code.id,
+ "status": device_code.status,
+ },
+ )
+ device_code.delete()
+ return self.error(
+ request=request,
+ name="invalid_grant",
+ reason="device code in invalid state",
+ )
+
def process_token_details(
self, token: ApiToken, id_token: OpenIDToken | None = None
) -> HttpResponse:
diff --git a/src/sentry/web/urls.py b/src/sentry/web/urls.py
index 8dc19362f7e46a..e9b0353e042a83 100644
--- a/src/sentry/web/urls.py
+++ b/src/sentry/web/urls.py
@@ -40,6 +40,8 @@
from sentry.web.frontend.js_sdk_loader import JavaScriptSdkLoader
from sentry.web.frontend.mailgun_inbound_webhook import MailgunInboundWebhookView
from sentry.web.frontend.oauth_authorize import OAuthAuthorizeView
+from sentry.web.frontend.oauth_device import OAuthDeviceView
+from sentry.web.frontend.oauth_device_authorization import OAuthDeviceAuthorizationView
from sentry.web.frontend.oauth_token import OAuthTokenView
from sentry.web.frontend.organization_auth_settings import OrganizationAuthSettingsView
from sentry.web.frontend.organization_avatar import OrganizationAvatarPhotoView
@@ -208,6 +210,17 @@
OAuthUserInfoEndpoint.as_view(),
name="sentry-api-0-oauth-userinfo",
),
+ # Device Authorization Flow (RFC 8628)
+ re_path(
+ r"^device/code/$",
+ OAuthDeviceAuthorizationView.as_view(),
+ name="sentry-oauth-device-code",
+ ),
+ re_path(
+ r"^device/$",
+ OAuthDeviceView.as_view(),
+ name="sentry-oauth-device",
+ ),
]
),
),
diff --git a/static/app/data/controlsiloUrlPatterns.ts b/static/app/data/controlsiloUrlPatterns.ts
index bcafc44a816382..4771628b768af7 100644
--- a/static/app/data/controlsiloUrlPatterns.ts
+++ b/static/app/data/controlsiloUrlPatterns.ts
@@ -160,7 +160,10 @@ const patterns: RegExp[] = [
new RegExp('^api/0/tempest-ips/$'),
new RegExp('^api/0/secret-scanning/github/$'),
new RegExp('^api/hooks/mailgun/inbound/'),
+ new RegExp('^auth-v2/'),
new RegExp('^oauth/authorize/$'),
+ new RegExp('^oauth/device/$'),
+ new RegExp('^oauth/device/code/$'),
new RegExp('^oauth/token/$'),
new RegExp('^oauth/userinfo/$'),
new RegExp('^saml/acs/[^/]+/$'),
diff --git a/tests/sentry/web/frontend/test_oauth_device.py b/tests/sentry/web/frontend/test_oauth_device.py
new file mode 100644
index 00000000000000..f8f256ba4c8dbe
--- /dev/null
+++ b/tests/sentry/web/frontend/test_oauth_device.py
@@ -0,0 +1,219 @@
+from datetime import timedelta
+from functools import cached_property
+from unittest.mock import patch
+
+from django.utils import timezone
+
+from sentry.models.apiapplication import ApiApplication
+from sentry.models.apiauthorization import ApiAuthorization
+from sentry.models.apidevicecode import ApiDeviceCode, DeviceCodeStatus
+from sentry.testutils.cases import TestCase
+from sentry.testutils.silo import control_silo_test
+
+
+@control_silo_test
+class OAuthDeviceVerificationTest(TestCase):
+ """Tests for the OAuth 2.0 Device Verification UI (RFC 8628 §3.3)."""
+
+ @cached_property
+ def path(self) -> str:
+ return "/oauth/device/"
+
+ def setUp(self) -> None:
+ super().setUp()
+ self.application = ApiApplication.objects.create(
+ owner=self.user, redirect_uris="https://example.com", name="Test App"
+ )
+ self.device_code = ApiDeviceCode.objects.create(
+ application=self.application,
+ scope_list=["project:read", "org:read"],
+ )
+
+ def test_get_unauthenticated_shows_login(self) -> None:
+ """GET without authentication should show login form."""
+ resp = self.client.get(self.path)
+ assert resp.status_code == 200
+ assert b"login" in resp.content.lower() or b"sign in" in resp.content.lower()
+
+ def test_get_with_user_code_stores_in_session(self) -> None:
+ """GET with user_code parameter should store it in session for after login."""
+ resp = self.client.get(f"{self.path}?user_code={self.device_code.user_code}")
+ assert resp.status_code == 200
+ assert self.client.session.get("device_user_code") == self.device_code.user_code.upper()
+
+ def test_get_authenticated_shows_entry_form(self) -> None:
+ """GET while authenticated should show user code entry form."""
+ self.login_as(self.user)
+ resp = self.client.get(self.path)
+ assert resp.status_code == 200
+ assert b"user_code" in resp.content.lower()
+
+ def test_get_with_valid_user_code_shows_approval_form(self) -> None:
+ """GET with valid user_code should show approval form."""
+ self.login_as(self.user)
+ resp = self.client.get(f"{self.path}?user_code={self.device_code.user_code}")
+ assert resp.status_code == 200
+ assert b"Test App" in resp.content
+ assert b"approve" in resp.content.lower()
+
+ def test_get_with_invalid_user_code_shows_error(self) -> None:
+ """GET with invalid user_code should show error."""
+ self.login_as(self.user)
+ resp = self.client.get(f"{self.path}?user_code=XXXX-XXXX")
+ assert resp.status_code == 200
+ assert b"invalid" in resp.content.lower() or b"expired" in resp.content.lower()
+
+ def test_post_user_code_shows_approval_form(self) -> None:
+ """POST with user_code should show approval form."""
+ self.login_as(self.user)
+ resp = self.client.post(self.path, {"user_code": self.device_code.user_code})
+ assert resp.status_code == 200
+ assert b"Test App" in resp.content
+ assert b"approve" in resp.content.lower()
+
+ def test_post_user_code_normalized(self) -> None:
+ """POST with user_code without dash should work."""
+ self.login_as(self.user)
+ # Remove dash from user code
+ code_without_dash = self.device_code.user_code.replace("-", "")
+ resp = self.client.post(self.path, {"user_code": code_without_dash})
+ assert resp.status_code == 200
+ assert b"Test App" in resp.content
+
+ def test_post_user_code_case_insensitive(self) -> None:
+ """POST with lowercase user_code should work."""
+ self.login_as(self.user)
+ resp = self.client.post(self.path, {"user_code": self.device_code.user_code.lower()})
+ assert resp.status_code == 200
+ assert b"Test App" in resp.content
+
+ def test_approve_creates_authorization(self) -> None:
+ """POST approve should create ApiAuthorization and mark device code approved."""
+ self.login_as(self.user)
+
+ # First show approval form to set up session
+ self.client.post(self.path, {"user_code": self.device_code.user_code})
+
+ # Then approve (user_code in hidden field identifies the session)
+ resp = self.client.post(
+ self.path, {"op": "approve", "user_code": self.device_code.user_code}
+ )
+ assert resp.status_code == 200
+ assert b"approved" in resp.content.lower()
+
+ # Verify device code was approved
+ self.device_code.refresh_from_db()
+ assert self.device_code.status == DeviceCodeStatus.APPROVED
+ assert self.device_code.user is not None
+ assert self.device_code.user.id == self.user.id
+
+ # Verify ApiAuthorization was created
+ auth = ApiAuthorization.objects.get(application=self.application, user_id=self.user.id)
+ assert set(auth.scope_list) == {"project:read", "org:read"}
+
+ def test_deny_marks_device_code_denied(self) -> None:
+ """POST deny should mark device code as denied."""
+ self.login_as(self.user)
+
+ # First show approval form to set up session
+ self.client.post(self.path, {"user_code": self.device_code.user_code})
+
+ # Then deny
+ resp = self.client.post(self.path, {"op": "deny", "user_code": self.device_code.user_code})
+ assert resp.status_code == 200
+ assert b"denied" in resp.content.lower()
+
+ # Verify device code was denied
+ self.device_code.refresh_from_db()
+ assert self.device_code.status == DeviceCodeStatus.DENIED
+
+ def test_expired_device_code_shows_error(self) -> None:
+ """Expired device code should show error."""
+ self.login_as(self.user)
+ self.device_code.expires_at = timezone.now() - timedelta(minutes=1)
+ self.device_code.save()
+
+ resp = self.client.post(self.path, {"user_code": self.device_code.user_code})
+ assert resp.status_code == 200
+ assert b"expired" in resp.content.lower()
+
+ # Device code should be deleted
+ assert not ApiDeviceCode.objects.filter(id=self.device_code.id).exists()
+
+ def test_invalid_operation_shows_error(self) -> None:
+ """Invalid operation should show error."""
+ self.login_as(self.user)
+
+ # First show approval form to set up session
+ self.client.post(self.path, {"user_code": self.device_code.user_code})
+
+ # Then submit invalid operation
+ resp = self.client.post(
+ self.path, {"op": "invalid", "user_code": self.device_code.user_code}
+ )
+ assert resp.status_code == 200
+ assert b"invalid" in resp.content.lower()
+
+ def test_session_expired_shows_error(self) -> None:
+ """Missing user_code should show error."""
+ self.login_as(self.user)
+
+ # Submit without user_code
+ resp = self.client.post(self.path, {"op": "approve"})
+ assert resp.status_code == 200
+ assert b"invalid request" in resp.content.lower() or b"start over" in resp.content.lower()
+
+ def test_invalid_user_code_in_session_shows_error(self) -> None:
+ """User code not in session should show session expired error."""
+ self.login_as(self.user)
+
+ # Submit with user_code that has no corresponding session
+ resp = self.client.post(self.path, {"op": "approve", "user_code": "FAKE-CODE"})
+ assert resp.status_code == 200
+ assert b"session expired" in resp.content.lower() or b"start over" in resp.content.lower()
+
+ def test_rate_limiting(self) -> None:
+ """Rate limiting should prevent brute force attacks."""
+ self.login_as(self.user)
+
+ # Mock the rate limiter to simulate being rate limited
+ with patch("sentry.web.frontend.oauth_device.ratelimiter") as mock_ratelimiter:
+ mock_ratelimiter.is_limited.return_value = True
+
+ resp = self.client.post(self.path, {"user_code": "XXXX-XXXX"})
+ assert resp.status_code == 200
+ assert b"too many" in resp.content.lower() or b"wait" in resp.content.lower()
+
+
+@control_silo_test
+class OAuthDeviceVerificationOrgLevelTest(TestCase):
+ """Tests for org-level access applications in device flow.
+
+ Note: Full approval flow testing with organizations is limited in control_silo_test
+ due to cross-silo RPC limitations. Core org-level validation is tested here.
+ """
+
+ @cached_property
+ def path(self) -> str:
+ return "/oauth/device/"
+
+ def test_org_level_app_requires_user_with_organization(self) -> None:
+ """Users without organizations cannot use org-level access apps."""
+ application = ApiApplication.objects.create(
+ owner=self.user,
+ redirect_uris="https://example.com",
+ name="Org Level App",
+ requires_org_level_access=True,
+ scopes=["project:read", "org:read"],
+ )
+ device_code = ApiDeviceCode.objects.create(
+ application=application,
+ scope_list=["project:read"],
+ )
+ self.login_as(self.user)
+
+ # In control_silo_test, user_service.get_organizations returns empty
+ # This correctly shows the "must be member of organization" error
+ resp = self.client.post(self.path, {"user_code": device_code.user_code})
+ assert resp.status_code == 200
+ assert b"member of an organization" in resp.content
diff --git a/tests/sentry/web/frontend/test_oauth_device_authorization.py b/tests/sentry/web/frontend/test_oauth_device_authorization.py
new file mode 100644
index 00000000000000..c1e9711681e57e
--- /dev/null
+++ b/tests/sentry/web/frontend/test_oauth_device_authorization.py
@@ -0,0 +1,160 @@
+from functools import cached_property
+
+from sentry.models.apiapplication import ApiApplication, ApiApplicationStatus
+from sentry.models.apidevicecode import ApiDeviceCode
+from sentry.testutils.cases import TestCase
+from sentry.testutils.silo import control_silo_test
+from sentry.utils import json
+
+
+@control_silo_test
+class OAuthDeviceAuthorizationTest(TestCase):
+ """Tests for the OAuth 2.0 Device Authorization endpoint (RFC 8628 §3.1/§3.2)."""
+
+ @cached_property
+ def path(self) -> str:
+ return "/oauth/device/code/"
+
+ def setUp(self) -> None:
+ super().setUp()
+ self.application = ApiApplication.objects.create(
+ owner=self.user, redirect_uris="https://example.com"
+ )
+
+ def test_get_not_allowed(self) -> None:
+ """GET method should not be allowed."""
+ resp = self.client.get(self.path)
+ assert resp.status_code == 405
+
+ def test_missing_client_id(self) -> None:
+ """Missing client_id should return invalid_client error."""
+ resp = self.client.post(self.path, {})
+ assert resp.status_code == 401
+ data = json.loads(resp.content)
+ assert data["error"] == "invalid_client"
+ assert "client_id" in data.get("error_description", "")
+
+ def test_invalid_client_id(self) -> None:
+ """Invalid client_id should return invalid_client error."""
+ resp = self.client.post(self.path, {"client_id": "invalid"})
+ assert resp.status_code == 401
+ data = json.loads(resp.content)
+ assert data["error"] == "invalid_client"
+
+ def test_inactive_application(self) -> None:
+ """Inactive application should return invalid_client error."""
+ self.application.status = ApiApplicationStatus.inactive
+ self.application.save()
+
+ resp = self.client.post(self.path, {"client_id": self.application.client_id})
+ assert resp.status_code == 401
+ data = json.loads(resp.content)
+ assert data["error"] == "invalid_client"
+
+ def test_success_no_scope(self) -> None:
+ """Successful request without scope should return device code."""
+ resp = self.client.post(self.path, {"client_id": self.application.client_id})
+ assert resp.status_code == 200
+
+ data = json.loads(resp.content)
+ assert "device_code" in data
+ assert "user_code" in data
+ assert "verification_uri" in data
+ assert "verification_uri_complete" in data
+ assert "expires_in" in data
+ assert "interval" in data
+
+ # Verify device code was created
+ device_code = ApiDeviceCode.objects.get(device_code=data["device_code"])
+ assert device_code.application == self.application
+ assert device_code.scope_list == []
+ assert device_code.user_code == data["user_code"]
+
+ def test_success_with_scope(self) -> None:
+ """Successful request with scope should return device code."""
+ resp = self.client.post(
+ self.path,
+ {"client_id": self.application.client_id, "scope": "project:read org:read"},
+ )
+ assert resp.status_code == 200
+
+ data = json.loads(resp.content)
+ device_code = ApiDeviceCode.objects.get(device_code=data["device_code"])
+ assert set(device_code.scope_list) == {"project:read", "org:read"}
+
+ def test_invalid_scope(self) -> None:
+ """Invalid scope should return invalid_scope error."""
+ resp = self.client.post(
+ self.path,
+ {"client_id": self.application.client_id, "scope": "invalid:scope"},
+ )
+ assert resp.status_code == 400
+ data = json.loads(resp.content)
+ assert data["error"] == "invalid_scope"
+
+ def test_user_code_format(self) -> None:
+ """User code should be in XXXX-XXXX format."""
+ resp = self.client.post(self.path, {"client_id": self.application.client_id})
+ assert resp.status_code == 200
+
+ data = json.loads(resp.content)
+ user_code = data["user_code"]
+
+ # Should be 9 characters: XXXX-XXXX
+ assert len(user_code) == 9
+ assert user_code[4] == "-"
+ # Should only contain uppercase letters (base-20 alphabet)
+ assert user_code[:4].isupper()
+ assert user_code[5:].isupper()
+
+ def test_verification_uri_complete(self) -> None:
+ """verification_uri_complete should include the user code."""
+ resp = self.client.post(self.path, {"client_id": self.application.client_id})
+ assert resp.status_code == 200
+
+ data = json.loads(resp.content)
+ assert data["user_code"] in data["verification_uri_complete"]
+
+ def test_cache_headers(self) -> None:
+ """Response should have no-cache headers."""
+ resp = self.client.post(self.path, {"client_id": self.application.client_id})
+ assert resp.status_code == 200
+ assert "no-cache" in resp.get("Cache-Control", "") or "no-store" in resp.get(
+ "Cache-Control", ""
+ )
+
+ def test_org_level_app_scope_exceeds_max(self) -> None:
+ """Org-level app should reject scope exceeding application max permissions."""
+ org_app = ApiApplication.objects.create(
+ owner=self.user,
+ redirect_uris="https://example.com",
+ requires_org_level_access=True,
+ scopes=["org:read"], # App only allows org:read
+ )
+
+ # Request scope that exceeds app's max permissions
+ resp = self.client.post(
+ self.path,
+ {"client_id": org_app.client_id, "scope": "org:read project:write"},
+ )
+ assert resp.status_code == 400
+ data = json.loads(resp.content)
+ assert data["error"] == "invalid_scope"
+ assert "exceeds" in data.get("error_description", "").lower()
+
+ def test_org_level_app_scope_within_max(self) -> None:
+ """Org-level app should accept scope within application max permissions."""
+ org_app = ApiApplication.objects.create(
+ owner=self.user,
+ redirect_uris="https://example.com",
+ requires_org_level_access=True,
+ scopes=["org:read", "project:read"],
+ )
+
+ resp = self.client.post(
+ self.path,
+ {"client_id": org_app.client_id, "scope": "org:read"},
+ )
+ assert resp.status_code == 200
+ data = json.loads(resp.content)
+ assert "device_code" in data
diff --git a/tests/sentry/web/frontend/test_oauth_token.py b/tests/sentry/web/frontend/test_oauth_token.py
index 888f2ccda4236f..012e88c4512748 100644
--- a/tests/sentry/web/frontend/test_oauth_token.py
+++ b/tests/sentry/web/frontend/test_oauth_token.py
@@ -4,6 +4,7 @@
from sentry.locks import locks
from sentry.models.apiapplication import ApiApplication
+from sentry.models.apidevicecode import ApiDeviceCode
from sentry.models.apigrant import ApiGrant
from sentry.models.apitoken import ApiToken
from sentry.testutils.cases import TestCase
@@ -1075,3 +1076,286 @@ def test_pkce_verifier_too_long(self) -> None:
# Grant should be immediately deleted on validation failure
assert not ApiGrant.objects.filter(id=grant.id).exists()
+
+
+@control_silo_test
+class OAuthTokenDeviceCodeTest(TestCase):
+ """Tests for device code grant type (RFC 8628 §3.4/§3.5)."""
+
+ @cached_property
+ def path(self) -> str:
+ return "/oauth/token/"
+
+ def setUp(self) -> None:
+ super().setUp()
+ from sentry.models.apidevicecode import ApiDeviceCode, DeviceCodeStatus
+
+ self.application = ApiApplication.objects.create(
+ owner=self.user, redirect_uris="https://example.com"
+ )
+ self.client_secret = self.application.client_secret
+ self.device_code = ApiDeviceCode.objects.create(
+ application=self.application,
+ scope_list=["project:read"],
+ )
+ self.DeviceCodeStatus = DeviceCodeStatus
+
+ def test_missing_device_code(self) -> None:
+ """Missing device_code should return invalid_request."""
+ resp = self.client.post(
+ self.path,
+ {
+ "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+ "client_id": self.application.client_id,
+ "client_secret": self.client_secret,
+ },
+ )
+ assert resp.status_code == 400
+ assert json.loads(resp.content) == {"error": "invalid_request"}
+
+ def test_invalid_device_code(self) -> None:
+ """Invalid device_code should return invalid_grant."""
+ resp = self.client.post(
+ self.path,
+ {
+ "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+ "device_code": "invalid",
+ "client_id": self.application.client_id,
+ "client_secret": self.client_secret,
+ },
+ )
+ assert resp.status_code == 400
+ assert json.loads(resp.content) == {"error": "invalid_grant"}
+
+ def test_authorization_pending(self) -> None:
+ """Pending device code should return authorization_pending."""
+ assert self.device_code.status == self.DeviceCodeStatus.PENDING
+
+ resp = self.client.post(
+ self.path,
+ {
+ "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+ "device_code": self.device_code.device_code,
+ "client_id": self.application.client_id,
+ "client_secret": self.client_secret,
+ },
+ )
+ assert resp.status_code == 400
+ assert json.loads(resp.content) == {"error": "authorization_pending"}
+
+ # Device code should still exist
+ assert ApiDeviceCode.objects.filter(id=self.device_code.id).exists()
+
+ def test_access_denied(self) -> None:
+ """Denied device code should return access_denied and delete the code."""
+ self.device_code.status = self.DeviceCodeStatus.DENIED
+ self.device_code.save()
+
+ resp = self.client.post(
+ self.path,
+ {
+ "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+ "device_code": self.device_code.device_code,
+ "client_id": self.application.client_id,
+ "client_secret": self.client_secret,
+ },
+ )
+ assert resp.status_code == 400
+ assert json.loads(resp.content) == {"error": "access_denied"}
+
+ # Device code should be deleted
+ assert not ApiDeviceCode.objects.filter(id=self.device_code.id).exists()
+
+ def test_expired_token(self) -> None:
+ """Expired device code should return expired_token and delete the code."""
+ from datetime import timedelta
+
+ self.device_code.expires_at = timezone.now() - timedelta(minutes=1)
+ self.device_code.save()
+
+ resp = self.client.post(
+ self.path,
+ {
+ "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+ "device_code": self.device_code.device_code,
+ "client_id": self.application.client_id,
+ "client_secret": self.client_secret,
+ },
+ )
+ assert resp.status_code == 400
+ assert json.loads(resp.content) == {"error": "expired_token"}
+
+ # Device code should be deleted
+ assert not ApiDeviceCode.objects.filter(id=self.device_code.id).exists()
+
+ def test_success_approved(self) -> None:
+ """Approved device code should return access token and delete the code."""
+ from sentry.models.apidevicecode import ApiDeviceCode
+
+ self.device_code.status = self.DeviceCodeStatus.APPROVED
+ self.device_code.user = self.user
+ self.device_code.save()
+
+ resp = self.client.post(
+ self.path,
+ {
+ "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+ "device_code": self.device_code.device_code,
+ "client_id": self.application.client_id,
+ "client_secret": self.client_secret,
+ },
+ )
+ assert resp.status_code == 200
+
+ data = json.loads(resp.content)
+ assert "access_token" in data
+ assert data["token_type"] == "Bearer"
+ assert "expires_in" in data
+ assert data["scope"] == "project:read"
+ assert data["user"]["id"] == str(self.user.id)
+
+ # Device code should be deleted
+ assert not ApiDeviceCode.objects.filter(id=self.device_code.id).exists()
+
+ # Token should be created
+ token = ApiToken.objects.get(token=data["access_token"])
+ assert token.user == self.user
+ assert token.application == self.application
+
+ def test_success_with_organization(self) -> None:
+ """Approved device code with org should include organization_id."""
+ organization = self.create_organization(owner=self.user)
+
+ self.device_code.status = self.DeviceCodeStatus.APPROVED
+ self.device_code.user = self.user
+ self.device_code.organization_id = organization.id
+ self.device_code.save()
+
+ resp = self.client.post(
+ self.path,
+ {
+ "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+ "device_code": self.device_code.device_code,
+ "client_id": self.application.client_id,
+ "client_secret": self.client_secret,
+ },
+ )
+ assert resp.status_code == 200
+
+ data = json.loads(resp.content)
+ assert data["organization_id"] == str(organization.id)
+
+ def test_wrong_application(self) -> None:
+ """Device code for different application should return invalid_grant."""
+ other_app = ApiApplication.objects.create(
+ owner=self.user, redirect_uris="https://other.com"
+ )
+
+ resp = self.client.post(
+ self.path,
+ {
+ "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+ "device_code": self.device_code.device_code,
+ "client_id": other_app.client_id,
+ "client_secret": other_app.client_secret,
+ },
+ )
+ assert resp.status_code == 400
+ assert json.loads(resp.content) == {"error": "invalid_grant"}
+
+ def test_slow_down(self) -> None:
+ """Polling too fast should return slow_down error."""
+ from unittest.mock import patch
+
+ # First request should succeed (returns authorization_pending)
+ resp = self.client.post(
+ self.path,
+ {
+ "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+ "device_code": self.device_code.device_code,
+ "client_id": self.application.client_id,
+ "client_secret": self.client_secret,
+ },
+ )
+ assert resp.status_code == 400
+ assert json.loads(resp.content) == {"error": "authorization_pending"}
+
+ # Second request within the rate limit window should return slow_down
+ with patch("sentry.ratelimits.backend.is_limited", return_value=True):
+ resp = self.client.post(
+ self.path,
+ {
+ "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+ "device_code": self.device_code.device_code,
+ "client_id": self.application.client_id,
+ "client_secret": self.client_secret,
+ },
+ )
+ assert resp.status_code == 400
+ assert json.loads(resp.content) == {"error": "slow_down"}
+
+ def test_success_returns_refresh_token(self) -> None:
+ """Approved device code should return refresh_token for token renewal.
+
+ Per RFC 6749 §5.1, refresh_token is OPTIONAL but RECOMMENDED for
+ headless clients that cannot easily re-authenticate interactively.
+ """
+ self.device_code.status = self.DeviceCodeStatus.APPROVED
+ self.device_code.user = self.user
+ self.device_code.save()
+
+ resp = self.client.post(
+ self.path,
+ {
+ "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+ "device_code": self.device_code.device_code,
+ "client_id": self.application.client_id,
+ "client_secret": self.client_secret,
+ },
+ )
+ assert resp.status_code == 200
+
+ data = json.loads(resp.content)
+ assert "refresh_token" in data
+ assert data["refresh_token"]
+
+ # Verify the refresh_token can be used
+ token = ApiToken.objects.get(token=data["access_token"])
+ assert token.refresh_token == data["refresh_token"]
+
+ def test_inactive_application_rejects_device_code_grant(self) -> None:
+ """Inactive applications cannot exchange approved device codes for tokens.
+
+ This prevents tokens from being issued after an application is disabled
+ (e.g., for security reasons) even if the device code was approved while
+ the application was still active.
+ """
+ from sentry.models.apiapplication import ApiApplicationStatus
+
+ self.device_code.status = self.DeviceCodeStatus.APPROVED
+ self.device_code.user = self.user
+ self.device_code.save()
+
+ # Deactivate the application after approval
+ self.application.status = ApiApplicationStatus.inactive
+ self.application.save()
+
+ resp = self.client.post(
+ self.path,
+ {
+ "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
+ "device_code": self.device_code.device_code,
+ "client_id": self.application.client_id,
+ "client_secret": self.client_secret,
+ },
+ )
+
+ # Per RFC 6749 §5.2, invalid_grant when grant is "revoked"
+ assert resp.status_code == 400
+ assert json.loads(resp.content) == {"error": "invalid_grant"}
+
+ # Device code should be deleted
+ assert not ApiDeviceCode.objects.filter(id=self.device_code.id).exists()
+
+ # No token should be created
+ assert not ApiToken.objects.filter(application=self.application, user=self.user).exists()