Skip to content

Commit df41a38

Browse files
ericfitzclaude
andauthored
fix(api): fix addon invocation completion and rate limit UX (#113)
- Auto-complete invocations by default (opt-in to async with X-TMI-Callback header) - Increase default max active invocations from 1 to 3 - Add detailed 429 response with blocking invocation info and Retry-After header - Auto-infer server base URL for callback URLs if not configured - Update addon and webhook documentation Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 46dee7c commit df41a38

File tree

10 files changed

+216
-24
lines changed

10 files changed

+216
-24
lines changed

.version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"major": 0,
33
"minor": 272,
4-
"patch": 1
4+
"patch": 2
55
}

api/addon_invocation_quota_store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
// Default quota values
1414
const (
15-
DefaultMaxActiveInvocations = 1
15+
DefaultMaxActiveInvocations = 3 // Allow 3 concurrent invocations per user by default
1616
DefaultMaxInvocationsPerHour = 10
1717
)
1818

api/addon_invocation_store.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ type AddonInvocationStore interface {
6868
// GetActiveForUser retrieves the active invocation for a user (for quota enforcement)
6969
GetActiveForUser(ctx context.Context, userID uuid.UUID) (*AddonInvocation, error)
7070

71+
// ListActiveForUser retrieves all active invocations (pending/in_progress) for a user up to limit
72+
ListActiveForUser(ctx context.Context, userID uuid.UUID, limit int) ([]AddonInvocation, error)
73+
7174
// Delete removes an invocation (for cleanup)
7275
Delete(ctx context.Context, id uuid.UUID) error
7376

@@ -370,6 +373,67 @@ func (s *AddonInvocationRedisStore) GetActiveForUser(ctx context.Context, userID
370373
return s.Get(ctx, invocationID)
371374
}
372375

376+
// ListActiveForUser retrieves all active invocations (pending/in_progress) for a user up to limit
377+
func (s *AddonInvocationRedisStore) ListActiveForUser(ctx context.Context, userID uuid.UUID, limit int) ([]AddonInvocation, error) {
378+
logger := slogging.Get()
379+
380+
// Scan for all invocation keys
381+
pattern := "addon:invocation:*"
382+
var cursor uint64
383+
var activeInvocations []AddonInvocation
384+
385+
client := s.redis.GetClient()
386+
387+
for {
388+
var keys []string
389+
var newCursor uint64
390+
keys, newCursor, err := client.Scan(ctx, cursor, pattern, 100).Result()
391+
if err != nil {
392+
logger.Error("Failed to scan invocation keys: %v", err)
393+
return nil, fmt.Errorf("failed to scan invocations: %w", err)
394+
}
395+
396+
// Check each invocation
397+
for _, key := range keys {
398+
data, err := s.redis.Get(ctx, key)
399+
if err != nil {
400+
if err == redis.Nil {
401+
continue // Key expired between scan and get
402+
}
403+
logger.Error("Failed to get invocation from key %s: %v", key, err)
404+
continue
405+
}
406+
407+
var invocation AddonInvocation
408+
if err := json.Unmarshal([]byte(data), &invocation); err != nil {
409+
logger.Error("Failed to unmarshal invocation from key %s: %v", key, err)
410+
continue
411+
}
412+
413+
// Check if invocation belongs to user and is active
414+
if invocation.InvokedByUUID == userID &&
415+
(invocation.Status == InvocationStatusPending || invocation.Status == InvocationStatusInProgress) {
416+
activeInvocations = append(activeInvocations, invocation)
417+
418+
// Stop if we've reached the limit
419+
if len(activeInvocations) >= limit {
420+
logger.Debug("Found %d active invocations for user %s (limit reached)", len(activeInvocations), userID)
421+
return activeInvocations, nil
422+
}
423+
}
424+
}
425+
426+
cursor = newCursor
427+
if cursor == 0 {
428+
break
429+
}
430+
}
431+
432+
logger.Debug("Found %d active invocations for user %s", len(activeInvocations), userID)
433+
434+
return activeInvocations, nil
435+
}
436+
373437
// Delete removes an invocation
374438
func (s *AddonInvocationRedisStore) Delete(ctx context.Context, id uuid.UUID) error {
375439
logger := slogging.Get()

api/addon_invocation_worker.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type AddonInvocationWorker struct {
2222
running bool
2323
stopChan chan struct{}
2424
workChan chan uuid.UUID // Channel for invocation IDs to process
25+
baseURL string // Server base URL for callback URLs
2526
}
2627

2728
// AddonInvocationPayload represents the payload sent to webhook endpoints
@@ -48,9 +49,15 @@ func NewAddonInvocationWorker() *AddonInvocationWorker {
4849
},
4950
stopChan: make(chan struct{}),
5051
workChan: make(chan uuid.UUID, 100), // Buffer up to 100 pending invocations
52+
baseURL: "http://localhost:8080", // Default, should be set via SetBaseURL
5153
}
5254
}
5355

56+
// SetBaseURL sets the server's base URL for callback URLs
57+
func (w *AddonInvocationWorker) SetBaseURL(baseURL string) {
58+
w.baseURL = baseURL
59+
}
60+
5461
// Start begins processing invocations
5562
func (w *AddonInvocationWorker) Start(ctx context.Context) error {
5663
logger := slogging.Get()
@@ -145,9 +152,8 @@ func (w *AddonInvocationWorker) processInvocation(ctx context.Context, invocatio
145152

146153
logger.Debug("sending addon invocation to %s (invocation: %s)", webhook.Url, invocationID)
147154

148-
// Build callback URL
149-
// TODO: Get base URL from configuration
150-
callbackURL := fmt.Sprintf("https://tmi.example.com/invocations/%s/status", invocationID)
155+
// Build callback URL using configured base URL
156+
callbackURL := fmt.Sprintf("%s/invocations/%s/status", w.baseURL, invocationID)
151157

152158
// Build payload
153159
payload := AddonInvocationPayload{
@@ -212,9 +218,24 @@ func (w *AddonInvocationWorker) processInvocation(ctx context.Context, invocatio
212218
logger.Info("addon invocation sent successfully to %s (invocation: %s, status: %d)",
213219
webhook.Url, invocationID, resp.StatusCode)
214220

215-
// Mark as in_progress (webhook will update to completed/failed via callback)
216-
invocation.Status = InvocationStatusInProgress
217-
invocation.StatusMessage = "Invocation sent to webhook"
221+
// Check if webhook wants to use async callbacks
222+
// If X-TMI-Callback: async is set, the webhook will call back with status updates
223+
// Otherwise, auto-complete the invocation (webhook handles work internally)
224+
callbackMode := resp.Header.Get("X-TMI-Callback")
225+
226+
if callbackMode == "async" {
227+
// Webhook will call back with status updates
228+
invocation.Status = InvocationStatusInProgress
229+
invocation.StatusMessage = "Invocation sent to webhook, awaiting callback"
230+
logger.Debug("webhook requested async callback mode for invocation %s", invocationID)
231+
} else {
232+
// Auto-complete: webhook accepted and will handle internally
233+
invocation.Status = InvocationStatusCompleted
234+
invocation.StatusMessage = "Invocation delivered successfully"
235+
invocation.StatusPercent = 100
236+
logger.Debug("auto-completing invocation %s (no async callback requested)", invocationID)
237+
}
238+
218239
if err := GlobalAddonInvocationStore.Update(ctx, invocation); err != nil {
219240
logger.Error("failed to update invocation status: %v", err)
220241
}

api/addon_rate_limiter.go

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func (rl *AddonRateLimiter) buildRateLimitKey(userID uuid.UUID) string {
3232
return fmt.Sprintf("addon:ratelimit:hour:%s", userID.String())
3333
}
3434

35-
// CheckActiveInvocationLimit checks if user has an active invocation (blocks if they do)
35+
// CheckActiveInvocationLimit checks if user has reached their concurrent invocation limit
3636
func (rl *AddonRateLimiter) CheckActiveInvocationLimit(ctx context.Context, userID uuid.UUID) error {
3737
logger := slogging.Get()
3838

@@ -43,24 +43,63 @@ func (rl *AddonRateLimiter) CheckActiveInvocationLimit(ctx context.Context, user
4343
return fmt.Errorf("failed to check quota: %w", err)
4444
}
4545

46-
// Check if user has an active invocation
47-
activeInvocation, err := GlobalAddonInvocationStore.GetActiveForUser(ctx, userID)
46+
// Get active invocations for user (fetch one more than limit to check if over)
47+
activeInvocations, err := GlobalAddonInvocationStore.ListActiveForUser(ctx, userID, quota.MaxActiveInvocations+1)
4848
if err != nil {
49-
logger.Error("Failed to check active invocation for user %s: %v", userID, err)
50-
return fmt.Errorf("failed to check active invocation: %w", err)
49+
logger.Error("Failed to check active invocations for user %s: %v", userID, err)
50+
return fmt.Errorf("failed to check active invocations: %w", err)
5151
}
5252

53-
if activeInvocation != nil {
54-
logger.Warn("User %s has active invocation (limit: %d): invocation_id=%s",
55-
userID, quota.MaxActiveInvocations, activeInvocation.ID)
53+
if len(activeInvocations) >= quota.MaxActiveInvocations {
54+
logger.Warn("User %s has %d active invocations (limit: %d)",
55+
userID, len(activeInvocations), quota.MaxActiveInvocations)
56+
57+
// Build blocking invocation details for the error response
58+
blockingInvocations := make([]map[string]interface{}, 0, len(activeInvocations))
59+
var earliestTimeout time.Time
60+
61+
for _, inv := range activeInvocations {
62+
timeout := inv.LastActivityAt.Add(AddonInvocationTimeout)
63+
if earliestTimeout.IsZero() || timeout.Before(earliestTimeout) {
64+
earliestTimeout = timeout
65+
}
66+
67+
blockingInvocations = append(blockingInvocations, map[string]interface{}{
68+
"invocation_id": inv.ID.String(),
69+
"addon_id": inv.AddonID.String(),
70+
"status": inv.Status,
71+
"created_at": inv.CreatedAt.Format(time.RFC3339),
72+
"expires_at": timeout.Format(time.RFC3339),
73+
"seconds_remaining": int(time.Until(timeout).Seconds()),
74+
})
75+
}
76+
77+
// Calculate retry_after (time until oldest invocation times out)
78+
retryAfter := int(time.Until(earliestTimeout).Seconds())
79+
if retryAfter < 0 {
80+
retryAfter = 0
81+
}
82+
83+
suggestion := fmt.Sprintf("Wait for an existing invocation to complete, or retry after %d seconds when the oldest will timeout.", retryAfter)
84+
5685
return &RequestError{
5786
Status: 429,
5887
Code: "rate_limit_exceeded",
59-
Message: fmt.Sprintf("You already have an active add-on invocation. Please wait for it to complete. (Limit: %d concurrent invocation)", quota.MaxActiveInvocations),
88+
Message: fmt.Sprintf("Active invocation limit reached: %d/%d concurrent invocations.", len(activeInvocations), quota.MaxActiveInvocations),
89+
Details: &ErrorDetails{
90+
Context: map[string]interface{}{
91+
"limit": quota.MaxActiveInvocations,
92+
"current": len(activeInvocations),
93+
"retry_after": retryAfter,
94+
"blocking_invocations": blockingInvocations,
95+
},
96+
Suggestion: &suggestion,
97+
},
6098
}
6199
}
62100

63-
logger.Debug("Active invocation check passed for user %s", userID)
101+
logger.Debug("Active invocation check passed for user %s: %d/%d",
102+
userID, len(activeInvocations), quota.MaxActiveInvocations)
64103
return nil
65104
}
66105

api/request_utils.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,15 @@ func HandleRequestError(c *gin.Context, err error) {
478478
SetWWWAuthenticateHeader(c, WWWAuthInvalidToken, reqErr.Message)
479479
}
480480

481+
// Add Retry-After header for 429 Too Many Requests responses per RFC 6585
482+
if reqErr.Status == http.StatusTooManyRequests && reqErr.Details != nil {
483+
if retryAfter, ok := reqErr.Details.Context["retry_after"]; ok {
484+
if retryAfterInt, ok := retryAfter.(int); ok {
485+
c.Header("Retry-After", fmt.Sprintf("%d", retryAfterInt))
486+
}
487+
}
488+
}
489+
481490
c.JSON(reqErr.Status, response)
482491
} else {
483492
// SECURITY: Truncate error message before any stack trace markers to prevent

api/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ var (
2929
// Minor version number
3030
VersionMinor = "272"
3131
// Patch version number
32-
VersionPatch = "1"
32+
VersionPatch = "2"
3333
// GitCommit is the git commit hash from build
3434
GitCommit = "development"
3535
// BuildDate is the build timestamp

cmd/server/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1306,7 +1306,7 @@ func setupRouter(config *config.Config) (*gin.Engine, *api.Server) {
13061306
}
13071307

13081308
// startWebhookWorkers initializes and starts all webhook workers
1309-
func startWebhookWorkers(ctx context.Context) (*api.WebhookEventConsumer, *api.WebhookChallengeWorker, *api.WebhookDeliveryWorker, *api.WebhookCleanupWorker) {
1309+
func startWebhookWorkers(ctx context.Context, cfg *config.Config) (*api.WebhookEventConsumer, *api.WebhookChallengeWorker, *api.WebhookDeliveryWorker, *api.WebhookCleanupWorker) {
13101310
logger := slogging.Get()
13111311

13121312
// Start webhook workers if database and Redis are available
@@ -1354,6 +1354,7 @@ func startWebhookWorkers(ctx context.Context) (*api.WebhookEventConsumer, *api.W
13541354

13551355
// Start addon invocation worker
13561356
api.GlobalAddonInvocationWorker = api.NewAddonInvocationWorker()
1357+
api.GlobalAddonInvocationWorker.SetBaseURL(cfg.GetBaseURL())
13571358
if err := api.GlobalAddonInvocationWorker.Start(ctx); err != nil {
13581359
logger.Error("Failed to start addon invocation worker: %v", err)
13591360
}
@@ -1444,7 +1445,7 @@ func main() {
14441445
apiServer.StartWebSocketHub(ctx)
14451446

14461447
// Initialize and start webhook workers
1447-
webhookConsumer, challengeWorker, deliveryWorker, cleanupWorker := startWebhookWorkers(ctx)
1448+
webhookConsumer, challengeWorker, deliveryWorker, cleanupWorker := startWebhookWorkers(ctx, cfg)
14481449

14491450
// Prepare address
14501451
addr := fmt.Sprintf("%s:%s", cfg.Server.Interface, cfg.Server.Port)

docs/developer/addons/addon-development-guide.md

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,42 @@ def handle_invocation():
162162
task_queue.enqueue(process_invocation, payload)
163163

164164
# Respond immediately
165-
return '', 200 # TMI marks as in_progress
165+
return '', 200 # TMI auto-completes the invocation
166166
```
167167

168-
### Step 4: Update Status During Processing
168+
### Callback Modes
169169

170-
Call back to TMI to update progress:
170+
TMI supports two callback modes, controlled by the `X-TMI-Callback` response header:
171+
172+
**Auto-Complete Mode (Default)**
173+
174+
When your webhook returns a 2xx response without the `X-TMI-Callback` header (or with any value other than `async`), TMI automatically marks the invocation as `completed`. Use this mode when:
175+
- Your webhook handles the work synchronously
176+
- You don't need to report progress updates
177+
- The invocation is "fire and forget"
178+
179+
```python
180+
# Auto-complete mode - invocation marked complete immediately
181+
return '', 200
182+
```
183+
184+
**Async Callback Mode**
185+
186+
When your webhook returns the `X-TMI-Callback: async` header, TMI marks the invocation as `in_progress` and expects your service to call back with status updates. Use this mode when:
187+
- Your processing takes significant time
188+
- You want to report progress percentages
189+
- You need to report success or failure after processing
190+
191+
```python
192+
# Async callback mode - you must call back with status updates
193+
return '', 200, {'X-TMI-Callback': 'async'}
194+
```
195+
196+
**Important:** If you use async mode but never call back, the invocation will timeout after 15 minutes of inactivity and be marked as `failed`.
197+
198+
### Step 4: Update Status During Processing (Async Mode Only)
199+
200+
If using async callback mode, call back to TMI to update progress:
171201

172202
```python
173203
import requests

internal/config/config.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type Config struct {
3838
type ServerConfig struct {
3939
Port string `yaml:"port" env:"SERVER_PORT"`
4040
Interface string `yaml:"interface" env:"SERVER_INTERFACE"`
41+
BaseURL string `yaml:"base_url" env:"SERVER_BASE_URL"` // Public base URL for callbacks (auto-inferred if empty)
4142
ReadTimeout time.Duration `yaml:"read_timeout" env:"SERVER_READ_TIMEOUT"`
4243
WriteTimeout time.Duration `yaml:"write_timeout" env:"SERVER_WRITE_TIMEOUT"`
4344
IdleTimeout time.Duration `yaml:"idle_timeout" env:"SERVER_IDLE_TIMEOUT"`
@@ -924,3 +925,30 @@ func (c *Config) GetOAuthProvider(providerID string) (OAuthProviderConfig, bool)
924925
func (c *Config) GetWebSocketInactivityTimeout() time.Duration {
925926
return time.Duration(c.WebSocket.InactivityTimeoutSeconds) * time.Second
926927
}
928+
929+
// GetBaseURL returns the server's public base URL for callbacks.
930+
// If BaseURL is explicitly configured, it is returned as-is.
931+
// Otherwise, the URL is auto-inferred from Interface, Port, and TLSEnabled.
932+
func (c *Config) GetBaseURL() string {
933+
if c.Server.BaseURL != "" {
934+
return c.Server.BaseURL
935+
}
936+
937+
// Auto-infer from server configuration
938+
scheme := "http"
939+
if c.Server.TLSEnabled {
940+
scheme = "https"
941+
}
942+
943+
host := c.Server.Interface
944+
if host == "" {
945+
host = "localhost"
946+
}
947+
948+
port := c.Server.Port
949+
if port == "" {
950+
port = "8080"
951+
}
952+
953+
return fmt.Sprintf("%s://%s:%s", scheme, host, port)
954+
}

0 commit comments

Comments
 (0)