Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
cda9b2a
feat(scheduler): add schedule types and ChannelMeta struct
priyanshujain Mar 11, 2026
1aa62ce
feat(scheduler): add schema and migration for schedules table
priyanshujain Mar 11, 2026
65fff23
feat(scheduler): add CRUD store operations for schedules
priyanshujain Mar 11, 2026
66fcbfc
feat(config): add SchedulerConfig with storage DSN and defaults
priyanshujain Mar 11, 2026
5ebd642
feat(channel): add Pusher interface for outbound message delivery
priyanshujain Mar 11, 2026
fdf7cc4
feat(channel): add TelegramPusher for scheduled task delivery
priyanshujain Mar 11, 2026
40001b7
feat(channel): add SlackPusher for scheduled task delivery
priyanshujain Mar 11, 2026
fa75327
feat(tools): add AutoApproveInteractor for headless scheduled tasks
priyanshujain Mar 11, 2026
3f905e1
feat(channel): add PusherFactory to create channel-specific pushers
priyanshujain Mar 11, 2026
914c3e3
feat(daemon): add ScheduledTaskWorker River job for running tasks
priyanshujain Mar 11, 2026
9f11d1e
feat(daemon): register ScheduledTaskWorker in River client
priyanshujain Mar 11, 2026
78109fe
feat(daemon): add Scheduler with cron, one-shot polling, and reload
priyanshujain Mar 11, 2026
18b617b
feat(daemon): wire Scheduler into Daemon lifecycle
priyanshujain Mar 11, 2026
9c111f3
feat(tools): add create_schedule, list_schedules, delete_schedule tools
priyanshujain Mar 11, 2026
60454d4
feat(tools): add scheduled tasks section to system prompt
priyanshujain Mar 11, 2026
76f19f1
feat(telegram): register schedule tools and add scheduler migration
priyanshujain Mar 11, 2026
8b32063
docs(skills): add schedule-task skill with reference documentation
priyanshujain Mar 11, 2026
7db9fe0
test(daemon): add scheduler integration test for one-shot polling
priyanshujain Mar 11, 2026
a81cb09
test(channel): add Pusher interface and factory tests
priyanshujain Mar 11, 2026
f5576bb
test: add missing tests for prompt, config DSN, ChatID, and worker
priyanshujain Mar 11, 2026
48e388c
refactor(daemon): add PusherFactory and AgentRunner hooks to worker
priyanshujain Mar 12, 2026
92ee6b0
test(spectest): add scheduler spec tests for create, list, delete, an…
priyanshujain Mar 12, 2026
ede228c
fix(scheduler): disable one-shot on enqueue instead of marking completed
priyanshujain Mar 12, 2026
086e9f3
fix(config): add nil guard in SchedulerDataDSN
priyanshujain Mar 12, 2026
74778cd
fix(jobs): return errors on pusher failure instead of silently succee…
priyanshujain Mar 12, 2026
8184f1b
fix(scheduler): return errors from parseTime and json.Unmarshal in sc…
priyanshujain Mar 12, 2026
91879ce
fix(scheduler): parameterize enabled column for Postgres compatibility
priyanshujain Mar 12, 2026
5660882
fix(scheduler): use parent context in cron callbacks instead of Backg…
priyanshujain Mar 12, 2026
1ac6057
fix(scheduler): return error when deleting non-existent schedule
priyanshujain Mar 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions agent/tools/auto_approve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package tools

type AutoApproveInteractor struct{}

var _ Interactor = AutoApproveInteractor{}

func (AutoApproveInteractor) Notify(_ string) error { return nil }
func (AutoApproveInteractor) NotifyLink(_, _ string) error { return nil }
func (AutoApproveInteractor) RequestApproval(_ string) (bool, error) { return true, nil }
22 changes: 22 additions & 0 deletions agent/tools/auto_approve_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package tools

import "testing"

func TestAutoApproveInteractor(t *testing.T) {
var i AutoApproveInteractor

approved, err := i.RequestApproval("dangerous action")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !approved {
t.Fatal("expected approval")
}

if err := i.Notify("msg"); err != nil {
t.Fatalf("notify: %v", err)
}
if err := i.NotifyLink("text", "http://example.com"); err != nil {
t.Fatalf("notify link: %v", err)
}
}
15 changes: 15 additions & 0 deletions agent/tools/prompt.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ Channel references accept: #name, C-ID, or Slack archive URL. User references ac
`)
}

// Scheduled tasks section — only if schedule tools are registered.
if reg.Has("create_schedule") {
b.WriteString(`
## Scheduled Tasks
Use create_schedule, list_schedules, and delete_schedule to manage scheduled tasks.
When scheduling a task:
1. Convert the user's request into a self-contained prompt that a fresh agent can execute without conversation context
2. Determine the user's timezone from their message or stored memories
3. Convert times to UTC cron expressions (5-field) or UTC ISO 8601 datetimes
4. Minimum recurring frequency: 1 hour
For one-shot tasks, use type "one_shot" with scheduled_at in UTC.
For recurring tasks, use type "recurring" with a UTC cron expression.
`)
}

// Skills section.
b.WriteString(`
## Skills
Expand Down
28 changes: 28 additions & 0 deletions agent/tools/prompt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package tools

import (
"strings"
"testing"
)

func TestBuildBaseSystemPrompt_IncludesScheduledTasks(t *testing.T) {
reg := NewRegistry()
reg.Register(NewCreateScheduleTool(ScheduleToolDeps{}))

prompt := BuildBaseSystemPrompt(reg)
if !strings.Contains(prompt, "Scheduled Tasks") {
t.Error("expected 'Scheduled Tasks' section in prompt")
}
if !strings.Contains(prompt, "create_schedule") {
t.Error("expected 'create_schedule' mention in prompt")
}
}

func TestBuildBaseSystemPrompt_OmitsScheduledTasksWhenNotRegistered(t *testing.T) {
reg := NewRegistry()

prompt := BuildBaseSystemPrompt(reg)
if strings.Contains(prompt, "Scheduled Tasks") {
t.Error("prompt should not include 'Scheduled Tasks' without schedule tools")
}
}
302 changes: 302 additions & 0 deletions agent/tools/schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
package tools

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/robfig/cron/v3"

"github.com/priyanshujain/openbotkit/config"
"github.com/priyanshujain/openbotkit/source/scheduler"
"github.com/priyanshujain/openbotkit/store"
)

type ScheduleToolDeps struct {
Cfg *config.Config
Channel string
ChannelMeta scheduler.ChannelMeta
}

func (d ScheduleToolDeps) openDB() (*store.DB, error) {
if err := config.EnsureSourceDir("scheduler"); err != nil {
return nil, fmt.Errorf("ensure scheduler dir: %w", err)
}
db, err := store.Open(store.Config{
Driver: d.Cfg.Scheduler.Storage.Driver,
DSN: d.Cfg.SchedulerDataDSN(),
})
if err != nil {
return nil, fmt.Errorf("open scheduler db: %w", err)
}
if err := scheduler.Migrate(db); err != nil {
db.Close()
return nil, fmt.Errorf("migrate scheduler db: %w", err)
}
return db, nil
}

// CreateScheduleTool

type CreateScheduleTool struct {
deps ScheduleToolDeps
}

func NewCreateScheduleTool(deps ScheduleToolDeps) *CreateScheduleTool {
return &CreateScheduleTool{deps: deps}
}

func (t *CreateScheduleTool) Name() string { return "create_schedule" }
func (t *CreateScheduleTool) Description() string {
return "Create a recurring or one-shot scheduled task"
}
func (t *CreateScheduleTool) InputSchema() json.RawMessage {
return json.RawMessage(`{
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": ["recurring", "one_shot"],
"description": "Schedule type"
},
"cron_expr": {
"type": "string",
"description": "5-field UTC cron expression (for recurring)"
},
"scheduled_at": {
"type": "string",
"description": "UTC ISO 8601 datetime (for one_shot)"
},
"task": {
"type": "string",
"description": "Self-contained prompt for the scheduled agent"
},
"timezone": {
"type": "string",
"description": "User's timezone (e.g. America/New_York)"
},
"description": {
"type": "string",
"description": "Human-readable description of the schedule"
}
},
"required": ["type", "task", "timezone"]
}`)
}

type createScheduleInput struct {
Type string `json:"type"`
CronExpr string `json:"cron_expr"`
ScheduledAt string `json:"scheduled_at"`
Task string `json:"task"`
Timezone string `json:"timezone"`
Description string `json:"description"`
}

func (t *CreateScheduleTool) Execute(_ context.Context, input json.RawMessage) (string, error) {
var in createScheduleInput
if err := json.Unmarshal(input, &in); err != nil {
return "", fmt.Errorf("parse input: %w", err)
}

if in.Task == "" {
return "", fmt.Errorf("task is required")
}

loc, err := time.LoadLocation(in.Timezone)
if err != nil {
return "", fmt.Errorf("invalid timezone %q: %w", in.Timezone, err)
}

s := &scheduler.Schedule{
Type: scheduler.ScheduleType(in.Type),
Task: in.Task,
Channel: t.deps.Channel,
ChannelMeta: t.deps.ChannelMeta,
Timezone: in.Timezone,
Description: in.Description,
}

switch s.Type {
case scheduler.Recurring:
if in.CronExpr == "" {
return "", fmt.Errorf("cron_expr is required for recurring schedules")
}
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
cronSched, err := parser.Parse(in.CronExpr)
if err != nil {
return "", fmt.Errorf("invalid cron expression: %w", err)
}
now := time.Now().UTC()
first := cronSched.Next(now)
second := cronSched.Next(first)
if second.Sub(first) < time.Hour {
return "", fmt.Errorf("recurring schedules must run at most once per hour")
}
s.CronExpr = in.CronExpr

case scheduler.OneShot:
if in.ScheduledAt == "" {
return "", fmt.Errorf("scheduled_at is required for one-shot schedules")
}
scheduledAt, err := time.Parse(time.RFC3339, in.ScheduledAt)
if err != nil {
return "", fmt.Errorf("invalid scheduled_at (expected RFC3339): %w", err)
}
if scheduledAt.Before(time.Now().UTC()) {
return "", fmt.Errorf("scheduled_at must be in the future")
}
s.ScheduledAt = &scheduledAt

default:
return "", fmt.Errorf("type must be 'recurring' or 'one_shot'")
}

db, err := t.deps.openDB()
if err != nil {
return "", err
}
defer db.Close()

id, err := scheduler.Create(db, s)
if err != nil {
return "", fmt.Errorf("create schedule: %w", err)
}

nextRun := t.formatNextRun(s, loc)
return fmt.Sprintf("Schedule created (ID: %d). Description: %s. Next run: %s (in your timezone).", id, in.Description, nextRun), nil
}

func (t *CreateScheduleTool) formatNextRun(s *scheduler.Schedule, loc *time.Location) string {
if s.Type == scheduler.OneShot && s.ScheduledAt != nil {
return s.ScheduledAt.In(loc).Format("2006-01-02 15:04 MST")
}
if s.CronExpr != "" {
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
if sched, err := parser.Parse(s.CronExpr); err == nil {
next := sched.Next(time.Now().UTC())
return next.In(loc).Format("2006-01-02 15:04 MST")
}
}
return "unknown"
}

// ListSchedulesTool

type ListSchedulesTool struct {
deps ScheduleToolDeps
}

func NewListSchedulesTool(deps ScheduleToolDeps) *ListSchedulesTool {
return &ListSchedulesTool{deps: deps}
}

func (t *ListSchedulesTool) Name() string { return "list_schedules" }
func (t *ListSchedulesTool) Description() string { return "List all scheduled tasks" }
func (t *ListSchedulesTool) InputSchema() json.RawMessage {
return json.RawMessage(`{"type": "object", "properties": {}}`)
}

func (t *ListSchedulesTool) Execute(_ context.Context, _ json.RawMessage) (string, error) {
db, err := t.deps.openDB()
if err != nil {
return "", err
}
defer db.Close()

schedules, err := scheduler.List(db)
if err != nil {
return "", fmt.Errorf("list schedules: %w", err)
}

if len(schedules) == 0 {
return "No scheduled tasks found.", nil
}

var b strings.Builder
for _, s := range schedules {
loc, _ := time.LoadLocation(s.Timezone)
if loc == nil {
loc = time.UTC
}

status := "enabled"
if !s.Enabled {
status = "disabled"
}

fmt.Fprintf(&b, "ID: %d | %s | %s | %s\n", s.ID, s.Type, status, s.Description)
if s.Type == scheduler.Recurring {
fmt.Fprintf(&b, " Cron: %s (UTC)\n", s.CronExpr)
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
if sched, err := parser.Parse(s.CronExpr); err == nil {
next := sched.Next(time.Now().UTC())
fmt.Fprintf(&b, " Next run: %s\n", next.In(loc).Format("2006-01-02 15:04 MST"))
}
}
if s.Type == scheduler.OneShot && s.ScheduledAt != nil {
fmt.Fprintf(&b, " Scheduled at: %s\n", s.ScheduledAt.In(loc).Format("2006-01-02 15:04 MST"))
}
if s.LastRunAt != nil {
fmt.Fprintf(&b, " Last run: %s\n", s.LastRunAt.In(loc).Format("2006-01-02 15:04 MST"))
}
if s.LastError != "" {
fmt.Fprintf(&b, " Last error: %s\n", s.LastError)
}
}

return b.String(), nil
}

// DeleteScheduleTool

type DeleteScheduleTool struct {
deps ScheduleToolDeps
}

func NewDeleteScheduleTool(deps ScheduleToolDeps) *DeleteScheduleTool {
return &DeleteScheduleTool{deps: deps}
}

func (t *DeleteScheduleTool) Name() string { return "delete_schedule" }
func (t *DeleteScheduleTool) Description() string { return "Delete a scheduled task by ID" }
func (t *DeleteScheduleTool) InputSchema() json.RawMessage {
return json.RawMessage(`{
"type": "object",
"properties": {
"id": {
"type": "integer",
"description": "Schedule ID to delete"
}
},
"required": ["id"]
}`)
}

type deleteScheduleInput struct {
ID int64 `json:"id"`
}

func (t *DeleteScheduleTool) Execute(_ context.Context, input json.RawMessage) (string, error) {
var in deleteScheduleInput
if err := json.Unmarshal(input, &in); err != nil {
return "", fmt.Errorf("parse input: %w", err)
}
if in.ID == 0 {
return "", fmt.Errorf("id is required")
}

db, err := t.deps.openDB()
if err != nil {
return "", err
}
defer db.Close()

if err := scheduler.Delete(db, in.ID); err != nil {
return "", fmt.Errorf("delete schedule: %w", err)
}

return fmt.Sprintf("Schedule %d deleted.", in.ID), nil
}
Loading
Loading