Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Coordinator struct {

// StateStore interface for persistent state - allows dependency injection for testing.
//
//nolint:interfacebloat // 13 methods needed for complete state management
//nolint:interfacebloat // 15 methods needed for complete state management
type StateStore interface {
Thread(ctx context.Context, owner, repo string, number int, channelID string) (cache.ThreadInfo, bool)
SaveThread(ctx context.Context, owner, repo string, number int, channelID string, info cache.ThreadInfo) error
Expand All @@ -86,6 +86,8 @@ type StateStore interface {
SaveDMMessage(ctx context.Context, userID, prURL string, info state.DMInfo) error
ListDMUsers(ctx context.Context, prURL string) []string
QueuePendingDM(ctx context.Context, dm *state.PendingDM) error
PendingDMs(ctx context.Context, before time.Time) ([]state.PendingDM, error)
RemovePendingDM(ctx context.Context, id string) error
WasProcessed(ctx context.Context, eventKey string) bool
MarkProcessed(ctx context.Context, eventKey string, ttl time.Duration) error
LastNotification(ctx context.Context, prURL string) time.Time
Expand Down
135 changes: 117 additions & 18 deletions pkg/bot/dm.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type dmNotificationRequest struct {
// It's idempotent - only sends/updates if state changed for this user.
// Updates to existing DMs happen immediately (no delay).
// New DMs respect reminder_dm_delay (queue for later if user in channel).
//
//nolint:maintidx,revive // This function coordinates all DM scenarios (queued/sent, update/create, delay logic) and benefits from being in one place
func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotificationRequest) error {
// Lock per user+PR to prevent concurrent goroutines from sending duplicate DMs
lockKey := req.UserID + ":" + req.PRURL
Expand All @@ -42,6 +44,79 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification

prState := derivePRState(req.CheckResult)

// Check if there's a queued (not-yet-sent) DM for this user+PR
pendingDMs, err := c.stateStore.PendingDMs(ctx, time.Now().Add(24*time.Hour))
if err != nil {
slog.Warn("failed to check for pending DMs",
"user", req.UserID,
"pr", req.PRURL,
"error", err)
}

// Find any pending DM for this user+PR
var pendingDM *state.PendingDM
for i := range pendingDMs {
if pendingDMs[i].UserID == req.UserID && pendingDMs[i].PRURL == req.PRURL {
pendingDM = &pendingDMs[i]
break
}
}

// If there's a queued DM, check if the user still needs to be notified
if pendingDM != nil {
// Check if user still has action to take (based on turnclient analysis)
userStillBlocked := false
if req.CheckResult != nil && req.CheckResult.Analysis.NextAction != nil {
// Check if this GitHub user is in NextAction
// We need to map Slack user back to GitHub user for this check
// For now, check if ANY action is needed (conservative approach)
userStillBlocked = len(req.CheckResult.Analysis.NextAction) > 0
}

// If user no longer blocked, cancel the queued DM
if !userStillBlocked {
slog.Info("cancelling queued DM - user no longer blocked",
"user", req.UserID,
"pr", req.PRURL,
"old_state", pendingDM.PRState,
"new_state", prState)
if err := c.stateStore.RemovePendingDM(ctx, pendingDM.ID); err != nil {
slog.Warn("failed to remove pending DM",
"user", req.UserID,
"pr", req.PRURL,
"dm_id", pendingDM.ID,
"error", err)
}
return nil
}

// User still blocked - update the queued DM if state changed
if pendingDM.PRState != prState {
slog.Info("updating queued DM with new state",
"user", req.UserID,
"pr", req.PRURL,
"old_state", pendingDM.PRState,
"new_state", prState,
"scheduled_send", pendingDM.SendAfter)
// Remove old queued DM and queue new one with updated state
if err := c.stateStore.RemovePendingDM(ctx, pendingDM.ID); err != nil {
slog.Warn("failed to remove pending DM for update",
"user", req.UserID,
"pr", req.PRURL,
"dm_id", pendingDM.ID,
"error", err)
// Continue anyway - attempt to queue new DM
}
return c.queueDMForUser(ctx, req, prState, pendingDM.SendAfter)
}
// State unchanged, queued DM is still valid
slog.Debug("DM already queued with same state",
"user", req.UserID,
"pr", req.PRURL,
"state", prState)
return nil
}

// Get last notification from datastore
lastNotif, exists := c.stateStore.DMMessage(ctx, req.UserID, req.PRURL)

Expand Down Expand Up @@ -137,6 +212,12 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification
"pr", req.PRURL,
"error", err)
}

// Cancel any pending queued DMs for this user+PR
// This handles the case where we updated immediately due to state change
// while a delayed DM was still queued
c.cancelPendingDMs(ctx, req.UserID, req.PRURL)

return nil
}
// All updates failed - fall through to send new DM
Expand Down Expand Up @@ -183,6 +264,11 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification
"error", err)
}

// Cancel any pending queued DMs for this user+PR
// This handles the case where we sent immediately due to state change
// while a delayed DM was still queued
c.cancelPendingDMs(ctx, req.UserID, req.PRURL)

return nil
}

Expand Down Expand Up @@ -292,28 +378,41 @@ func (c *Coordinator) queueDMForUser(ctx context.Context, req dmNotificationRequ
}

// Queue to state store - the notify scheduler will process it
if err := c.stateStore.QueuePendingDM(ctx, dm); err != nil {
return err
}
// Don't save to DMMessage yet - the queued DM is the source of truth until it's sent
// This way, if the state changes, we can update the queued DM instead of sending early
return c.stateStore.QueuePendingDM(ctx, dm)
}

// Save DM state immediately (with placeholder) so subsequent updates know about it
// This prevents duplicate DMs when multiple webhook events arrive concurrently
now := time.Now()
if err := c.stateStore.SaveDMMessage(ctx, req.UserID, req.PRURL, state.DMInfo{
SentAt: now,
UpdatedAt: now,
ChannelID: "", // Will be filled in when actually sent
MessageTS: "", // Will be filled in when actually sent
MessageText: "",
LastState: prState,
}); err != nil {
slog.Warn("failed to save DM state after queueing",
"user", req.UserID,
"pr", req.PRURL,
// cancelPendingDMs removes any queued DMs for a user+PR.
// Called after successfully sending or updating a DM to prevent duplicates.
func (c *Coordinator) cancelPendingDMs(ctx context.Context, userID, prURL string) {
// Get all pending DMs (even future ones)
pendingDMs, err := c.stateStore.PendingDMs(ctx, time.Now().Add(24*time.Hour))
if err != nil {
slog.Warn("failed to check for pending DMs to cancel",
"user", userID,
"pr", prURL,
"error", err)
return
}

return nil
// Remove any that match this user+PR
for i := range pendingDMs {
if pendingDMs[i].UserID == userID && pendingDMs[i].PRURL == prURL {
if err := c.stateStore.RemovePendingDM(ctx, pendingDMs[i].ID); err != nil {
slog.Warn("failed to cancel pending DM",
"user", userID,
"pr", prURL,
"dm_id", pendingDMs[i].ID,
"error", err)
} else {
slog.Debug("cancelled pending DM after immediate send/update",
"user", userID,
"pr", prURL,
"dm_id", pendingDMs[i].ID)
}
}
}
}

// generateUUID creates a simple UUID for pending DM tracking.
Expand Down
Loading
Loading