Skip to content

Commit 5aa3a5c

Browse files
Thomas StrombergThomas Stromberg
authored andcommitted
more dupe prevention
1 parent 839979b commit 5aa3a5c

File tree

3 files changed

+436
-33
lines changed

3 files changed

+436
-33
lines changed

pkg/bot/bot.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ type Coordinator struct {
7676

7777
// StateStore interface for persistent state - allows dependency injection for testing.
7878
//
79-
//nolint:interfacebloat // 13 methods needed for complete state management
79+
//nolint:interfacebloat // 15 methods needed for complete state management
8080
type StateStore interface {
8181
Thread(ctx context.Context, owner, repo string, number int, channelID string) (cache.ThreadInfo, bool)
8282
SaveThread(ctx context.Context, owner, repo string, number int, channelID string, info cache.ThreadInfo) error
@@ -86,6 +86,8 @@ type StateStore interface {
8686
SaveDMMessage(ctx context.Context, userID, prURL string, info state.DMInfo) error
8787
ListDMUsers(ctx context.Context, prURL string) []string
8888
QueuePendingDM(ctx context.Context, dm *state.PendingDM) error
89+
PendingDMs(ctx context.Context, before time.Time) ([]state.PendingDM, error)
90+
RemovePendingDM(ctx context.Context, id string) error
8991
WasProcessed(ctx context.Context, eventKey string) bool
9092
MarkProcessed(ctx context.Context, eventKey string, ttl time.Duration) error
9193
LastNotification(ctx context.Context, prURL string) time.Time

pkg/bot/dm.go

Lines changed: 117 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ type dmNotificationRequest struct {
3232
// It's idempotent - only sends/updates if state changed for this user.
3333
// Updates to existing DMs happen immediately (no delay).
3434
// New DMs respect reminder_dm_delay (queue for later if user in channel).
35+
//
36+
//nolint:maintidx,revive // This function coordinates all DM scenarios (queued/sent, update/create, delay logic) and benefits from being in one place
3537
func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotificationRequest) error {
3638
// Lock per user+PR to prevent concurrent goroutines from sending duplicate DMs
3739
lockKey := req.UserID + ":" + req.PRURL
@@ -42,6 +44,79 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification
4244

4345
prState := derivePRState(req.CheckResult)
4446

47+
// Check if there's a queued (not-yet-sent) DM for this user+PR
48+
pendingDMs, err := c.stateStore.PendingDMs(ctx, time.Now().Add(24*time.Hour))
49+
if err != nil {
50+
slog.Warn("failed to check for pending DMs",
51+
"user", req.UserID,
52+
"pr", req.PRURL,
53+
"error", err)
54+
}
55+
56+
// Find any pending DM for this user+PR
57+
var pendingDM *state.PendingDM
58+
for i := range pendingDMs {
59+
if pendingDMs[i].UserID == req.UserID && pendingDMs[i].PRURL == req.PRURL {
60+
pendingDM = &pendingDMs[i]
61+
break
62+
}
63+
}
64+
65+
// If there's a queued DM, check if the user still needs to be notified
66+
if pendingDM != nil {
67+
// Check if user still has action to take (based on turnclient analysis)
68+
userStillBlocked := false
69+
if req.CheckResult != nil && req.CheckResult.Analysis.NextAction != nil {
70+
// Check if this GitHub user is in NextAction
71+
// We need to map Slack user back to GitHub user for this check
72+
// For now, check if ANY action is needed (conservative approach)
73+
userStillBlocked = len(req.CheckResult.Analysis.NextAction) > 0
74+
}
75+
76+
// If user no longer blocked, cancel the queued DM
77+
if !userStillBlocked {
78+
slog.Info("cancelling queued DM - user no longer blocked",
79+
"user", req.UserID,
80+
"pr", req.PRURL,
81+
"old_state", pendingDM.PRState,
82+
"new_state", prState)
83+
if err := c.stateStore.RemovePendingDM(ctx, pendingDM.ID); err != nil {
84+
slog.Warn("failed to remove pending DM",
85+
"user", req.UserID,
86+
"pr", req.PRURL,
87+
"dm_id", pendingDM.ID,
88+
"error", err)
89+
}
90+
return nil
91+
}
92+
93+
// User still blocked - update the queued DM if state changed
94+
if pendingDM.PRState != prState {
95+
slog.Info("updating queued DM with new state",
96+
"user", req.UserID,
97+
"pr", req.PRURL,
98+
"old_state", pendingDM.PRState,
99+
"new_state", prState,
100+
"scheduled_send", pendingDM.SendAfter)
101+
// Remove old queued DM and queue new one with updated state
102+
if err := c.stateStore.RemovePendingDM(ctx, pendingDM.ID); err != nil {
103+
slog.Warn("failed to remove pending DM for update",
104+
"user", req.UserID,
105+
"pr", req.PRURL,
106+
"dm_id", pendingDM.ID,
107+
"error", err)
108+
// Continue anyway - attempt to queue new DM
109+
}
110+
return c.queueDMForUser(ctx, req, prState, pendingDM.SendAfter)
111+
}
112+
// State unchanged, queued DM is still valid
113+
slog.Debug("DM already queued with same state",
114+
"user", req.UserID,
115+
"pr", req.PRURL,
116+
"state", prState)
117+
return nil
118+
}
119+
45120
// Get last notification from datastore
46121
lastNotif, exists := c.stateStore.DMMessage(ctx, req.UserID, req.PRURL)
47122

@@ -137,6 +212,12 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification
137212
"pr", req.PRURL,
138213
"error", err)
139214
}
215+
216+
// Cancel any pending queued DMs for this user+PR
217+
// This handles the case where we updated immediately due to state change
218+
// while a delayed DM was still queued
219+
c.cancelPendingDMs(ctx, req.UserID, req.PRURL)
220+
140221
return nil
141222
}
142223
// All updates failed - fall through to send new DM
@@ -183,6 +264,11 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification
183264
"error", err)
184265
}
185266

267+
// Cancel any pending queued DMs for this user+PR
268+
// This handles the case where we sent immediately due to state change
269+
// while a delayed DM was still queued
270+
c.cancelPendingDMs(ctx, req.UserID, req.PRURL)
271+
186272
return nil
187273
}
188274

@@ -292,28 +378,41 @@ func (c *Coordinator) queueDMForUser(ctx context.Context, req dmNotificationRequ
292378
}
293379

294380
// Queue to state store - the notify scheduler will process it
295-
if err := c.stateStore.QueuePendingDM(ctx, dm); err != nil {
296-
return err
297-
}
381+
// Don't save to DMMessage yet - the queued DM is the source of truth until it's sent
382+
// This way, if the state changes, we can update the queued DM instead of sending early
383+
return c.stateStore.QueuePendingDM(ctx, dm)
384+
}
298385

299-
// Save DM state immediately (with placeholder) so subsequent updates know about it
300-
// This prevents duplicate DMs when multiple webhook events arrive concurrently
301-
now := time.Now()
302-
if err := c.stateStore.SaveDMMessage(ctx, req.UserID, req.PRURL, state.DMInfo{
303-
SentAt: now,
304-
UpdatedAt: now,
305-
ChannelID: "", // Will be filled in when actually sent
306-
MessageTS: "", // Will be filled in when actually sent
307-
MessageText: "",
308-
LastState: prState,
309-
}); err != nil {
310-
slog.Warn("failed to save DM state after queueing",
311-
"user", req.UserID,
312-
"pr", req.PRURL,
386+
// cancelPendingDMs removes any queued DMs for a user+PR.
387+
// Called after successfully sending or updating a DM to prevent duplicates.
388+
func (c *Coordinator) cancelPendingDMs(ctx context.Context, userID, prURL string) {
389+
// Get all pending DMs (even future ones)
390+
pendingDMs, err := c.stateStore.PendingDMs(ctx, time.Now().Add(24*time.Hour))
391+
if err != nil {
392+
slog.Warn("failed to check for pending DMs to cancel",
393+
"user", userID,
394+
"pr", prURL,
313395
"error", err)
396+
return
314397
}
315398

316-
return nil
399+
// Remove any that match this user+PR
400+
for i := range pendingDMs {
401+
if pendingDMs[i].UserID == userID && pendingDMs[i].PRURL == prURL {
402+
if err := c.stateStore.RemovePendingDM(ctx, pendingDMs[i].ID); err != nil {
403+
slog.Warn("failed to cancel pending DM",
404+
"user", userID,
405+
"pr", prURL,
406+
"dm_id", pendingDMs[i].ID,
407+
"error", err)
408+
} else {
409+
slog.Debug("cancelled pending DM after immediate send/update",
410+
"user", userID,
411+
"pr", prURL,
412+
"dm_id", pendingDMs[i].ID)
413+
}
414+
}
415+
}
317416
}
318417

319418
// generateUUID creates a simple UUID for pending DM tracking.

0 commit comments

Comments
 (0)