Skip to content

Commit 23ab5dc

Browse files
Merge pull request #133 from 73ai/backup-failures
fix: unified scheduler with sleep/wake catch-up and backup failure alerts
2 parents ab76b06 + efca4eb commit 23ab5dc

File tree

7 files changed

+632
-136
lines changed

7 files changed

+632
-136
lines changed

daemon/jobs/backup.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ import (
55
"fmt"
66
"log/slog"
77
"net/http"
8+
"time"
89

910
"github.com/riverqueue/river"
1011

12+
"github.com/73ai/openbotkit/channel"
1113
"github.com/73ai/openbotkit/config"
1214
"github.com/73ai/openbotkit/oauth/google"
1315
"github.com/73ai/openbotkit/provider"
@@ -34,16 +36,33 @@ func (w *BackupWorker) Work(ctx context.Context, job *river.Job[BackupArgs]) err
3436
return nil
3537
}
3638

39+
schedule, err := time.ParseDuration(w.Cfg.Backup.Schedule)
40+
if err == nil && schedule > 0 {
41+
manifest, mErr := backupsvc.LoadManifest(config.BackupLastManifestPath())
42+
if mErr == nil && manifest.ID != "" && time.Since(manifest.Timestamp) < schedule {
43+
slog.Info("backup: last backup is recent, skipping",
44+
"age", time.Since(manifest.Timestamp).Round(time.Minute),
45+
"schedule", schedule)
46+
return nil
47+
}
48+
}
49+
3750
slog.Info("starting backup job")
3851

3952
backend, err := backupsvc.ResolveBackend(ctx, backendOpts(w.Cfg))
4053
if err != nil {
54+
if job.Attempt >= job.MaxAttempts {
55+
w.notifyFailure(ctx, err)
56+
}
4157
return fmt.Errorf("resolve backend: %w", err)
4258
}
4359

4460
svc := backupsvc.New(backend, config.Dir())
4561
result, err := svc.Run(ctx)
4662
if err != nil {
63+
if job.Attempt >= job.MaxAttempts {
64+
w.notifyFailure(ctx, err)
65+
}
4766
return fmt.Errorf("backup: %w", err)
4867
}
4968

@@ -56,6 +75,25 @@ func (w *BackupWorker) Work(ctx context.Context, job *river.Job[BackupArgs]) err
5675
return nil
5776
}
5877

78+
func (w *BackupWorker) notifyFailure(ctx context.Context, backupErr error) {
79+
if w.Cfg.Channels == nil {
80+
return
81+
}
82+
tg := w.Cfg.Channels.Telegram
83+
if tg == nil || tg.BotToken == "" {
84+
return
85+
}
86+
pusher, err := channel.NewTelegramPusher(tg.BotToken, tg.OwnerID)
87+
if err != nil {
88+
slog.Error("backup: create telegram pusher", "error", err)
89+
return
90+
}
91+
msg := fmt.Sprintf("Backup failed: %s", backupErr)
92+
if err := pusher.Push(ctx, msg); err != nil {
93+
slog.Error("backup: send failure alert", "error", err)
94+
}
95+
}
96+
5997
func backendOpts(cfg *config.Config) backupsvc.ResolveBackendOpts {
6098
opts := backupsvc.ResolveBackendOpts{
6199
ResolveCred: provider.ResolveAPIKey,

daemon/jobs/backup_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package jobs
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/73ai/openbotkit/config"
9+
)
10+
11+
func TestBackupArgs_Kind(t *testing.T) {
12+
args := BackupArgs{}
13+
if args.Kind() != "backup" {
14+
t.Errorf("Kind() = %q, want %q", args.Kind(), "backup")
15+
}
16+
}
17+
18+
func TestBackupWorker_notifyFailure_NoTelegram(t *testing.T) {
19+
w := &BackupWorker{Cfg: &config.Config{}}
20+
// Should not panic when Channels is nil.
21+
w.notifyFailure(context.Background(), errors.New("test error"))
22+
}
23+
24+
func TestBackupWorker_notifyFailure_EmptyBotToken(t *testing.T) {
25+
w := &BackupWorker{
26+
Cfg: &config.Config{
27+
Channels: &config.ChannelsConfig{
28+
Telegram: &config.TelegramConfig{BotToken: "", OwnerID: 0},
29+
},
30+
},
31+
}
32+
// Should return early when BotToken is empty.
33+
w.notifyFailure(context.Background(), errors.New("test error"))
34+
}

daemon/river.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,6 @@ func newRiverClient(ctx context.Context, cfg *config.Config, notifier *SyncNotif
5959
),
6060
}
6161

62-
if cfg.Backup != nil && cfg.Backup.Enabled && cfg.Backup.Schedule != "" {
63-
backupPeriod, err := time.ParseDuration(cfg.Backup.Schedule)
64-
if err == nil {
65-
periodicJobs = append(periodicJobs, river.NewPeriodicJob(
66-
river.PeriodicInterval(backupPeriod),
67-
func() (river.JobArgs, *river.InsertOpts) {
68-
return jobs.BackupArgs{}, nil
69-
},
70-
&river.PeriodicJobOpts{RunOnStart: false},
71-
))
72-
}
73-
}
74-
7562
riverCfg := &river.Config{
7663
Queues: map[string]river.QueueConfig{
7764
river.QueueDefault: {MaxWorkers: 5},

0 commit comments

Comments
 (0)