260 changes: 196 additions & 64 deletions src/rotator_library/providers/antigravity_auth_base.py
@@ -10,16 +10,18 @@
import httpx

from .google_oauth_base import GoogleOAuthBase
from .utilities.gemini_shared_utils import CODE_ASSIST_ENDPOINT
# Note: endpoint constants are imported from gemini_shared_utils inside the helper methods below

lib_logger = logging.getLogger("rotator_library")

# Headers for Antigravity auth/discovery calls.
# Uses a Gemini CLI-style User-Agent/X-Goog-Api-Client for compatibility with newer
# API versions, combined with Antigravity-specific Client-Metadata (JSON format).
# Note: the ideType in the Client-Metadata header stays IDE_UNSPECIFIED for
# compatibility, while the ideType in the request-body metadata uses "ANTIGRAVITY".
ANTIGRAVITY_AUTH_HEADERS = {
"User-Agent": "google-api-nodejs-client/9.15.1",
"X-Goog-Api-Client": "google-cloud-sdk vscode_cloudshelleditor/0.1",
"User-Agent": "google-api-nodejs-client/10.3.0",
"X-Goog-Api-Client": "gl-node/22.18.0",
"Client-Metadata": '{"ideType":"IDE_UNSPECIFIED","platform":"PLATFORM_UNSPECIFIED","pluginType":"GEMINI"}',
}
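
# Minimal sketch of how these headers pair with the request-body metadata built by the
# helpers below (the Authorization header and `endpoint` value are assumed placeholders,
# not values defined at this point in the module):
#
#     headers = {
#         "Authorization": f"Bearer {access_token}",
#         **ANTIGRAVITY_AUTH_HEADERS,
#     }
#     body = {
#         "cloudaicompanionProject": None,
#         "metadata": {
#             "ideType": "ANTIGRAVITY",
#             "platform": "PLATFORM_UNSPECIFIED",
#             "pluginType": "GEMINI",
#         },
#     }
#     response = await client.post(f"{endpoint}:loadCodeAssist", headers=headers, json=body)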

@@ -100,6 +102,147 @@ async def _post_auth_discovery(
f"tier={tier}, project={project_id}"
)

# =========================================================================
# ENDPOINT FALLBACK HELPERS
# =========================================================================

def _extract_project_id_from_response(
self, data: Dict[str, Any], key: str = "cloudaicompanionProject"
) -> Optional[str]:
"""
Extract project ID from API response, handling both string and object formats.

The API may return cloudaicompanionProject as either:
- A string: "project-id-123"
- An object: {"id": "project-id-123", ...}

Args:
data: API response data
key: Key to extract from (default: "cloudaicompanionProject")

Returns:
Project ID string or None if not found
"""
value = data.get(key)
if isinstance(value, str) and value:
return value
if isinstance(value, dict):
return value.get("id")
return None
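
# Usage sketch: both response shapes described in the docstring resolve to the same ID
# (values are illustrative):
#
#     self._extract_project_id_from_response({"cloudaicompanionProject": "project-id-123"})
#     # -> "project-id-123"
#     self._extract_project_id_from_response({"cloudaicompanionProject": {"id": "project-id-123"}})
#     # -> "project-id-123"
#     self._extract_project_id_from_response({})
#     # -> None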

async def _call_load_code_assist(
self,
client: httpx.AsyncClient,
access_token: str,
configured_project_id: Optional[str],
headers: Dict[str, str],
) -> tuple:
"""
Call loadCodeAssist with endpoint fallback chain.

Tries endpoints in ANTIGRAVITY_LOAD_ENDPOINT_ORDER (prod first for better
project resolution, then falls back to sandbox).

Args:
client: httpx async client
access_token: OAuth access token
configured_project_id: User-configured project ID (or None)
headers: Request headers

Returns:
Tuple of (response_data, successful_endpoint) or (None, None) on failure
"""
from .utilities.gemini_shared_utils import ANTIGRAVITY_LOAD_ENDPOINT_ORDER

core_client_metadata = {
"ideType": "ANTIGRAVITY",
"platform": "PLATFORM_UNSPECIFIED",
"pluginType": "GEMINI",
}
if configured_project_id:
core_client_metadata["duetProject"] = configured_project_id

load_request = {
"cloudaicompanionProject": configured_project_id,
"metadata": core_client_metadata,
}

last_error = None
for endpoint in ANTIGRAVITY_LOAD_ENDPOINT_ORDER:
try:
lib_logger.debug(f"Trying loadCodeAssist at {endpoint}")
response = await client.post(
f"{endpoint}:loadCodeAssist",
headers=headers,
json=load_request,
timeout=15,
)
if response.status_code == 200:
data = response.json()
lib_logger.debug(f"loadCodeAssist succeeded at {endpoint}")
return data, endpoint
lib_logger.debug(
f"loadCodeAssist returned {response.status_code} at {endpoint}"
)
last_error = f"HTTP {response.status_code}"
except Exception as e:
lib_logger.debug(f"loadCodeAssist failed at {endpoint}: {e}")
last_error = str(e)
continue

lib_logger.warning(
f"All loadCodeAssist endpoints failed. Last error: {last_error}"
)
return None, None
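
# Usage sketch, assuming `client`, `access_token`, `configured_project_id`, and `headers`
# are already prepared as in _discover_project_id below:
#
#     data, endpoint = await self._call_load_code_assist(
#         client, access_token, configured_project_id, headers
#     )
#     if data is None:
#         ...  # every endpoint failed; fall back to other discovery paths
#     else:
#         project_id = self._extract_project_id_from_response(data)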

async def _call_onboard_user(
self,
client: httpx.AsyncClient,
headers: Dict[str, str],
onboard_request: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
"""
Call onboardUser with endpoint fallback chain.

Tries endpoints in ANTIGRAVITY_ENDPOINT_FALLBACKS (daily first, then prod).

Args:
client: httpx async client
headers: Request headers
onboard_request: Onboarding request payload

Returns:
Response data dict or None on failure
"""
from .utilities.gemini_shared_utils import ANTIGRAVITY_ENDPOINT_FALLBACKS

last_error = None
for endpoint in ANTIGRAVITY_ENDPOINT_FALLBACKS:
try:
lib_logger.debug(f"Trying onboardUser at {endpoint}")
response = await client.post(
f"{endpoint}:onboardUser",
headers=headers,
json=onboard_request,
timeout=30,
)
if response.status_code == 200:
lib_logger.debug(f"onboardUser succeeded at {endpoint}")
return response.json()
lib_logger.debug(
f"onboardUser returned {response.status_code} at {endpoint}"
)
last_error = f"HTTP {response.status_code}"
except Exception as e:
lib_logger.debug(f"onboardUser failed at {endpoint}: {e}")
last_error = str(e)
continue

lib_logger.warning(
f"All onboardUser endpoints failed. Last error: {last_error}"
)
return None
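
# Usage sketch of the LRO pattern built on this helper, assuming `onboard_request` has
# already been assembled (mirrors the bounded polling loop in _discover_project_id below):
#
#     lro_data = await self._call_onboard_user(client, headers, onboard_request)
#     for _ in range(30):  # bounded polling, 30 x 2s = 60 seconds
#         if lro_data is None or lro_data.get("done"):
#             break
#         await asyncio.sleep(2)
#         lro_data = await self._call_onboard_user(client, headers, onboard_request)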

# =========================================================================
# PROJECT ID DISCOVERY
# =========================================================================
@@ -203,45 +346,39 @@ async def _discover_project_id(
**ANTIGRAVITY_AUTH_HEADERS,
}

# Build core metadata for API requests
core_client_metadata = {
"ideType": "ANTIGRAVITY",
"platform": "PLATFORM_UNSPECIFIED",
"pluginType": "GEMINI",
}
if configured_project_id:
core_client_metadata["duetProject"] = configured_project_id

discovered_project_id = None
discovered_tier = None

async with httpx.AsyncClient() as client:
# 1. Try discovery endpoint with loadCodeAssist
# 1. Try discovery endpoint with loadCodeAssist using endpoint fallback
lib_logger.debug(
"Attempting project discovery via Code Assist loadCodeAssist endpoint..."
)
try:
# Build metadata - include duetProject only if we have a configured project
core_client_metadata = {
"ideType": "ANTIGRAVITY",
"platform": "PLATFORM_UNSPECIFIED",
"pluginType": "GEMINI",
}
if configured_project_id:
core_client_metadata["duetProject"] = configured_project_id

# Build load request - pass configured_project_id if available, otherwise None
load_request = {
"cloudaicompanionProject": configured_project_id, # Can be None
"metadata": core_client_metadata,
}

lib_logger.debug(
f"Sending loadCodeAssist request with cloudaicompanionProject={configured_project_id}"
)
response = await client.post(
f"{CODE_ASSIST_ENDPOINT}:loadCodeAssist",
headers=headers,
json=load_request,
timeout=20,
# Use helper with endpoint fallback chain
data, successful_endpoint = await self._call_load_code_assist(
client, access_token, configured_project_id, headers
)
response.raise_for_status()
data = response.json()

# Log full response for debugging
if data is None:
# All endpoints failed - skip to GCP Resource Manager fallback
raise httpx.HTTPStatusError(
"All loadCodeAssist endpoints failed",
request=None,
response=None,
)

lib_logger.debug(
f"loadCodeAssist full response keys: {list(data.keys())}"
f"loadCodeAssist succeeded at {successful_endpoint}, response keys: {list(data.keys())}"
)

# Extract tier information
@@ -269,7 +406,8 @@ async def _discover_project_id(
# Check if user is already known to server (has currentTier)
if current_tier_id:
# User is already onboarded - check for project from server
server_project = data.get("cloudaicompanionProject")
# Use helper to handle both string and object formats
server_project = self._extract_project_id_from_response(data)

# Check if this tier requires user-defined project (paid tiers)
requires_user_project = any(
@@ -405,58 +543,52 @@ async def _discover_project_id(
f"Paid tier onboarding: using project {configured_project_id}"
)

lib_logger.debug("Initiating onboardUser request...")
lro_response = await client.post(
f"{CODE_ASSIST_ENDPOINT}:onboardUser",
headers=headers,
json=onboard_request,
timeout=30,
lib_logger.debug(
"Initiating onboardUser request with endpoint fallback..."
)
lro_response.raise_for_status()
lro_data = lro_response.json()
lro_data = await self._call_onboard_user(
client, headers, onboard_request
)

if lro_data is None:
raise ValueError(
"All onboardUser endpoints failed. Cannot onboard user."
)

lib_logger.debug(
f"Initial onboarding response: done={lro_data.get('done')}"
)

# Poll for onboarding completion (up to 5 minutes)
for i in range(150): # 150 × 2s = 5 minutes
# Poll for onboarding completion (up to 60 seconds)
for i in range(30): # 30 × 2s = 60 seconds
if lro_data.get("done"):
lib_logger.debug(
f"Onboarding completed after {i} polling attempts"
)
lib_logger.debug(f"Onboarding completed after {i * 2}s")
break
await asyncio.sleep(2)
if (i + 1) % 15 == 0: # Log every 30 seconds
if (i + 1) % 10 == 0: # Log every 20 seconds
lib_logger.info(
f"Still waiting for onboarding completion... ({(i + 1) * 2}s elapsed)"
)
lib_logger.debug(
f"Polling onboarding status... (Attempt {i + 1}/150)"
f"Polling onboarding status... (Attempt {i + 1}/30)"
)
lro_response = await client.post(
f"{CODE_ASSIST_ENDPOINT}:onboardUser",
headers=headers,
json=onboard_request,
timeout=30,
lro_data = await self._call_onboard_user(
client, headers, onboard_request
)
lro_response.raise_for_status()
lro_data = lro_response.json()
if lro_data is None:
lib_logger.warning("onboardUser endpoint failed during polling")
break

if not lro_data.get("done"):
lib_logger.error("Onboarding process timed out after 5 minutes")
if not lro_data or not lro_data.get("done"):
lib_logger.error("Onboarding process timed out after 60 seconds")
raise ValueError(
"Onboarding process timed out after 5 minutes. Please try again or contact support."
"Onboarding process timed out after 60 seconds. Please try again or contact support."
)

# Extract project ID from LRO response
# Extract project ID from LRO response using helper
# Note: onboardUser returns response.cloudaicompanionProject as an object with .id
lro_response_data = lro_data.get("response", {})
lro_project_obj = lro_response_data.get("cloudaicompanionProject", {})
project_id = (
lro_project_obj.get("id")
if isinstance(lro_project_obj, dict)
else None
)
project_id = self._extract_project_id_from_response(lro_response_data)
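
# Shape handled here, per the note above (values are illustrative):
#
#     {
#         "done": True,
#         "response": {
#             "cloudaicompanionProject": {"id": "project-id-123"}
#         }
#     }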

# Fallback to configured project if LRO didn't return one
if not project_id and configured_project_id: