Skip to content

Commit 31beddc

Browse files
committed
feat(notifications): add suppress-after-action notification-uri config option
1 parent 57b6276 commit 31beddc

File tree

5 files changed

+143
-39
lines changed

5 files changed

+143
-39
lines changed

cmd/internal/wrtagflag/wrtagflag.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -161,15 +161,28 @@ func (r researchLinkParser) String() string {
161161
type notificationsParser struct{ *notifications.Notifications }
162162

163163
func (n *notificationsParser) Set(value string) error {
164-
eventsRaw, uri, ok := strings.Cut(value, " ")
165-
if !ok {
166-
return errors.New("invalid notification uri format. expected eg \"ev1,ev2 uri\"")
164+
parts := strings.Fields(value)
165+
if len(parts) < 2 {
166+
return errors.New("invalid notification uri format. expected eg \"ev1,ev2 uri [suppress-after-action 30s]\"")
167+
}
168+
eventsRaw, uri := parts[0], parts[1]
169+
170+
var suppressAfterAction time.Duration
171+
if len(parts) >= 4 && parts[2] == "suppress-after-action" {
172+
var err error
173+
suppressAfterAction, err = time.ParseDuration(parts[3])
174+
if err != nil {
175+
return fmt.Errorf("parse suppress duration: %w", err)
176+
}
167177
}
178+
168179
var lineErrs []error
169180
for ev := range strings.SplitSeq(eventsRaw, ",") {
170-
ev, uri = strings.TrimSpace(ev), strings.TrimSpace(uri)
171-
err := n.AddURI(ev, uri)
172-
lineErrs = append(lineErrs, err)
181+
ev = strings.TrimSpace(ev)
182+
if err := n.AddDestination(ev, uri, suppressAfterAction); err != nil {
183+
lineErrs = append(lineErrs, err)
184+
continue
185+
}
173186
}
174187
return errors.Join(lineErrs...)
175188
}
@@ -178,10 +191,14 @@ func (n notificationsParser) String() string {
178191
return ""
179192
}
180193
var parts []string
181-
n.Notifications.IterMappings(func(e string, uri string) {
182-
url, _ := url.Parse(uri)
183-
parts = append(parts, fmt.Sprintf("%s: %s://%s/...", e, url.Scheme, url.Host))
184-
})
194+
for ev, dest := range n.Notifications.Destinations() {
195+
uri, _ := url.Parse(dest.URI)
196+
part := fmt.Sprintf("%s: %s://%s/...", ev, uri.Scheme, uri.Host)
197+
if dest.SuppressAfterAction > 0 {
198+
part += fmt.Sprintf(" (suppress after action %s)", dest.SuppressAfterAction)
199+
}
200+
parts = append(parts, part)
201+
}
185202
return strings.Join(parts, ", ")
186203
}
187204

cmd/wrtag/main.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"go.senan.xyz/wrtag/cmd/internal/wrtagflag"
2424
"go.senan.xyz/wrtag/cmd/internal/wrtaglog"
2525
"go.senan.xyz/wrtag/fileutil"
26+
"go.senan.xyz/wrtag/notifications"
2627
"go.senan.xyz/wrtag/researchlink"
2728
)
2829

@@ -49,7 +50,7 @@ func main() {
4950
wrtagflag.DefaultClient()
5051
var (
5152
cfg = wrtagflag.Config()
52-
notifications = wrtagflag.Notifications()
53+
notifs = wrtagflag.Notifications()
5354
researchLinkQuerier = wrtagflag.ResearchLinks()
5455
)
5556
wrtagflag.Parse()
@@ -74,6 +75,8 @@ func main() {
7475
)
7576
flag.Parse(args)
7677

78+
ctx := notifications.RecordAction(context.Background())
79+
7780
var importCondition wrtag.ImportCondition
7881
if *yes {
7982
importCondition = wrtag.Always
@@ -91,7 +94,7 @@ func main() {
9194
return
9295
}
9396

94-
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
97+
ctx, cancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
9598
defer cancel()
9699

97100
op, err := wrtagflag.OperationByName(command, *dryRun)
@@ -115,6 +118,8 @@ func main() {
115118
)
116119
flag.Parse(args)
117120

121+
ctx := notifications.RecordAction(context.Background())
122+
118123
// walk the whole root dir by default, or some user provided dirs if provided
119124
var dirs []string
120125
if args := flag.Args(); len(args) > 0 {
@@ -132,7 +137,7 @@ func main() {
132137
}
133138
}
134139

135-
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
140+
ctx, cancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
136141
defer cancel()
137142

138143
start := time.Now()
@@ -148,10 +153,10 @@ func main() {
148153
switch {
149154
case stats.errors.Load() > 0:
150155
slog.Error("sync finished", "took", took, "", &stats)
151-
notifications.Sendf(ctx, notifSyncError, "sync finished in %v %v", took, &stats)
156+
notifs.Sendf(ctx, notifSyncError, "sync finished in %v %v", took, &stats)
152157
default:
153158
slog.Info("sync finished", "took", took, "", &stats)
154-
notifications.Sendf(ctx, notifSyncComplete, "sync finished in %v %v", took, &stats)
159+
notifs.Sendf(ctx, notifSyncComplete, "sync finished in %v %v", took, &stats)
155160
}
156161

157162
default:

cmd/wrtagweb/main.go

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"go.senan.xyz/wrtag"
2929
wrtagflag "go.senan.xyz/wrtag/cmd/internal/wrtagflag"
3030
"go.senan.xyz/wrtag/cmd/internal/wrtaglog"
31+
"go.senan.xyz/wrtag/notifications"
3132
"go.senan.xyz/wrtag/researchlink"
3233

3334
_ "github.com/ncruces/go-sqlite3/driver"
@@ -58,7 +59,7 @@ func main() {
5859
wrtagflag.DefaultClient()
5960
var (
6061
cfg = wrtagflag.Config()
61-
notifications = wrtagflag.Notifications()
62+
notifs = wrtagflag.Notifications()
6263
researchLinkQuerier = wrtagflag.ResearchLinks()
6364
apiKey = flag.String("web-api-key", "", "API key for web interface")
6465
listenAddr = flag.String("web-listen-addr", ":7373", "Listen address for web interface (optional)")
@@ -185,11 +186,20 @@ func main() {
185186
return err
186187
}
187188

188-
switch job.Status {
189-
case StatusComplete:
190-
go notifications.Send(context.WithoutCancel(ctx), notifComplete, jobNotificationMessage(*publicURL, job))
191-
case StatusNeedsInput:
192-
go notifications.Send(context.WithoutCancel(ctx), notifNeedsInput, jobNotificationMessage(*publicURL, job))
189+
{
190+
ctx := context.WithoutCancel(ctx)
191+
192+
// If we have no action time in the ctx, use the job's updated at
193+
if job.UpdatedTime.Valid && notifications.ActionTime(ctx).IsZero() {
194+
ctx = notifications.RecordActionTime(ctx, job.UpdatedTime.Time)
195+
}
196+
197+
switch job.Status {
198+
case StatusComplete:
199+
go notifs.Send(ctx, notifComplete, jobNotificationMessage(*publicURL, job))
200+
case StatusNeedsInput:
201+
go notifs.Send(ctx, notifNeedsInput, jobNotificationMessage(*publicURL, job))
202+
}
193203
}
194204

195205
return nil
@@ -266,7 +276,9 @@ func main() {
266276
w.WriteHeader(http.StatusOK)
267277
rc.Flush()
268278

269-
for id := range sse.receive(r.Context(), 0) {
279+
ctx := r.Context()
280+
281+
for id := range sse.receive(ctx, 0) {
270282
fmt.Fprintf(w, "data: %d\n\n", id)
271283
rc.Flush()
272284
}
@@ -276,7 +288,10 @@ func main() {
276288
search := r.URL.Query().Get("search")
277289
filter := JobStatus(r.URL.Query().Get("filter"))
278290
page, _ := strconv.Atoi(r.URL.Query().Get("page"))
279-
jl, err := listJobs(r.Context(), filter, search, page)
291+
292+
ctx := r.Context()
293+
294+
jl, err := listJobs(ctx, filter, search, page)
280295
if err != nil {
281296
respErrf(w, http.StatusInternalServerError, "error listing jobs: %v", err)
282297
return
@@ -301,8 +316,10 @@ func main() {
301316
}
302317
path = filepath.Clean(path)
303318

319+
ctx := r.Context()
320+
304321
var job Job
305-
if err := sqlb.ScanRow(r.Context(), db, &job, "insert into jobs (source_path, operation, time) values (?, ?, ?) returning *", path, operationStr, time.Now()); err != nil {
322+
if err := sqlb.ScanRow(ctx, db, &job, "insert into jobs (source_path, operation, time) values (?, ?, ?) returning *", path, operationStr, time.Now()); err != nil {
306323
http.Error(w, fmt.Sprintf("error saving job: %v", err), http.StatusInternalServerError)
307324
return
308325
}
@@ -314,8 +331,11 @@ func main() {
314331

315332
mux.HandleFunc("GET /jobs/{id}", func(w http.ResponseWriter, r *http.Request) {
316333
id, _ := strconv.Atoi(r.PathValue("id"))
334+
335+
ctx := r.Context()
336+
317337
var job Job
318-
if err := sqlb.ScanRow(r.Context(), db, &job, "select * from jobs where id=?", id); err != nil {
338+
if err := sqlb.ScanRow(ctx, db, &job, "select * from jobs where id=?", id); err != nil {
319339
respErrf(w, http.StatusInternalServerError, "error getting job")
320340
return
321341
}
@@ -332,8 +352,10 @@ func main() {
332352
useMBID = filepath.Base(useMBID) // accept release URL
333353
}
334354

355+
ctx := r.Context()
356+
335357
var job Job
336-
if err := sqlb.ScanRow(r.Context(), db, &job, "update jobs set confirm=?, use_mbid=?, status=?, updated_time=? where id=? and status<>? returning *", confirm, useMBID, StatusEnqueued, time.Now(), id, StatusInProgress); err != nil {
358+
if err := sqlb.ScanRow(ctx, db, &job, "update jobs set confirm=?, use_mbid=?, status=?, updated_time=? where id=? and status<>? returning *", confirm, useMBID, StatusEnqueued, time.Now(), id, StatusInProgress); err != nil {
337359
respErrf(w, http.StatusInternalServerError, "error getting job")
338360
return
339361
}
@@ -345,7 +367,10 @@ func main() {
345367

346368
mux.HandleFunc("DELETE /jobs/{id}", func(w http.ResponseWriter, r *http.Request) {
347369
id, _ := strconv.Atoi(r.PathValue("id"))
348-
if err := sqlb.Exec(r.Context(), db, "delete from jobs where id=? and status<>?", id, StatusInProgress); err != nil {
370+
371+
ctx := r.Context()
372+
373+
if err := sqlb.Exec(ctx, db, "delete from jobs where id=? and status<>?", id, StatusInProgress); err != nil {
349374
respErrf(w, http.StatusInternalServerError, "error getting job")
350375
return
351376
}
@@ -384,7 +409,9 @@ func main() {
384409
})
385410

386411
mux.HandleFunc("/{$}", func(w http.ResponseWriter, r *http.Request) {
387-
jl, err := listJobs(r.Context(), "", "", 0)
412+
ctx := r.Context()
413+
414+
jl, err := listJobs(ctx, "", "", 0)
388415
if err != nil {
389416
respErrf(w, http.StatusInternalServerError, "error listing jobs: %v", err)
390417
return
@@ -417,7 +444,9 @@ func main() {
417444
}
418445
path = filepath.Clean(path)
419446

420-
if err := sqlb.Exec(r.Context(), db, "insert into jobs (source_path, operation, time) values (?, ?, ?)", path, operationStr, time.Now()); err != nil {
447+
ctx := r.Context()
448+
449+
if err := sqlb.Exec(ctx, db, "insert into jobs (source_path, operation, time) values (?, ?, ?)", path, operationStr, time.Now()); err != nil {
421450
http.Error(w, fmt.Sprintf("error saving job: %v", err), http.StatusInternalServerError)
422451
return
423452
}

config.example

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@
3636
# add notification uri that are triggered by various events
3737
# see all available services here https://containrrr.dev/shoutrrr/v0.8/services/generic/
3838
# possible events are complete, needs-input, sync-complete, sync-error
39+
# optional "suppress-after-action" prevents notifications for specified duration after manual actions
3940

40-
#notification-uri complete,needs-input,sync-error smtp://username:password@host:port/?from=from@example.com&to=recipient@example.com
41+
#notification-uri complete,needs-input,sync-error smtp://username:password@host:port/?from=from@example.com&to=recipient@example.com suppress-after-action 30s
4142
#notification-uri complete,sync-complete generic+https://my.subsonic.com/rest/startScan.view?c=wrtag&v=1.16&u=user&p=password
4243

4344
# set a list of files to keep when moving or copying

notifications/notifications.go

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"iter"
78
"log/slog"
9+
"maps"
810
"net/url"
11+
"time"
912

1013
"github.com/containrrr/shoutrrr"
1114
shoutrrrtypes "github.com/containrrr/shoutrrr/pkg/types"
@@ -15,36 +18,66 @@ var (
1518
ErrInvalidURI = errors.New("invalid URI")
1619
)
1720

21+
type Destination struct {
22+
URI string
23+
SuppressAfterAction time.Duration // suppress this long after manual actions
24+
}
25+
1826
type Notifications struct {
19-
mappings map[string][]string
27+
mappings map[string][]Destination
2028
}
2129

22-
func (n *Notifications) AddURI(event string, uri string) error {
30+
func (n *Notifications) AddDestination(event string, uri string, suppressAfterAction time.Duration) error {
2331
if n.mappings == nil {
24-
n.mappings = map[string][]string{}
32+
n.mappings = map[string][]Destination{}
2533
}
2634
if _, err := url.Parse(uri); err != nil {
2735
return fmt.Errorf("parse uri: %w", err)
2836
}
29-
n.mappings[event] = append(n.mappings[event], uri)
37+
n.mappings[event] = append(n.mappings[event], Destination{URI: uri, SuppressAfterAction: suppressAfterAction})
3038
return nil
3139
}
3240

33-
func (n *Notifications) IterMappings(f func(string, string)) {
34-
for event, uris := range n.mappings {
35-
for _, uri := range uris {
36-
f(event, uri)
41+
func (n *Notifications) Destinations() iter.Seq2[string, Destination] {
42+
mappings := maps.Clone(n.mappings)
43+
44+
return func(yield func(string, Destination) bool) {
45+
for event, destinations := range mappings {
46+
for _, destination := range destinations {
47+
if !yield(event, destination) {
48+
return
49+
}
50+
}
3751
}
3852
}
3953
}
54+
4055
func (n *Notifications) Sendf(ctx context.Context, event string, f string, a ...any) {
4156
n.Send(ctx, event, fmt.Sprintf(f, a...))
4257
}
4358

4459
// Send a simple string for now, maybe later message could instead be a type which
4560
// implements a notifications.Bodyer or something so that notifiers can send rich notifications.
4661
func (n *Notifications) Send(ctx context.Context, event string, message string) {
47-
uris := n.mappings[event]
62+
destinations := n.mappings[event]
63+
if len(destinations) == 0 {
64+
return
65+
}
66+
67+
var timeSinceAction time.Duration
68+
if actionTime, ok := ctx.Value(actionKey{}).(time.Time); ok {
69+
timeSinceAction = time.Since(actionTime)
70+
}
71+
72+
var uris []string
73+
for _, dest := range destinations {
74+
if timeSinceAction == 0 || timeSinceAction >= dest.SuppressAfterAction {
75+
uris = append(uris, dest.URI)
76+
} else {
77+
slog.DebugContext(ctx, "suppressing notification due to recent manual action",
78+
"event", event, "suppress_after_action", dest.SuppressAfterAction, "since_action", timeSinceAction)
79+
}
80+
}
4881
if len(uris) == 0 {
4982
return
5083
}
@@ -63,3 +96,22 @@ func (n *Notifications) Send(ctx context.Context, event string, message string)
6396
return
6497
}
6598
}
99+
100+
type actionKey struct{}
101+
102+
// RecordAction records the current time of a user action and returns a context which may
103+
// be used to suppres notifications later.
104+
func RecordAction(ctx context.Context) context.Context {
105+
return RecordActionTime(ctx, time.Now())
106+
}
107+
108+
// RecordActionTime records a specific time as a user action and returns a context which may
109+
// be used to suppress notifications later.
110+
func RecordActionTime(ctx context.Context, actionTime time.Time) context.Context {
111+
return context.WithValue(ctx, actionKey{}, actionTime)
112+
}
113+
114+
func ActionTime(ctx context.Context) time.Time {
115+
t, _ := ctx.Value(actionKey{}).(time.Time)
116+
return t
117+
}

0 commit comments

Comments
 (0)