Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
aa7f075
fix(jobs): rename NextRetryAt to NextRetry to match River interface
priyanshujain Mar 20, 2026
03f2dff
feat(jobs): classify errors for adaptive retry timing
priyanshujain Mar 20, 2026
161ac8c
feat(jobs): cancel non-retryable jobs with river.JobCancel
priyanshujain Mar 20, 2026
a9c0dcb
feat(scheduler): increase MaxAttempts to 3
priyanshujain Mar 20, 2026
09185b2
feat(tools): add scrubEnv to remove sensitive env vars
priyanshujain Mar 20, 2026
a75d25b
fix(tools): scrub env for all external agent runners
priyanshujain Mar 20, 2026
9b2aa73
feat(tasks): add schema and migration
priyanshujain Mar 20, 2026
da1f86d
feat(tasks): add CRUD store functions
priyanshujain Mar 20, 2026
4a3bdbd
test(tasks): add store tests
priyanshujain Mar 20, 2026
70d501c
feat(tasks): add cleanup with 7-day retention
priyanshujain Mar 20, 2026
be06c19
feat(config): add tasks storage config
priyanshujain Mar 20, 2026
5541116
refactor(tools): add NewPersistentTaskTracker with DB backing
priyanshujain Mar 20, 2026
3973e15
test(tools): add persistent task tracker tests
priyanshujain Mar 20, 2026
7c08f05
feat(cli): wire persistent task tracker
priyanshujain Mar 20, 2026
352209d
feat(telegram): wire persistent task tracker
priyanshujain Mar 20, 2026
b331f30
feat(delegate): add audit logging with context=delegated
priyanshujain Mar 20, 2026
736de9f
feat(provider): extract shared pricing map
priyanshujain Mar 20, 2026
09023f0
refactor(cli): use shared pricing from provider package
priyanshujain Mar 20, 2026
25456cf
feat(agent): add BudgetTracker and BudgetChecker
priyanshujain Mar 20, 2026
28382fc
feat(agent): add WithBudgetChecker option to agent loop
priyanshujain Mar 20, 2026
c506ae4
feat(scheduler): update store for budget columns
priyanshujain Mar 20, 2026
f9c613f
feat(provider): export Router.Resolve method
priyanshujain Mar 20, 2026
13eaa0c
feat(jobs): use model tier and budget in scheduled task worker
priyanshujain Mar 20, 2026
baa74f6
feat(tools): add model_tier and max_budget_usd to schedule tools
priyanshujain Mar 20, 2026
265be6e
feat(scheduler): add reactive type and trigger fields to types
priyanshujain Mar 20, 2026
ad973e1
feat(scheduler): add trigger columns to schema
priyanshujain Mar 20, 2026
d042624
feat(scheduler): update store for trigger columns
priyanshujain Mar 20, 2026
ec00309
feat(scheduler): add trigger validation and query builder
priyanshujain Mar 20, 2026
591a599
test(scheduler): add trigger tests
priyanshujain Mar 20, 2026
3bc608d
feat(daemon): add SyncNotifier for sync completion signals
priyanshujain Mar 20, 2026
23107c7
feat(daemon): wire sync notifier into all sync goroutines
priyanshujain Mar 20, 2026
7233b6c
feat(scheduler): add reactive check loop
priyanshujain Mar 20, 2026
ddfdd2b
feat(tools): add reactive type to schedule tools
priyanshujain Mar 20, 2026
ae66a74
test(spectest): add reactive trigger end-to-end spec test
priyanshujain Mar 20, 2026
b02055c
test(coverage): add tests for reactive store, cleanup, and budget Total
priyanshujain Mar 20, 2026
565f166
fix(scheduler): replace trigger query denylist with allowlist to prev…
priyanshujain Mar 20, 2026
c63f1ee
fix(daemon): notify reactive triggers periodically during WhatsApp sync
priyanshujain Mar 20, 2026
6b68654
refactor(tools): extract OpenPersistentTaskTracker to deduplicate
priyanshujain Mar 20, 2026
59c1515
fix(daemon): format reactive trigger rows as key-value pairs
priyanshujain Mar 20, 2026
0820ad2
fix(jobs): remove dead code in NextRetry for cancelled error types
priyanshujain Mar 20, 2026
8c8b5c1
fix(tools): expand env scrub to catch DATABASE_URL, DSN, PRIVATE_KEY
priyanshujain Mar 20, 2026
ee84c06
test(agent): add tests for budget-exceeded in agent loop
priyanshujain Mar 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Agent struct {
summarizer Summarizer
rateLimiter *provider.RateLimiter
usageRecorder UsageRecorder
budgetChecker BudgetChecker
}

// Option configures an Agent.
Expand Down Expand Up @@ -95,6 +96,11 @@ func WithSummarizer(s Summarizer) Option {
return func(a *Agent) { a.summarizer = s }
}

// WithBudgetChecker sets a budget checker that is called after each LLM call.
func WithBudgetChecker(bc BudgetChecker) Option {
return func(a *Agent) { a.budgetChecker = bc }
}

// New creates a new Agent.
func New(p provider.Provider, model string, executor ToolExecutor, opts ...Option) *Agent {
a := &Agent{
Expand Down Expand Up @@ -143,6 +149,16 @@ func (a *Agent) Run(ctx context.Context, input string) (string, error) {
if a.usageRecorder != nil {
a.usageRecorder.RecordUsage(a.model, resp.Usage)
}
if a.budgetChecker != nil {
if err := a.budgetChecker.CheckBudget(); err != nil {
// Return partial text on budget exceeded (graceful).
text := resp.TextContent()
if text == "" {
text = "(budget exceeded before completion)"
}
return text, err
}
}

// Append assistant response to history.
a.history = append(a.history, provider.Message{
Expand Down
49 changes: 49 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,55 @@ func TestLoop_TokenBasedCompaction(t *testing.T) {
}
}

func TestLoop_BudgetExceeded(t *testing.T) {
mp := &mockProvider{
responses: []*provider.ChatResponse{
{
Content: []provider.ContentBlock{{Type: provider.ContentText, Text: "Partial result"}},
StopReason: provider.StopEndTurn,
Usage: provider.Usage{InputTokens: 1_000_000, OutputTokens: 1_000_000},
},
},
}
exec := &mockExecutor{results: map[string]string{}}
bt := NewBudgetTracker(0.001, nil) // $0.001 budget — 1M tokens of sonnet costs $18
a := New(mp, "claude-sonnet-4-6", exec, WithUsageRecorder(bt), WithBudgetChecker(bt))

result, err := a.Run(context.Background(), "test")
if err == nil {
t.Fatal("expected budget exceeded error")
}
if !strings.Contains(err.Error(), "budget exceeded") {
t.Errorf("error = %q, want budget exceeded", err.Error())
}
if result != "Partial result" {
t.Errorf("result = %q, want partial text returned", result)
}
}

func TestLoop_BudgetNotExceeded(t *testing.T) {
mp := &mockProvider{
responses: []*provider.ChatResponse{
{
Content: []provider.ContentBlock{{Type: provider.ContentText, Text: "Full result"}},
StopReason: provider.StopEndTurn,
Usage: provider.Usage{InputTokens: 100, OutputTokens: 100},
},
},
}
exec := &mockExecutor{results: map[string]string{}}
bt := NewBudgetTracker(10.0, nil) // $10 budget — more than enough
a := New(mp, "claude-sonnet-4-6", exec, WithUsageRecorder(bt), WithBudgetChecker(bt))

result, err := a.Run(context.Background(), "test")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result != "Full result" {
t.Errorf("result = %q, want Full result", result)
}
}

func TestLoop_TracksLastInputTokens(t *testing.T) {
mp := &mockProvider{
responses: []*provider.ChatResponse{
Expand Down
58 changes: 58 additions & 0 deletions agent/budget.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package agent

import (
"fmt"
"sync"

"github.com/73ai/openbotkit/provider"
)

// BudgetChecker checks whether the accumulated cost has exceeded a budget.
type BudgetChecker interface {
CheckBudget() error
}

// BudgetTracker wraps a UsageRecorder and accumulates cost per call.
// It implements both UsageRecorder and BudgetChecker.
type BudgetTracker struct {
maxBudget float64
inner UsageRecorder
mu sync.Mutex
total float64
}

// NewBudgetTracker creates a tracker that enforces a cost budget.
// If maxBudget is 0, budget checking is disabled (unlimited).
func NewBudgetTracker(maxBudget float64, inner UsageRecorder) *BudgetTracker {
return &BudgetTracker{maxBudget: maxBudget, inner: inner}
}

func (bt *BudgetTracker) RecordUsage(model string, usage provider.Usage) {
cost := provider.EstimateCost(model, usage)
bt.mu.Lock()
bt.total += cost
bt.mu.Unlock()
if bt.inner != nil {
bt.inner.RecordUsage(model, usage)
}
}

func (bt *BudgetTracker) CheckBudget() error {
if bt.maxBudget <= 0 {
return nil
}
bt.mu.Lock()
total := bt.total
bt.mu.Unlock()
if total >= bt.maxBudget {
return fmt.Errorf("budget exceeded: $%.4f spent of $%.4f limit", total, bt.maxBudget)
}
return nil
}

// Total returns the accumulated cost so far.
func (bt *BudgetTracker) Total() float64 {
bt.mu.Lock()
defer bt.mu.Unlock()
return bt.total
}
61 changes: 61 additions & 0 deletions agent/budget_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package agent

import (
"testing"

"github.com/73ai/openbotkit/provider"
)

type mockRecorder struct {
calls int
}

func (m *mockRecorder) RecordUsage(_ string, _ provider.Usage) {
m.calls++
}

func TestBudgetTracker_UnderBudget(t *testing.T) {
bt := NewBudgetTracker(1.0, nil)
bt.RecordUsage("claude-sonnet-4-6", provider.Usage{InputTokens: 1000, OutputTokens: 1000})
if err := bt.CheckBudget(); err != nil {
t.Errorf("unexpected error: %v", err)
}
}

func TestBudgetTracker_ExceedsBudget(t *testing.T) {
bt := NewBudgetTracker(0.01, nil)
// 1M tokens of sonnet = $18, well over $0.01
bt.RecordUsage("claude-sonnet-4-6", provider.Usage{InputTokens: 1_000_000, OutputTokens: 1_000_000})
if err := bt.CheckBudget(); err == nil {
t.Error("expected budget exceeded error")
}
}

func TestBudgetTracker_Unlimited(t *testing.T) {
bt := NewBudgetTracker(0, nil)
bt.RecordUsage("claude-opus-4-6", provider.Usage{InputTokens: 10_000_000, OutputTokens: 10_000_000})
if err := bt.CheckBudget(); err != nil {
t.Errorf("unlimited budget should never error: %v", err)
}
}

func TestBudgetTracker_ChainsInnerRecorder(t *testing.T) {
inner := &mockRecorder{}
bt := NewBudgetTracker(1.0, inner)
bt.RecordUsage("claude-sonnet-4-6", provider.Usage{InputTokens: 100})
bt.RecordUsage("claude-sonnet-4-6", provider.Usage{InputTokens: 200})
if inner.calls != 2 {
t.Errorf("inner.calls = %d, want 2", inner.calls)
}
}

func TestBudgetTracker_Total(t *testing.T) {
bt := NewBudgetTracker(1.0, nil)
if bt.Total() != 0 {
t.Errorf("initial total = %f, want 0", bt.Total())
}
bt.RecordUsage("claude-sonnet-4-6", provider.Usage{InputTokens: 1000, OutputTokens: 1000})
if bt.Total() <= 0 {
t.Error("expected positive total after recording usage")
}
}
73 changes: 72 additions & 1 deletion agent/tools/agent_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (r *AgentRunner) buildArgs(opts runOptions) []string {
}

func (r *AgentRunner) buildEnv() []string {
env := os.Environ()
env := scrubEnv(os.Environ())
if r.info.Kind == AgentClaude {
return filterEnv(env, "CLAUDECODE")
}
Expand All @@ -137,3 +137,74 @@ func filterEnv(env []string, key string) []string {
}
return filtered
}

// scrubEnv removes environment variables that contain sensitive values
// (API keys, tokens, secrets, passwords) from the given list.
func scrubEnv(env []string) []string {
filtered := make([]string, 0, len(env))
for _, e := range env {
eqIdx := strings.IndexByte(e, '=')
if eqIdx < 0 {
filtered = append(filtered, e)
continue
}
key := e[:eqIdx]
if !isSensitiveKey(key) {
filtered = append(filtered, e)
}
}
return filtered
}

var safeKeys = map[string]bool{
"PATH": true, "HOME": true, "USER": true, "SHELL": true,
"TERM": true, "LANG": true, "TMPDIR": true, "PWD": true,
"GOPATH": true, "GOROOT": true, "GOBIN": true,
"EDITOR": true, "VISUAL": true, "PAGER": true,
"HOSTNAME": true, "LOGNAME": true, "DISPLAY": true,
"COLORTERM": true, "TERM_PROGRAM": true, "SHLVL": true,
}

var safePrefixes = []string{"LC_", "XDG_"}

var sensitivePrefixes = []string{
"ANTHROPIC_", "OPENAI_", "GOOGLE_API_", "GEMINI_",
"GROQ_", "OPENROUTER_", "AWS_SECRET_", "CLAUDECODE",
}

var sensitiveSuffixes = []string{
"_KEY", "_SECRET", "_TOKEN", "_PASSWORD", "_CREDENTIAL", "_AUTH",
"_PRIVATE_KEY", "_DSN",
}

var sensitiveExact = map[string]bool{
"GITHUB_TOKEN": true, "GH_TOKEN": true,
"DATABASE_URL": true, "REDIS_URL": true, "MONGODB_URI": true,
"AMQP_URL": true, "ELASTICSEARCH_URL": true,
}

func isSensitiveKey(key string) bool {
if safeKeys[key] {
return false
}
for _, p := range safePrefixes {
if strings.HasPrefix(key, p) {
return false
}
}
if sensitiveExact[key] {
return true
}
upper := strings.ToUpper(key)
for _, p := range sensitivePrefixes {
if strings.HasPrefix(upper, p) {
return true
}
}
for _, s := range sensitiveSuffixes {
if strings.HasSuffix(upper, s) {
return true
}
}
return false
}
Loading
Loading