Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .version
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"major": 0,
"minor": 272,
"patch": 1
"patch": 2
}
2 changes: 1 addition & 1 deletion api/addon_invocation_quota_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// Default quota values
const (
DefaultMaxActiveInvocations = 1
DefaultMaxActiveInvocations = 3 // Allow 3 concurrent invocations per user by default
DefaultMaxInvocationsPerHour = 10
)

Expand Down
64 changes: 64 additions & 0 deletions api/addon_invocation_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type AddonInvocationStore interface {
// GetActiveForUser retrieves the active invocation for a user (for quota enforcement)
GetActiveForUser(ctx context.Context, userID uuid.UUID) (*AddonInvocation, error)

// ListActiveForUser retrieves all active invocations (pending/in_progress) for a user up to limit
ListActiveForUser(ctx context.Context, userID uuid.UUID, limit int) ([]AddonInvocation, error)

// Delete removes an invocation (for cleanup)
Delete(ctx context.Context, id uuid.UUID) error

Expand Down Expand Up @@ -370,6 +373,67 @@ func (s *AddonInvocationRedisStore) GetActiveForUser(ctx context.Context, userID
return s.Get(ctx, invocationID)
}

// ListActiveForUser retrieves all active invocations (pending/in_progress) for a user up to limit
func (s *AddonInvocationRedisStore) ListActiveForUser(ctx context.Context, userID uuid.UUID, limit int) ([]AddonInvocation, error) {
logger := slogging.Get()

// Scan for all invocation keys
pattern := "addon:invocation:*"
var cursor uint64
var activeInvocations []AddonInvocation

client := s.redis.GetClient()

for {
var keys []string
var newCursor uint64
keys, newCursor, err := client.Scan(ctx, cursor, pattern, 100).Result()
if err != nil {
logger.Error("Failed to scan invocation keys: %v", err)
return nil, fmt.Errorf("failed to scan invocations: %w", err)
}

// Check each invocation
for _, key := range keys {
data, err := s.redis.Get(ctx, key)
if err != nil {
if err == redis.Nil {
continue // Key expired between scan and get
}
logger.Error("Failed to get invocation from key %s: %v", key, err)
continue
}

var invocation AddonInvocation
if err := json.Unmarshal([]byte(data), &invocation); err != nil {
logger.Error("Failed to unmarshal invocation from key %s: %v", key, err)
continue
}

// Check if invocation belongs to user and is active
if invocation.InvokedByUUID == userID &&
(invocation.Status == InvocationStatusPending || invocation.Status == InvocationStatusInProgress) {
activeInvocations = append(activeInvocations, invocation)

// Stop if we've reached the limit
if len(activeInvocations) >= limit {
logger.Debug("Found %d active invocations for user %s (limit reached)", len(activeInvocations), userID)
return activeInvocations, nil
}
}
}

cursor = newCursor
if cursor == 0 {
break
}
}

logger.Debug("Found %d active invocations for user %s", len(activeInvocations), userID)

return activeInvocations, nil
}

// Delete removes an invocation
func (s *AddonInvocationRedisStore) Delete(ctx context.Context, id uuid.UUID) error {
logger := slogging.Get()
Expand Down
33 changes: 27 additions & 6 deletions api/addon_invocation_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type AddonInvocationWorker struct {
running bool
stopChan chan struct{}
workChan chan uuid.UUID // Channel for invocation IDs to process
baseURL string // Server base URL for callback URLs
}

// AddonInvocationPayload represents the payload sent to webhook endpoints
Expand All @@ -48,9 +49,15 @@ func NewAddonInvocationWorker() *AddonInvocationWorker {
},
stopChan: make(chan struct{}),
workChan: make(chan uuid.UUID, 100), // Buffer up to 100 pending invocations
baseURL: "http://localhost:8080", // Default, should be set via SetBaseURL
}
}

// SetBaseURL sets the server's base URL for callback URLs
func (w *AddonInvocationWorker) SetBaseURL(baseURL string) {
w.baseURL = baseURL
}

// Start begins processing invocations
func (w *AddonInvocationWorker) Start(ctx context.Context) error {
logger := slogging.Get()
Expand Down Expand Up @@ -145,9 +152,8 @@ func (w *AddonInvocationWorker) processInvocation(ctx context.Context, invocatio

logger.Debug("sending addon invocation to %s (invocation: %s)", webhook.Url, invocationID)

// Build callback URL
// TODO: Get base URL from configuration
callbackURL := fmt.Sprintf("https://tmi.example.com/invocations/%s/status", invocationID)
// Build callback URL using configured base URL
callbackURL := fmt.Sprintf("%s/invocations/%s/status", w.baseURL, invocationID)

// Build payload
payload := AddonInvocationPayload{
Expand Down Expand Up @@ -212,9 +218,24 @@ func (w *AddonInvocationWorker) processInvocation(ctx context.Context, invocatio
logger.Info("addon invocation sent successfully to %s (invocation: %s, status: %d)",
webhook.Url, invocationID, resp.StatusCode)

// Mark as in_progress (webhook will update to completed/failed via callback)
invocation.Status = InvocationStatusInProgress
invocation.StatusMessage = "Invocation sent to webhook"
// Check if webhook wants to use async callbacks
// If X-TMI-Callback: async is set, the webhook will call back with status updates
// Otherwise, auto-complete the invocation (webhook handles work internally)
callbackMode := resp.Header.Get("X-TMI-Callback")

if callbackMode == "async" {
// Webhook will call back with status updates
invocation.Status = InvocationStatusInProgress
invocation.StatusMessage = "Invocation sent to webhook, awaiting callback"
logger.Debug("webhook requested async callback mode for invocation %s", invocationID)
} else {
// Auto-complete: webhook accepted and will handle internally
invocation.Status = InvocationStatusCompleted
invocation.StatusMessage = "Invocation delivered successfully"
invocation.StatusPercent = 100
logger.Debug("auto-completing invocation %s (no async callback requested)", invocationID)
}

if err := GlobalAddonInvocationStore.Update(ctx, invocation); err != nil {
logger.Error("failed to update invocation status: %v", err)
}
Expand Down
59 changes: 49 additions & 10 deletions api/addon_rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (rl *AddonRateLimiter) buildRateLimitKey(userID uuid.UUID) string {
return fmt.Sprintf("addon:ratelimit:hour:%s", userID.String())
}

// CheckActiveInvocationLimit checks if user has an active invocation (blocks if they do)
// CheckActiveInvocationLimit checks if user has reached their concurrent invocation limit
func (rl *AddonRateLimiter) CheckActiveInvocationLimit(ctx context.Context, userID uuid.UUID) error {
logger := slogging.Get()

Expand All @@ -43,24 +43,63 @@ func (rl *AddonRateLimiter) CheckActiveInvocationLimit(ctx context.Context, user
return fmt.Errorf("failed to check quota: %w", err)
}

// Check if user has an active invocation
activeInvocation, err := GlobalAddonInvocationStore.GetActiveForUser(ctx, userID)
// Get active invocations for user (fetch one more than limit to check if over)
activeInvocations, err := GlobalAddonInvocationStore.ListActiveForUser(ctx, userID, quota.MaxActiveInvocations+1)
if err != nil {
logger.Error("Failed to check active invocation for user %s: %v", userID, err)
return fmt.Errorf("failed to check active invocation: %w", err)
logger.Error("Failed to check active invocations for user %s: %v", userID, err)
return fmt.Errorf("failed to check active invocations: %w", err)
}

if activeInvocation != nil {
logger.Warn("User %s has active invocation (limit: %d): invocation_id=%s",
userID, quota.MaxActiveInvocations, activeInvocation.ID)
if len(activeInvocations) >= quota.MaxActiveInvocations {
logger.Warn("User %s has %d active invocations (limit: %d)",
userID, len(activeInvocations), quota.MaxActiveInvocations)

// Build blocking invocation details for the error response
blockingInvocations := make([]map[string]interface{}, 0, len(activeInvocations))
var earliestTimeout time.Time

for _, inv := range activeInvocations {
timeout := inv.LastActivityAt.Add(AddonInvocationTimeout)
if earliestTimeout.IsZero() || timeout.Before(earliestTimeout) {
earliestTimeout = timeout
}

blockingInvocations = append(blockingInvocations, map[string]interface{}{
"invocation_id": inv.ID.String(),
"addon_id": inv.AddonID.String(),
"status": inv.Status,
"created_at": inv.CreatedAt.Format(time.RFC3339),
"expires_at": timeout.Format(time.RFC3339),
"seconds_remaining": int(time.Until(timeout).Seconds()),
})
}

// Calculate retry_after (time until oldest invocation times out)
retryAfter := int(time.Until(earliestTimeout).Seconds())
if retryAfter < 0 {
retryAfter = 0
}

suggestion := fmt.Sprintf("Wait for an existing invocation to complete, or retry after %d seconds when the oldest will timeout.", retryAfter)

return &RequestError{
Status: 429,
Code: "rate_limit_exceeded",
Message: fmt.Sprintf("You already have an active add-on invocation. Please wait for it to complete. (Limit: %d concurrent invocation)", quota.MaxActiveInvocations),
Message: fmt.Sprintf("Active invocation limit reached: %d/%d concurrent invocations.", len(activeInvocations), quota.MaxActiveInvocations),
Details: &ErrorDetails{
Context: map[string]interface{}{
"limit": quota.MaxActiveInvocations,
"current": len(activeInvocations),
"retry_after": retryAfter,
"blocking_invocations": blockingInvocations,
},
Suggestion: &suggestion,
},
}
}

logger.Debug("Active invocation check passed for user %s", userID)
logger.Debug("Active invocation check passed for user %s: %d/%d",
userID, len(activeInvocations), quota.MaxActiveInvocations)
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions api/request_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,15 @@ func HandleRequestError(c *gin.Context, err error) {
SetWWWAuthenticateHeader(c, WWWAuthInvalidToken, reqErr.Message)
}

// Add Retry-After header for 429 Too Many Requests responses per RFC 6585
if reqErr.Status == http.StatusTooManyRequests && reqErr.Details != nil {
if retryAfter, ok := reqErr.Details.Context["retry_after"]; ok {
if retryAfterInt, ok := retryAfter.(int); ok {
c.Header("Retry-After", fmt.Sprintf("%d", retryAfterInt))
}
}
}

c.JSON(reqErr.Status, response)
} else {
// SECURITY: Truncate error message before any stack trace markers to prevent
Expand Down
2 changes: 1 addition & 1 deletion api/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
// Minor version number
VersionMinor = "272"
// Patch version number
VersionPatch = "1"
VersionPatch = "2"
// GitCommit is the git commit hash from build
GitCommit = "development"
// BuildDate is the build timestamp
Expand Down
5 changes: 3 additions & 2 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,7 @@ func setupRouter(config *config.Config) (*gin.Engine, *api.Server) {
}

// startWebhookWorkers initializes and starts all webhook workers
func startWebhookWorkers(ctx context.Context) (*api.WebhookEventConsumer, *api.WebhookChallengeWorker, *api.WebhookDeliveryWorker, *api.WebhookCleanupWorker) {
func startWebhookWorkers(ctx context.Context, cfg *config.Config) (*api.WebhookEventConsumer, *api.WebhookChallengeWorker, *api.WebhookDeliveryWorker, *api.WebhookCleanupWorker) {
logger := slogging.Get()

// Start webhook workers if database and Redis are available
Expand Down Expand Up @@ -1354,6 +1354,7 @@ func startWebhookWorkers(ctx context.Context) (*api.WebhookEventConsumer, *api.W

// Start addon invocation worker
api.GlobalAddonInvocationWorker = api.NewAddonInvocationWorker()
api.GlobalAddonInvocationWorker.SetBaseURL(cfg.GetBaseURL())
if err := api.GlobalAddonInvocationWorker.Start(ctx); err != nil {
logger.Error("Failed to start addon invocation worker: %v", err)
}
Expand Down Expand Up @@ -1444,7 +1445,7 @@ func main() {
apiServer.StartWebSocketHub(ctx)

// Initialize and start webhook workers
webhookConsumer, challengeWorker, deliveryWorker, cleanupWorker := startWebhookWorkers(ctx)
webhookConsumer, challengeWorker, deliveryWorker, cleanupWorker := startWebhookWorkers(ctx, cfg)

// Prepare address
addr := fmt.Sprintf("%s:%s", cfg.Server.Interface, cfg.Server.Port)
Expand Down
36 changes: 33 additions & 3 deletions docs/developer/addons/addon-development-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,42 @@ def handle_invocation():
task_queue.enqueue(process_invocation, payload)

# Respond immediately
return '', 200 # TMI marks as in_progress
return '', 200 # TMI auto-completes the invocation
```

### Step 4: Update Status During Processing
### Callback Modes

Call back to TMI to update progress:
TMI supports two callback modes, controlled by the `X-TMI-Callback` response header:

**Auto-Complete Mode (Default)**

When your webhook returns a 2xx response without the `X-TMI-Callback` header (or with any value other than `async`), TMI automatically marks the invocation as `completed`. Use this mode when:
- Your webhook handles the work synchronously
- You don't need to report progress updates
- The invocation is "fire and forget"

```python
# Auto-complete mode - invocation marked complete immediately
return '', 200
```

**Async Callback Mode**

When your webhook returns the `X-TMI-Callback: async` header, TMI marks the invocation as `in_progress` and expects your service to call back with status updates. Use this mode when:
- Your processing takes significant time
- You want to report progress percentages
- You need to report success or failure after processing

```python
# Async callback mode - you must call back with status updates
return '', 200, {'X-TMI-Callback': 'async'}
```

**Important:** If you use async mode but never call back, the invocation will timeout after 15 minutes of inactivity and be marked as `failed`.

### Step 4: Update Status During Processing (Async Mode Only)

If using async callback mode, call back to TMI to update progress:

```python
import requests
Expand Down
28 changes: 28 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Config struct {
type ServerConfig struct {
Port string `yaml:"port" env:"SERVER_PORT"`
Interface string `yaml:"interface" env:"SERVER_INTERFACE"`
BaseURL string `yaml:"base_url" env:"SERVER_BASE_URL"` // Public base URL for callbacks (auto-inferred if empty)
ReadTimeout time.Duration `yaml:"read_timeout" env:"SERVER_READ_TIMEOUT"`
WriteTimeout time.Duration `yaml:"write_timeout" env:"SERVER_WRITE_TIMEOUT"`
IdleTimeout time.Duration `yaml:"idle_timeout" env:"SERVER_IDLE_TIMEOUT"`
Expand Down Expand Up @@ -924,3 +925,30 @@ func (c *Config) GetOAuthProvider(providerID string) (OAuthProviderConfig, bool)
func (c *Config) GetWebSocketInactivityTimeout() time.Duration {
return time.Duration(c.WebSocket.InactivityTimeoutSeconds) * time.Second
}

// GetBaseURL returns the server's public base URL for callbacks.
// If BaseURL is explicitly configured, it is returned as-is.
// Otherwise, the URL is auto-inferred from Interface, Port, and TLSEnabled.
func (c *Config) GetBaseURL() string {
if c.Server.BaseURL != "" {
return c.Server.BaseURL
}

// Auto-infer from server configuration
scheme := "http"
if c.Server.TLSEnabled {
scheme = "https"
}

host := c.Server.Interface
if host == "" {
host = "localhost"
}

port := c.Server.Port
if port == "" {
port = "8080"
}

return fmt.Sprintf("%s://%s:%s", scheme, host, port)
}