Conversation
Summary of ChangesHello @think-in-universe, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request implements a new 'purchased tokens' feature, providing users with the flexibility to acquire additional tokens beyond their standard subscription allowances. The changes encompass the full stack, from database schema and repository logic to new API endpoints for managing token purchases and balances, and crucial integration into the existing usage tracking and billing systems. This enhancement aims to offer more granular control over resource consumption and monetization. Highlights
Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
PR Review: feat: purchase tokensCritical Issues1. Race condition causes over-debit of purchased tokens In let overflow = used - max_tokens as i64; // used = total including all concurrent requests
let to_debit = std::cmp::min(quantity, overflow);With concurrent requests, both can see the same combined
The DB // overflow attributable only to this request
let prior_used = used - quantity;
let prior_overflow = (prior_used - max_tokens as i64).max(0);
let current_overflow = (used - max_tokens as i64).max(0);
let to_debit = current_overflow - prior_overflow; // tokens from THIS request in overflow zoneThis is correct regardless of concurrency since it's deterministic given (used, quantity, max_tokens). 2. Debit runs synchronously on every request's response path
This adds multi-DB-roundtrip latency to every non-streaming response. This should be spawned as a background task: let svc = state.subscription_service.clone();
tokio::spawn(async move {
if let Err(e) = svc.debit_purchased_tokens_after_usage(user_id, tokens).await {
tracing::warn\!("Failed to debit purchased tokens (user_id={}): {}", user_id, e);
}
});The streaming path in 3. In the webhook handler (service.rs ~line 770), when .ok_or_else(|| {
SubscriptionError::InternalError("No user_id in checkout session metadata".into())
})?;This returns an error that propagates out of the entire webhook handler as an internal error. Any legitimate Consider returning let user_id_str = metadata
.and_then(|m| m.get("user_id"))
.and_then(|v| v.as_str());
let Some(user_id_str) = user_id_str else {
// Not a token purchase session, skip
None // returns token_purchase_user_id = None
};Minor Notes
Reviewed by Claude Sonnet 4.6 |
There was a problem hiding this comment.
Code Review
This pull request introduces the functionality for users to purchase tokens, including new database structures, services, API endpoints, and integration with the Stripe payment provider for checkout session creation and webhook handling. However, two significant security vulnerabilities were identified in the Stripe webhook handling logic. The token crediting operation is not atomic with the webhook idempotency check, which could lead to double-crediting. Furthermore, tokens are credited without verifying that the payment status is 'paid', potentially allowing users to receive tokens for free. These issues could lead to financial loss and should be addressed by ensuring atomicity within the database transaction and adding a payment status check. Additionally, the review suggests improving code maintainability by reducing duplication and promoting the use of standard libraries.
| self.purchased_token_repo | ||
| .credit(user_id, tokens) | ||
| .await |
There was a problem hiding this comment.
The credit operation for purchased tokens is performed outside the database transaction used for webhook idempotency. In handle_stripe_webhook, a transaction txn is started and used to store the webhook event. However, self.purchased_token_repo.credit (line 900) uses a separate connection from the pool (as seen in its implementation in purchased_token_repository.rs).
If the transaction fails to commit at line 953 (e.g., due to a transient database error), the record of the webhook being processed will be rolled back, but the tokens will have already been credited to the user's account. When Stripe retries the webhook, the handler will see the event as new and credit the tokens again, leading to a double-crediting vulnerability.
| })?; | ||
|
|
||
| let mode = obj.get("mode").and_then(|m| m.as_str()); | ||
| if mode == Some("payment") { |
There was a problem hiding this comment.
The webhook handler credits tokens immediately upon receiving a checkout.session.completed event with mode == "payment", but it fails to verify the payment_status of the session. According to Stripe documentation, this event indicates the checkout flow is complete, but the payment may still be pending (e.g., for delayed payment methods like ACH).
Crediting tokens when payment_status is "unpaid" allows users to potentially receive tokens for free if the payment eventually fails. You should verify that payment_status == "paid" before crediting tokens.
crates/api/src/routes/api.rs
Outdated
| // Debit purchased tokens if usage overflowed monthly limit | ||
| if let Err(e) = state | ||
| .subscription_service | ||
| .debit_purchased_tokens_after_usage(user_id, usage.total_tokens as i64) | ||
| .await | ||
| { | ||
| tracing::warn!( | ||
| "Failed to debit purchased tokens for overflow (user_id={}): {}", | ||
| user_id, | ||
| e | ||
| ); | ||
| } |
There was a problem hiding this comment.
This logic for debiting purchased tokens is duplicated in record_response_usage_from_body (lines 4798-4809). To improve maintainability and reduce code duplication, consider extracting this block into a separate helper function.
For example, you could create a helper function like this:
async fn debit_overflow_tokens(state: &AppState, user_id: UserId, tokens: i64) {
if let Err(e) = state
.subscription_service
.debit_purchased_tokens_after_usage(user_id, tokens)
.await
{
tracing::warn!(
"Failed to debit purchased tokens for overflow (user_id={}): {}",
user_id,
e
);
}
}Then you could call it from both record_chat_usage_from_body and record_response_usage_from_body:
debit_overflow_tokens(&state, user_id, usage.total_tokens as i64).await;| let metadata = obj.get("metadata").and_then(|m| m.as_object()); | ||
| let user_id_str = metadata | ||
| .and_then(|m| m.get("user_id")) | ||
| .and_then(|v| v.as_str()) | ||
| .ok_or_else(|| { | ||
| SubscriptionError::InternalError( | ||
| "No user_id in checkout session metadata".into(), | ||
| ) | ||
| })?; | ||
| let tokens_str = metadata | ||
| .and_then(|m| m.get("tokens")) | ||
| .and_then(|v| v.as_str()) | ||
| .ok_or_else(|| { | ||
| SubscriptionError::InternalError( | ||
| "No tokens in checkout session metadata".into(), | ||
| ) | ||
| })?; |
There was a problem hiding this comment.
The current parsing logic for user_id and tokens from the webhook metadata can produce misleading error messages. If the metadata object itself is missing, the error will state that user_id or tokens is missing, which isn't the root cause.
Consider refactoring this to first ensure the metadata object exists, and then parse the fields from it. This will provide more accurate error logging and make debugging easier.
Here's a suggested refactoring:
let metadata = obj.get("metadata").and_then(|m| m.as_object()).ok_or_else(|| SubscriptionError::InternalError("Missing metadata in checkout session object".into()))?;
let user_id_str = metadata
.get("user_id")
.and_then(|v| v.as_str())
.ok_or_else(|| {
SubscriptionError::InternalError(
"No user_id in checkout session metadata".into(),
)
})?;
let tokens_str = metadata
.get("tokens")
.and_then(|v| v.as_str())
.ok_or_else(|| {
SubscriptionError::InternalError(
"No tokens in checkout session metadata".into(),
)
})?;| let metadata = obj.get("metadata").and_then(|m| m.as_object()); | |
| let user_id_str = metadata | |
| .and_then(|m| m.get("user_id")) | |
| .and_then(|v| v.as_str()) | |
| .ok_or_else(|| { | |
| SubscriptionError::InternalError( | |
| "No user_id in checkout session metadata".into(), | |
| ) | |
| })?; | |
| let tokens_str = metadata | |
| .and_then(|m| m.get("tokens")) | |
| .and_then(|v| v.as_str()) | |
| .ok_or_else(|| { | |
| SubscriptionError::InternalError( | |
| "No tokens in checkout session metadata".into(), | |
| ) | |
| })?; | |
| let metadata = obj.get("metadata").and_then(|m| m.as_object()).ok_or_else(|| SubscriptionError::InternalError("Missing metadata in checkout session object".into()))?; | |
| let user_id_str = metadata | |
| .get("user_id") | |
| .and_then(|v| v.as_str()) | |
| .ok_or_else(|| { | |
| SubscriptionError::InternalError( | |
| "No user_id in checkout session metadata".into(), | |
| ) | |
| })?; | |
| let tokens_str = metadata | |
| .get("tokens") | |
| .and_then(|v| v.as_str()) | |
| .ok_or_else(|| { | |
| SubscriptionError::InternalError( | |
| "No tokens in checkout session metadata".into(), | |
| ) | |
| })?; |
| fn format_number(n: u64) -> String { | ||
| let s = n.to_string(); | ||
| let mut result = String::with_capacity(s.len() + (s.len() - 1) / 3); | ||
| let chars: Vec<char> = s.chars().collect(); | ||
| let len = chars.len(); | ||
| for (i, c) in chars.into_iter().enumerate() { | ||
| result.push(c); | ||
| if (len - i - 1) % 3 == 0 && i != len - 1 { | ||
| result.push(','); | ||
| } | ||
| } | ||
| result | ||
| } |
There was a problem hiding this comment.
This custom implementation for number formatting is a bit complex and could be simplified by using a well-tested external crate like num-format. This would make the code more readable and less prone to errors.
Here's how you could implement it with num-format after adding it to your Cargo.toml:
use num_format::{Locale, ToFormattedString};
fn format_number(n: u64) -> String {
n.to_formatted_string(&Locale::en)
}If you prefer to avoid adding a new dependency, the current implementation is functional, but using a standard library is generally recommended for common tasks like this.
There was a problem hiding this comment.
Pull request overview
This pull request implements a token purchase feature that allows users to buy additional tokens beyond their monthly subscription limits. The feature integrates with Stripe for payment processing and uses webhooks to credit tokens after successful purchases.
Changes:
- Added database table and repository for tracking purchased token balances per user
- Implemented configurable token purchase pricing through system configs (amount and price_per_million)
- Created API endpoints for purchasing tokens, viewing balance, and retrieving purchase info
- Integrated token purchase balance into subscription token limit calculations
- Added automatic debit of purchased tokens when usage exceeds monthly limits
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| crates/database/src/migrations/sql/V23__add_purchased_tokens.sql | Adds purchased_tokens table with balance, total_purchased columns and appropriate constraints |
| crates/database/src/repositories/purchased_token_repository.rs | Implements repository for token balance operations (get, credit, debit) |
| crates/database/src/repositories/mod.rs | Exports new purchased token repository |
| crates/database/src/lib.rs | Registers purchased token repository in Database struct |
| crates/services/src/system_configs/ports.rs | Adds TokensPricingConfig for configurable token purchase pricing with backward compatibility |
| crates/services/src/subscription/ports.rs | Adds PurchasedTokenRepository trait and new service methods for token purchase |
| crates/services/src/subscription/service.rs | Implements token purchase checkout, webhook handling, balance queries, and usage-based debit logic |
| crates/api/src/routes/subscriptions.rs | Adds three new authenticated endpoints for token purchase operations |
| crates/api/src/routes/api.rs | Integrates purchased token debit into non-streaming usage recording |
| crates/api/src/usage_parsing.rs | Adds subscription service to usage tracking streams for token debit |
| crates/api/src/openapi.rs | Registers new API endpoints and models in OpenAPI spec |
| crates/api/src/models.rs | Updates PartialSystemConfigs conversion to support new fields |
| crates/api/src/main.rs | Wires up purchased token repository in service configuration |
| crates/api/tests/common.rs | Adds purchased token repository to test setup |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return Err(SubscriptionError::MonthlyTokenLimitExceeded { | ||
| used, | ||
| limit: max_tokens, | ||
| limit: effective_limit as u64, |
There was a problem hiding this comment.
Potential arithmetic overflow when converting effective_limit from i64 to u64. If purchased_balance is negative (which shouldn't happen due to database constraints, but defensive programming is wise), then effective_limit could be negative, and casting to u64 would wrap around to a very large positive value. Consider adding validation or using saturating arithmetic.
| limit: effective_limit as u64, | |
| limit: effective_limit.max(0) as u64, |
|
|
||
| if tokens > 0 { | ||
| self.purchased_token_repo | ||
| .credit(user_id, tokens) |
There was a problem hiding this comment.
The token purchase credit operation (line 900-903) doesn't use the transaction context (txn). This could lead to issues where tokens are credited but the webhook record commit fails, resulting in duplicate credits on webhook retries. The credit operation should either use the transaction or be moved after the transaction commit with additional safeguards.
| .credit(user_id, tokens) | |
| .credit(txn, user_id, tokens) |
| let metadata = obj.get("metadata").and_then(|m| m.as_object()); | ||
| let user_id_str = metadata | ||
| .and_then(|m| m.get("user_id")) | ||
| .and_then(|v| v.as_str()) | ||
| .ok_or_else(|| { | ||
| SubscriptionError::InternalError( | ||
| "No user_id in checkout session metadata".into(), | ||
| ) | ||
| })?; | ||
| let tokens_str = metadata | ||
| .and_then(|m| m.get("tokens")) | ||
| .and_then(|v| v.as_str()) | ||
| .ok_or_else(|| { | ||
| SubscriptionError::InternalError( | ||
| "No tokens in checkout session metadata".into(), | ||
| ) | ||
| })?; | ||
| let tokens: i64 = tokens_str.parse().map_err(|_| { | ||
| SubscriptionError::InternalError("Invalid tokens in metadata".into()) | ||
| })?; | ||
| let user_id = UserId(uuid::Uuid::parse_str(user_id_str).map_err(|e| { | ||
| SubscriptionError::InternalError(format!("Invalid user_id: {}", e)) | ||
| })?); | ||
|
|
||
| if tokens > 0 { | ||
| self.purchased_token_repo | ||
| .credit(user_id, tokens) | ||
| .await | ||
| .map_err(|e| SubscriptionError::DatabaseError(e.to_string()))?; | ||
| tracing::info!( | ||
| "Token purchase credited: user_id={}, tokens={}, event_id={}", | ||
| user_id, | ||
| tokens, | ||
| event_id | ||
| ); | ||
| Some(user_id) | ||
| } else { | ||
| None |
There was a problem hiding this comment.
Missing payment_status verification in checkout.session.completed webhook handler. The code should verify that payment_status is "paid" before crediting tokens. Currently, tokens could be credited even if the payment failed or is unpaid. Add a check like: let payment_status = obj.get("payment_status").and_then(|s| s.as_str()); if payment_status != Some("paid") { return Ok(()); }
| let metadata = obj.get("metadata").and_then(|m| m.as_object()); | |
| let user_id_str = metadata | |
| .and_then(|m| m.get("user_id")) | |
| .and_then(|v| v.as_str()) | |
| .ok_or_else(|| { | |
| SubscriptionError::InternalError( | |
| "No user_id in checkout session metadata".into(), | |
| ) | |
| })?; | |
| let tokens_str = metadata | |
| .and_then(|m| m.get("tokens")) | |
| .and_then(|v| v.as_str()) | |
| .ok_or_else(|| { | |
| SubscriptionError::InternalError( | |
| "No tokens in checkout session metadata".into(), | |
| ) | |
| })?; | |
| let tokens: i64 = tokens_str.parse().map_err(|_| { | |
| SubscriptionError::InternalError("Invalid tokens in metadata".into()) | |
| })?; | |
| let user_id = UserId(uuid::Uuid::parse_str(user_id_str).map_err(|e| { | |
| SubscriptionError::InternalError(format!("Invalid user_id: {}", e)) | |
| })?); | |
| if tokens > 0 { | |
| self.purchased_token_repo | |
| .credit(user_id, tokens) | |
| .await | |
| .map_err(|e| SubscriptionError::DatabaseError(e.to_string()))?; | |
| tracing::info!( | |
| "Token purchase credited: user_id={}, tokens={}, event_id={}", | |
| user_id, | |
| tokens, | |
| event_id | |
| ); | |
| Some(user_id) | |
| } else { | |
| None | |
| // Ensure the checkout session is fully paid before crediting tokens | |
| let payment_status = obj.get("payment_status").and_then(|s| s.as_str()); | |
| if payment_status != Some("paid") { | |
| tracing::info!( | |
| "Skipping token credit for non-paid checkout session: event_id={}, payment_status={:?}", | |
| event_id, | |
| payment_status | |
| ); | |
| None | |
| } else { | |
| let metadata = obj.get("metadata").and_then(|m| m.as_object()); | |
| let user_id_str = metadata | |
| .and_then(|m| m.get("user_id")) | |
| .and_then(|v| v.as_str()) | |
| .ok_or_else(|| { | |
| SubscriptionError::InternalError( | |
| "No user_id in checkout session metadata".into(), | |
| ) | |
| })?; | |
| let tokens_str = metadata | |
| .and_then(|m| m.get("tokens")) | |
| .and_then(|v| v.as_str()) | |
| .ok_or_else(|| { | |
| SubscriptionError::InternalError( | |
| "No tokens in checkout session metadata".into(), | |
| ) | |
| })?; | |
| let tokens: i64 = tokens_str.parse().map_err(|_| { | |
| SubscriptionError::InternalError("Invalid tokens in metadata".into()) | |
| })?; | |
| let user_id = UserId(uuid::Uuid::parse_str(user_id_str).map_err(|e| { | |
| SubscriptionError::InternalError(format!("Invalid user_id: {}", e)) | |
| })?); | |
| if tokens > 0 { | |
| self.purchased_token_repo | |
| .credit(user_id, tokens) | |
| .await | |
| .map_err(|e| SubscriptionError::DatabaseError(e.to_string()))?; | |
| tracing::info!( | |
| "Token purchase credited: user_id={}, tokens={}, event_id={}", | |
| user_id, | |
| tokens, | |
| event_id | |
| ); | |
| Some(user_id) | |
| } else { | |
| None | |
| } |
| // Amount in cents: (tokens/1e6) * price_per_million * 100 | ||
| let amount_cents = | ||
| ((tokens as f64 / 1_000_000.0) * price_per_million * 100.0).round() as i64; | ||
|
|
There was a problem hiding this comment.
The amount_cents calculation could result in zero or negative values for very small token amounts or edge cases. Consider adding validation: if amount_cents <= 0 { return Err(SubscriptionError::InternalError("Invalid price calculation".into())); }. This prevents creating checkout sessions with invalid amounts.
| if amount_cents <= 0 { | |
| return Err(SubscriptionError::InternalError("Invalid price calculation".into())); | |
| } |
| /// Format large numbers with commas (e.g. 1_000_000 -> "1,000,000") | ||
| fn format_number(n: u64) -> String { | ||
| let s = n.to_string(); | ||
| let mut result = String::with_capacity(s.len() + (s.len() - 1) / 3); | ||
| let chars: Vec<char> = s.chars().collect(); | ||
| let len = chars.len(); | ||
| for (i, c) in chars.into_iter().enumerate() { | ||
| result.push(c); | ||
| if (len - i - 1) % 3 == 0 && i != len - 1 { | ||
| result.push(','); | ||
| } | ||
| } | ||
| result | ||
| } |
There was a problem hiding this comment.
The format_number function has a potential off-by-one error in comma placement logic. The condition (len - i - 1) % 3 == 0 && i != len - 1 will place commas incorrectly. For example, for "1000000", it would produce "1,00,00,00" instead of "1,000,000". The correct logic should track position from the right: if (len - i - 1) > 0 && (len - i - 1) % 3 == 0.
| async fn create_token_purchase_checkout( | ||
| &self, | ||
| user_id: UserId, | ||
| success_url: String, | ||
| cancel_url: String, | ||
| ) -> Result<String, SubscriptionError> { | ||
| tracing::info!("Creating token purchase checkout for user_id={}", user_id); | ||
|
|
||
| // Check Stripe configuration | ||
| if self.stripe_secret_key.is_empty() || self.stripe_webhook_secret.is_empty() { | ||
| return Err(SubscriptionError::TokenPurchaseNotConfigured); | ||
| } | ||
|
|
||
| let configs = self | ||
| .system_configs_service | ||
| .get_configs() | ||
| .await | ||
| .map_err(|e| SubscriptionError::InternalError(e.to_string()))? | ||
| .ok_or(SubscriptionError::TokenPurchaseNotConfigured)?; | ||
|
|
||
| let pricing = configs | ||
| .tokens_pricing | ||
| .as_ref() | ||
| .ok_or(SubscriptionError::TokenPurchaseNotConfigured)?; | ||
| let tokens = pricing.amount; | ||
| let price_per_million = pricing.price_per_million; | ||
|
|
||
| // Get or create Stripe customer | ||
| let customer_id = self.get_or_create_stripe_customer(user_id).await?; | ||
|
|
||
| // Amount in cents: (tokens/1e6) * price_per_million * 100 | ||
| let amount_cents = | ||
| ((tokens as f64 / 1_000_000.0) * price_per_million * 100.0).round() as i64; | ||
|
|
||
| let client = self.get_stripe_client(); | ||
|
|
||
| let mut params = CreateCheckoutSession::new(); | ||
| params.mode = Some(CheckoutSessionMode::Payment); | ||
| params.customer = Some( | ||
| customer_id | ||
| .parse() | ||
| .map_err(|_| SubscriptionError::StripeError("Invalid customer ID".to_string()))?, | ||
| ); | ||
| params.success_url = Some(&success_url); | ||
| params.cancel_url = Some(&cancel_url); | ||
| params.line_items = Some(vec![CreateCheckoutSessionLineItems { | ||
| price_data: Some(CreateCheckoutSessionLineItemsPriceData { | ||
| currency: Currency::USD, | ||
| unit_amount: Some(amount_cents), | ||
| product_data: Some(CreateCheckoutSessionLineItemsPriceDataProductData { | ||
| name: format!("{} Tokens", format_number(tokens)), | ||
| ..Default::default() | ||
| }), | ||
| ..Default::default() | ||
| }), | ||
| quantity: Some(1), | ||
| ..Default::default() | ||
| }]); | ||
| params.metadata = Some( | ||
| vec![ | ||
| ("user_id".to_string(), user_id.0.to_string()), | ||
| ("tokens".to_string(), tokens.to_string()), | ||
| ] | ||
| .into_iter() | ||
| .collect(), | ||
| ); | ||
|
|
||
| let session = CheckoutSession::create(&client, params) | ||
| .await | ||
| .map_err(|e| SubscriptionError::StripeError(e.to_string()))?; | ||
|
|
||
| let checkout_url = session | ||
| .url | ||
| .ok_or_else(|| SubscriptionError::StripeError("No checkout URL returned".into()))?; | ||
|
|
||
| tracing::info!( | ||
| "Token purchase checkout created: user_id={}, session_id={}, tokens={}", | ||
| user_id, | ||
| session.id, | ||
| tokens | ||
| ); | ||
|
|
||
| Ok(checkout_url) | ||
| } |
There was a problem hiding this comment.
No test coverage for the new token purchase feature. Tests should be added to verify: 1) creating token purchase checkout, 2) handling checkout.session.completed webhooks, 3) crediting tokens after successful payment, 4) deducting tokens on usage overflow, 5) including purchased balance in effective token limits. The codebase has comprehensive test coverage in crates/api/tests/subscriptions_tests.rs that should be extended.
| /// Create a token purchase checkout session. Single fixed option (1M tokens at $1.70). | ||
| /// Returns the Stripe checkout URL. |
There was a problem hiding this comment.
Documentation comment is outdated. It says "Single fixed option (1M tokens at $1.70)" but the actual implementation is configurable via TokensPricingConfig (amount and price_per_million fields). Update the comment to reflect that the token purchase amount and pricing are configurable via system configs.
| /// Create a token purchase checkout session. Single fixed option (1M tokens at $1.70). | |
| /// Returns the Stripe checkout URL. | |
| /// Create a token purchase checkout session using the configured token amount and price. | |
| /// Returns the Stripe checkout URL. Amount and pricing are defined via system configs (TokensPricingConfig). |
Resolves #180