Add-on invocations: concurrent quota, async callback mode, configurable base URL (0.272.2)

* Raise the default active-invocation quota from 1 to 3 and enforce it by
  listing a user's pending/in_progress invocations.
* 429 responses carry blocking-invocation details and a Retry-After header.
* A 2xx webhook response auto-completes the invocation unless it sets
  "X-TMI-Callback: async".
* Callback URLs are built from the configured server base URL instead of a
  hard-coded host.

Blob ids on the index lines below are informational only and were not
recomputed after the added lines were revised.

diff --git a/.version b/.version
index 1629abb8..7d096ab1 100644
--- a/.version
+++ b/.version
@@ -1,5 +1,5 @@
 {
   "major": 0,
   "minor": 272,
-  "patch": 1
+  "patch": 2
 }
diff --git a/api/addon_invocation_quota_store.go b/api/addon_invocation_quota_store.go
index a9b363b7..f204a58a 100644
--- a/api/addon_invocation_quota_store.go
+++ b/api/addon_invocation_quota_store.go
@@ -12,7 +12,7 @@ import (
 
 // Default quota values
 const (
-	DefaultMaxActiveInvocations  = 1
+	DefaultMaxActiveInvocations  = 3 // Allow 3 concurrent invocations per user by default
 	DefaultMaxInvocationsPerHour = 10
 )
 
diff --git a/api/addon_invocation_store.go b/api/addon_invocation_store.go
index f5af1634..ac51fc6a 100644
--- a/api/addon_invocation_store.go
+++ b/api/addon_invocation_store.go
@@ -68,6 +68,9 @@ type AddonInvocationStore interface {
 	// GetActiveForUser retrieves the active invocation for a user (for quota enforcement)
 	GetActiveForUser(ctx context.Context, userID uuid.UUID) (*AddonInvocation, error)
 
+	// ListActiveForUser retrieves all active invocations (pending/in_progress) for a user up to limit
+	ListActiveForUser(ctx context.Context, userID uuid.UUID, limit int) ([]AddonInvocation, error)
+
 	// Delete removes an invocation (for cleanup)
 	Delete(ctx context.Context, id uuid.UUID) error
 
@@ -370,6 +373,73 @@ func (s *AddonInvocationRedisStore) GetActiveForUser(ctx context.Context, userID
 	return s.Get(ctx, invocationID)
 }
 
+// ListActiveForUser retrieves all active invocations (pending/in_progress) for a user up to limit.
+// A non-positive limit returns no invocations. The lookup walks every key matching
+// "addon:invocation:*" with SCAN, so its cost grows with the total number of stored
+// invocations rather than with the number belonging to this user.
+func (s *AddonInvocationRedisStore) ListActiveForUser(ctx context.Context, userID uuid.UUID, limit int) ([]AddonInvocation, error) {
+	logger := slogging.Get()
+
+	// Nothing can be returned for a non-positive limit; skip the scan entirely
+	if limit <= 0 {
+		return nil, nil
+	}
+
+	// Scan for all invocation keys
+	pattern := "addon:invocation:*"
+	var cursor uint64
+	var activeInvocations []AddonInvocation
+
+	client := s.redis.GetClient()
+
+	for {
+		keys, newCursor, err := client.Scan(ctx, cursor, pattern, 100).Result()
+		if err != nil {
+			logger.Error("Failed to scan invocation keys: %v", err)
+			return nil, fmt.Errorf("failed to scan invocations: %w", err)
+		}
+
+		// Check each invocation
+		for _, key := range keys {
+			data, err := s.redis.Get(ctx, key)
+			if err != nil {
+				if err == redis.Nil {
+					continue // Key expired between scan and get
+				}
+				logger.Error("Failed to get invocation from key %s: %v", key, err)
+				continue
+			}
+
+			var invocation AddonInvocation
+			if err := json.Unmarshal([]byte(data), &invocation); err != nil {
+				logger.Error("Failed to unmarshal invocation from key %s: %v", key, err)
+				continue
+			}
+
+			// Check if invocation belongs to user and is active
+			if invocation.InvokedByUUID == userID &&
+				(invocation.Status == InvocationStatusPending || invocation.Status == InvocationStatusInProgress) {
+				activeInvocations = append(activeInvocations, invocation)
+
+				// Stop if we've reached the limit
+				if len(activeInvocations) >= limit {
+					logger.Debug("Found %d active invocations for user %s (limit reached)", len(activeInvocations), userID)
+					return activeInvocations, nil
+				}
+			}
+		}
+
+		cursor = newCursor
+		if cursor == 0 {
+			break
+		}
+	}
+
+	logger.Debug("Found %d active invocations for user %s", len(activeInvocations), userID)
+
+	return activeInvocations, nil
+}
+
 // Delete removes an invocation
 func (s *AddonInvocationRedisStore) Delete(ctx context.Context, id uuid.UUID) error {
 	logger := slogging.Get()
diff --git a/api/addon_invocation_worker.go b/api/addon_invocation_worker.go
index 67af60b3..3afa675b 100644
--- a/api/addon_invocation_worker.go
+++ b/api/addon_invocation_worker.go
@@ -22,6 +22,7 @@ type AddonInvocationWorker struct {
 	running    bool
 	stopChan   chan struct{}
 	workChan   chan uuid.UUID // Channel for invocation IDs to process
+	baseURL    string         // Server base URL for callback URLs
 }
 
 // AddonInvocationPayload represents the payload sent to webhook endpoints
@@ -48,9 +49,23 @@ func NewAddonInvocationWorker() *AddonInvocationWorker {
 		},
 		stopChan: make(chan struct{}),
 		workChan: make(chan uuid.UUID, 100), // Buffer up to 100 pending invocations
+		baseURL:  "http://localhost:8080",   // Default, should be set via SetBaseURL
 	}
 }
 
+// SetBaseURL sets the server's base URL used to build callback URLs.
+// Trailing slashes are trimmed so that appending the callback path never
+// yields "//"; an empty value is ignored and the current base URL is kept.
+func (w *AddonInvocationWorker) SetBaseURL(baseURL string) {
+	for len(baseURL) > 0 && baseURL[len(baseURL)-1] == '/' {
+		baseURL = baseURL[:len(baseURL)-1]
+	}
+	if baseURL == "" {
+		return
+	}
+	w.baseURL = baseURL
+}
+
 // Start begins processing invocations
 func (w *AddonInvocationWorker) Start(ctx context.Context) error {
 	logger := slogging.Get()
@@ -145,9 +160,8 @@ func (w *AddonInvocationWorker) processInvocation(ctx context.Context, invocatio
 
 	logger.Debug("sending addon invocation to %s (invocation: %s)", webhook.Url, invocationID)
 
-	// Build callback URL
-	// TODO: Get base URL from configuration
-	callbackURL := fmt.Sprintf("https://tmi.example.com/invocations/%s/status", invocationID)
+	// Build callback URL using configured base URL
+	callbackURL := fmt.Sprintf("%s/invocations/%s/status", w.baseURL, invocationID)
 
 	// Build payload
 	payload := AddonInvocationPayload{
@@ -212,9 +226,24 @@ func (w *AddonInvocationWorker) processInvocation(ctx context.Context, invocatio
 
 	logger.Info("addon invocation sent successfully to %s (invocation: %s, status: %d)", webhook.Url, invocationID, resp.StatusCode)
 
-	// Mark as in_progress (webhook will update to completed/failed via callback)
-	invocation.Status = InvocationStatusInProgress
-	invocation.StatusMessage = "Invocation sent to webhook"
+	// Check if webhook wants to use async callbacks
+	// If X-TMI-Callback: async is set, the webhook will call back with status updates
+	// Otherwise, auto-complete the invocation (webhook handles work internally)
+	callbackMode := resp.Header.Get("X-TMI-Callback")
+
+	if callbackMode == "async" {
+		// Webhook will call back with status updates
+		invocation.Status = InvocationStatusInProgress
+		invocation.StatusMessage = "Invocation sent to webhook, awaiting callback"
+		logger.Debug("webhook requested async callback mode for invocation %s", invocationID)
+	} else {
+		// Auto-complete: webhook accepted and will handle internally
+		invocation.Status = InvocationStatusCompleted
+		invocation.StatusMessage = "Invocation delivered successfully"
+		invocation.StatusPercent = 100
+		logger.Debug("auto-completing invocation %s (no async callback requested)", invocationID)
+	}
+
 	if err := GlobalAddonInvocationStore.Update(ctx, invocation); err != nil {
 		logger.Error("failed to update invocation status: %v", err)
 	}
diff --git a/api/addon_rate_limiter.go b/api/addon_rate_limiter.go
index 64d8af6d..b6af1ee1 100644
--- a/api/addon_rate_limiter.go
+++ b/api/addon_rate_limiter.go
@@ -32,7 +32,7 @@ func (rl *AddonRateLimiter) buildRateLimitKey(userID uuid.UUID) string {
 	return fmt.Sprintf("addon:ratelimit:hour:%s", userID.String())
 }
 
-// CheckActiveInvocationLimit checks if user has an active invocation (blocks if they do)
+// CheckActiveInvocationLimit checks if user has reached their concurrent invocation limit
 func (rl *AddonRateLimiter) CheckActiveInvocationLimit(ctx context.Context, userID uuid.UUID) error {
 	logger := slogging.Get()
 
@@ -43,24 +43,70 @@ func (rl *AddonRateLimiter) CheckActiveInvocationLimit(ctx context.Context, user
 		return fmt.Errorf("failed to check quota: %w", err)
 	}
 
-	// Check if user has an active invocation
-	activeInvocation, err := GlobalAddonInvocationStore.GetActiveForUser(ctx, userID)
+	// Get active invocations for user (fetch one more than limit to check if over)
+	activeInvocations, err := GlobalAddonInvocationStore.ListActiveForUser(ctx, userID, quota.MaxActiveInvocations+1)
 	if err != nil {
-		logger.Error("Failed to check active invocation for user %s: %v", userID, err)
-		return fmt.Errorf("failed to check active invocation: %w", err)
+		logger.Error("Failed to check active invocations for user %s: %v", userID, err)
+		return fmt.Errorf("failed to check active invocations: %w", err)
 	}
 
-	if activeInvocation != nil {
-		logger.Warn("User %s has active invocation (limit: %d): invocation_id=%s",
-			userID, quota.MaxActiveInvocations, activeInvocation.ID)
+	if len(activeInvocations) >= quota.MaxActiveInvocations {
+		logger.Warn("User %s has %d active invocations (limit: %d)",
+			userID, len(activeInvocations), quota.MaxActiveInvocations)
+
+		// Build blocking invocation details for the error response
+		blockingInvocations := make([]map[string]interface{}, 0, len(activeInvocations))
+		var earliestTimeout time.Time
+
+		for _, inv := range activeInvocations {
+			timeout := inv.LastActivityAt.Add(AddonInvocationTimeout)
+			if earliestTimeout.IsZero() || timeout.Before(earliestTimeout) {
+				earliestTimeout = timeout
+			}
+
+			// Clamp at zero, as retry_after is: an invocation already past its
+			// timeout must not report a negative remaining time to the client
+			secondsRemaining := int(time.Until(timeout).Seconds())
+			if secondsRemaining < 0 {
+				secondsRemaining = 0
+			}
+
+			blockingInvocations = append(blockingInvocations, map[string]interface{}{
+				"invocation_id":     inv.ID.String(),
+				"addon_id":          inv.AddonID.String(),
+				"status":            inv.Status,
+				"created_at":        inv.CreatedAt.Format(time.RFC3339),
+				"expires_at":        timeout.Format(time.RFC3339),
+				"seconds_remaining": secondsRemaining,
+			})
+		}
+
+		// Calculate retry_after (time until oldest invocation times out)
+		retryAfter := int(time.Until(earliestTimeout).Seconds())
+		if retryAfter < 0 {
+			retryAfter = 0
+		}
+
+		suggestion := fmt.Sprintf("Wait for an existing invocation to complete, or retry after %d seconds when the oldest will timeout.", retryAfter)
+
 		return &RequestError{
 			Status:  429,
 			Code:    "rate_limit_exceeded",
-			Message: fmt.Sprintf("You already have an active add-on invocation. Please wait for it to complete. (Limit: %d concurrent invocation)", quota.MaxActiveInvocations),
+			Message: fmt.Sprintf("Active invocation limit reached: %d/%d concurrent invocations.", len(activeInvocations), quota.MaxActiveInvocations),
+			Details: &ErrorDetails{
+				Context: map[string]interface{}{
+					"limit":                quota.MaxActiveInvocations,
+					"current":              len(activeInvocations),
+					"retry_after":          retryAfter,
+					"blocking_invocations": blockingInvocations,
+				},
+				Suggestion: &suggestion,
+			},
 		}
 	}
 
-	logger.Debug("Active invocation check passed for user %s", userID)
+	logger.Debug("Active invocation check passed for user %s: %d/%d",
+		userID, len(activeInvocations), quota.MaxActiveInvocations)
 
 	return nil
 }
diff --git a/api/request_utils.go b/api/request_utils.go
index fbb1178a..62c9076f 100644
--- a/api/request_utils.go
+++ b/api/request_utils.go
@@ -478,6 +478,15 @@ func HandleRequestError(c *gin.Context, err error) {
 			SetWWWAuthenticateHeader(c, WWWAuthInvalidToken, reqErr.Message)
 		}
 
+		// Add Retry-After header for 429 Too Many Requests responses per RFC 6585
+		if reqErr.Status == http.StatusTooManyRequests && reqErr.Details != nil {
+			if retryAfter, ok := reqErr.Details.Context["retry_after"]; ok {
+				if retryAfterInt, ok := retryAfter.(int); ok {
+					c.Header("Retry-After", fmt.Sprintf("%d", retryAfterInt))
+				}
+			}
+		}
+
 		c.JSON(reqErr.Status, response)
 	} else {
 		// SECURITY: Truncate error message before any stack trace markers to prevent
diff --git a/api/version.go b/api/version.go
index d3241285..2ca24a00 100644
--- a/api/version.go
+++ b/api/version.go
@@ -29,7 +29,7 @@ var (
 	// Minor version number
 	VersionMinor = "272"
 	// Patch version number
-	VersionPatch = "1"
+	VersionPatch = "2"
 	// GitCommit is the git commit hash from build
 	GitCommit = "development"
 	// BuildDate is the build timestamp
diff --git a/cmd/server/main.go b/cmd/server/main.go
index d5410942..e9e613bd 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -1306,7 +1306,7 @@ func setupRouter(config *config.Config) (*gin.Engine, *api.Server) {
 }
 
 // startWebhookWorkers initializes and starts all webhook workers
-func startWebhookWorkers(ctx context.Context) (*api.WebhookEventConsumer, *api.WebhookChallengeWorker, *api.WebhookDeliveryWorker, *api.WebhookCleanupWorker) {
+func startWebhookWorkers(ctx context.Context, cfg *config.Config) (*api.WebhookEventConsumer, *api.WebhookChallengeWorker, *api.WebhookDeliveryWorker, *api.WebhookCleanupWorker) {
 	logger := slogging.Get()
 
 	// Start webhook workers if database and Redis are available
@@ -1354,6 +1354,7 @@ func startWebhookWorkers(ctx context.Context) (*api.WebhookEventConsumer, *api.W
 
 		// Start addon invocation worker
 		api.GlobalAddonInvocationWorker = api.NewAddonInvocationWorker()
+		api.GlobalAddonInvocationWorker.SetBaseURL(cfg.GetBaseURL())
 		if err := api.GlobalAddonInvocationWorker.Start(ctx); err != nil {
 			logger.Error("Failed to start addon invocation worker: %v", err)
 		}
@@ -1444,7 +1445,7 @@ func main() {
 	apiServer.StartWebSocketHub(ctx)
 
 	// Initialize and start webhook workers
-	webhookConsumer, challengeWorker, deliveryWorker, cleanupWorker := startWebhookWorkers(ctx)
+	webhookConsumer, challengeWorker, deliveryWorker, cleanupWorker := startWebhookWorkers(ctx, cfg)
 
 	// Prepare address
 	addr := fmt.Sprintf("%s:%s", cfg.Server.Interface, cfg.Server.Port)
diff --git a/docs/developer/addons/addon-development-guide.md b/docs/developer/addons/addon-development-guide.md
index 9fa4363d..850974ab 100644
--- a/docs/developer/addons/addon-development-guide.md
+++ b/docs/developer/addons/addon-development-guide.md
@@ -162,12 +162,42 @@ def handle_invocation():
     task_queue.enqueue(process_invocation, payload)
 
     # Respond immediately
-    return '', 200  # TMI marks as in_progress
+    return '', 200  # TMI auto-completes the invocation
 ```
 
-### Step 4: Update Status During Processing
+### Callback Modes
 
-Call back to TMI to update progress:
+TMI supports two callback modes, controlled by the `X-TMI-Callback` response header:
+
+**Auto-Complete Mode (Default)**
+
+When your webhook returns a 2xx response without the `X-TMI-Callback` header (or with any value other than `async`), TMI automatically marks the invocation as `completed`. Use this mode when:
+- Your webhook handles the work synchronously
+- You don't need to report progress updates
+- The invocation is "fire and forget"
+
+```python
+# Auto-complete mode - invocation marked complete immediately
+return '', 200
+```
+
+**Async Callback Mode**
+
+When your webhook returns the `X-TMI-Callback: async` header, TMI marks the invocation as `in_progress` and expects your service to call back with status updates. Use this mode when:
+- Your processing takes significant time
+- You want to report progress percentages
+- You need to report success or failure after processing
+
+```python
+# Async callback mode - you must call back with status updates
+return '', 200, {'X-TMI-Callback': 'async'}
+```
+
+**Important:** If you use async mode but never call back, the invocation will time out after 15 minutes of inactivity and be marked as `failed`.
+
+### Step 4: Update Status During Processing (Async Mode Only)
+
+If using async callback mode, call back to TMI to update progress:
 
 ```python
 import requests
diff --git a/internal/config/config.go b/internal/config/config.go
index 72ff6b0e..f509aa9d 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -38,6 +38,7 @@ type Config struct {
 type ServerConfig struct {
 	Port                string        `yaml:"port" env:"SERVER_PORT"`
 	Interface           string        `yaml:"interface" env:"SERVER_INTERFACE"`
+	BaseURL             string        `yaml:"base_url" env:"SERVER_BASE_URL"` // Public base URL for callbacks (auto-inferred if empty)
 	ReadTimeout         time.Duration `yaml:"read_timeout" env:"SERVER_READ_TIMEOUT"`
 	WriteTimeout        time.Duration `yaml:"write_timeout" env:"SERVER_WRITE_TIMEOUT"`
 	IdleTimeout         time.Duration `yaml:"idle_timeout" env:"SERVER_IDLE_TIMEOUT"`
@@ -924,3 +925,32 @@ func (c *Config) GetOAuthProvider(providerID string) (OAuthProviderConfig, bool)
 func (c *Config) GetWebSocketInactivityTimeout() time.Duration {
 	return time.Duration(c.WebSocket.InactivityTimeoutSeconds) * time.Second
 }
+
+// GetBaseURL returns the server's public base URL for callbacks.
+// If BaseURL is explicitly configured, it is returned as-is.
+// Otherwise, the URL is auto-inferred from Interface, Port, and TLSEnabled.
+// A wildcard bind address ("0.0.0.0" or "::") is not a usable destination,
+// so it is inferred as "localhost", the same as an empty Interface.
+func (c *Config) GetBaseURL() string {
+	if c.Server.BaseURL != "" {
+		return c.Server.BaseURL
+	}
+
+	// Auto-infer from server configuration
+	scheme := "http"
+	if c.Server.TLSEnabled {
+		scheme = "https"
+	}
+
+	host := c.Server.Interface
+	if host == "" || host == "0.0.0.0" || host == "::" {
+		host = "localhost"
+	}
+
+	port := c.Server.Port
+	if port == "" {
+		port = "8080"
+	}
+
+	return fmt.Sprintf("%s://%s:%s", scheme, host, port)
+}