diff --git a/pkg/bot/bot.go b/pkg/bot/bot.go index 910c864..992c222 100644 --- a/pkg/bot/bot.go +++ b/pkg/bot/bot.go @@ -76,7 +76,7 @@ type Coordinator struct { // StateStore interface for persistent state - allows dependency injection for testing. // -//nolint:interfacebloat // 13 methods needed for complete state management +//nolint:interfacebloat // 15 methods needed for complete state management type StateStore interface { Thread(ctx context.Context, owner, repo string, number int, channelID string) (cache.ThreadInfo, bool) SaveThread(ctx context.Context, owner, repo string, number int, channelID string, info cache.ThreadInfo) error @@ -86,6 +86,8 @@ type StateStore interface { SaveDMMessage(ctx context.Context, userID, prURL string, info state.DMInfo) error ListDMUsers(ctx context.Context, prURL string) []string QueuePendingDM(ctx context.Context, dm *state.PendingDM) error + PendingDMs(ctx context.Context, before time.Time) ([]state.PendingDM, error) + RemovePendingDM(ctx context.Context, id string) error WasProcessed(ctx context.Context, eventKey string) bool MarkProcessed(ctx context.Context, eventKey string, ttl time.Duration) error LastNotification(ctx context.Context, prURL string) time.Time diff --git a/pkg/bot/dm.go b/pkg/bot/dm.go index 5791638..dafab34 100644 --- a/pkg/bot/dm.go +++ b/pkg/bot/dm.go @@ -32,6 +32,8 @@ type dmNotificationRequest struct { // It's idempotent - only sends/updates if state changed for this user. // Updates to existing DMs happen immediately (no delay). // New DMs respect reminder_dm_delay (queue for later if user in channel). +// +//nolint:maintidx,revive // This function coordinates all DM scenarios (queued/sent, update/create, delay logic) and benefits from being in one place func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotificationRequest) error { // Lock per user+PR to prevent concurrent goroutines from sending duplicate DMs lockKey := req.UserID + ":" + req.PRURL @@ -42,6 +44,79 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification prState := derivePRState(req.CheckResult) + // Check if there's a queued (not-yet-sent) DM for this user+PR + pendingDMs, err := c.stateStore.PendingDMs(ctx, time.Now().Add(24*time.Hour)) + if err != nil { + slog.Warn("failed to check for pending DMs", + "user", req.UserID, + "pr", req.PRURL, + "error", err) + } + + // Find any pending DM for this user+PR + var pendingDM *state.PendingDM + for i := range pendingDMs { + if pendingDMs[i].UserID == req.UserID && pendingDMs[i].PRURL == req.PRURL { + pendingDM = &pendingDMs[i] + break + } + } + + // If there's a queued DM, check if the user still needs to be notified + if pendingDM != nil { + // Check if user still has action to take (based on turnclient analysis) + userStillBlocked := false + if req.CheckResult != nil && req.CheckResult.Analysis.NextAction != nil { + // Check if this GitHub user is in NextAction + // We need to map Slack user back to GitHub user for this check + // For now, check if ANY action is needed (conservative approach) + userStillBlocked = len(req.CheckResult.Analysis.NextAction) > 0 + } + + // If user no longer blocked, cancel the queued DM + if !userStillBlocked { + slog.Info("cancelling queued DM - user no longer blocked", + "user", req.UserID, + "pr", req.PRURL, + "old_state", pendingDM.PRState, + "new_state", prState) + if err := c.stateStore.RemovePendingDM(ctx, pendingDM.ID); err != nil { + slog.Warn("failed to remove pending DM", + "user", req.UserID, + "pr", req.PRURL, + "dm_id", pendingDM.ID, + "error", err) + } + return nil + } + + // User still blocked - update the queued DM if state changed + if pendingDM.PRState != prState { + slog.Info("updating queued DM with new state", + "user", req.UserID, + "pr", req.PRURL, + "old_state", pendingDM.PRState, + "new_state", prState, + "scheduled_send", pendingDM.SendAfter) + // Remove old queued DM and queue new one with updated state + if err := c.stateStore.RemovePendingDM(ctx, pendingDM.ID); err != nil { + slog.Warn("failed to remove pending DM for update", + "user", req.UserID, + "pr", req.PRURL, + "dm_id", pendingDM.ID, + "error", err) + // Continue anyway - attempt to queue new DM + } + return c.queueDMForUser(ctx, req, prState, pendingDM.SendAfter) + } + // State unchanged, queued DM is still valid + slog.Debug("DM already queued with same state", + "user", req.UserID, + "pr", req.PRURL, + "state", prState) + return nil + } + // Get last notification from datastore lastNotif, exists := c.stateStore.DMMessage(ctx, req.UserID, req.PRURL) @@ -137,6 +212,12 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification "pr", req.PRURL, "error", err) } + + // Cancel any pending queued DMs for this user+PR + // This handles the case where we updated immediately due to state change + // while a delayed DM was still queued + c.cancelPendingDMs(ctx, req.UserID, req.PRURL) + return nil } // All updates failed - fall through to send new DM @@ -183,6 +264,11 @@ func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotification "error", err) } + // Cancel any pending queued DMs for this user+PR + // This handles the case where we sent immediately due to state change + // while a delayed DM was still queued + c.cancelPendingDMs(ctx, req.UserID, req.PRURL) + return nil } @@ -292,28 +378,41 @@ func (c *Coordinator) queueDMForUser(ctx context.Context, req dmNotificationRequ } // Queue to state store - the notify scheduler will process it - if err := c.stateStore.QueuePendingDM(ctx, dm); err != nil { - return err - } + // Don't save to DMMessage yet - the queued DM is the source of truth until it's sent + // This way, if the state changes, we can update the queued DM instead of sending early + return c.stateStore.QueuePendingDM(ctx, dm) +} - // Save DM state immediately (with placeholder) so subsequent updates know about it - // This prevents duplicate DMs when multiple webhook events arrive concurrently - now := time.Now() - if err := c.stateStore.SaveDMMessage(ctx, req.UserID, req.PRURL, state.DMInfo{ - SentAt: now, - UpdatedAt: now, - ChannelID: "", // Will be filled in when actually sent - MessageTS: "", // Will be filled in when actually sent - MessageText: "", - LastState: prState, - }); err != nil { - slog.Warn("failed to save DM state after queueing", - "user", req.UserID, - "pr", req.PRURL, +// cancelPendingDMs removes any queued DMs for a user+PR. +// Called after successfully sending or updating a DM to prevent duplicates. +func (c *Coordinator) cancelPendingDMs(ctx context.Context, userID, prURL string) { + // Get all pending DMs (even future ones) + pendingDMs, err := c.stateStore.PendingDMs(ctx, time.Now().Add(24*time.Hour)) + if err != nil { + slog.Warn("failed to check for pending DMs to cancel", + "user", userID, + "pr", prURL, "error", err) + return } - return nil + // Remove any that match this user+PR + for i := range pendingDMs { + if pendingDMs[i].UserID == userID && pendingDMs[i].PRURL == prURL { + if err := c.stateStore.RemovePendingDM(ctx, pendingDMs[i].ID); err != nil { + slog.Warn("failed to cancel pending DM", + "user", userID, + "pr", prURL, + "dm_id", pendingDMs[i].ID, + "error", err) + } else { + slog.Debug("cancelled pending DM after immediate send/update", + "user", userID, + "pr", prURL, + "dm_id", pendingDMs[i].ID) + } + } + } } // generateUUID creates a simple UUID for pending DM tracking. diff --git a/pkg/bot/dm_simplified_test.go b/pkg/bot/dm_simplified_test.go index 91c5dc4..f1d84c6 100644 --- a/pkg/bot/dm_simplified_test.go +++ b/pkg/bot/dm_simplified_test.go @@ -805,7 +805,7 @@ func TestSendPRNotification_QueuedThenUpdated(t *testing.T) { } // Second call immediately after: PR state changes to awaiting_review - // This simulates a legitimate state change that should be processed + // This should UPDATE the queued DM, not send immediately! checkResult2 := newCheckResponse("awaiting_review") req2 := req1 // Copy req2.CheckResult = checkResult2 @@ -816,21 +816,21 @@ func TestSendPRNotification_QueuedThenUpdated(t *testing.T) { t.Fatalf("Second call failed: %v", err) } - // The state CHANGED, so this should send a new DM (queued DM isn't sent yet) - // This is correct behavior - legitimate state changes should always be processed - if len(slack.sentDirectMessages) != 1 { - t.Errorf("Expected 1 DM sent for legitimate state change, got %d DMs", len(slack.sentDirectMessages)) + // CRITICAL: No immediate DM should be sent - the queued DM should be updated instead + if len(slack.sentDirectMessages) != 0 { + t.Errorf("Expected no immediate DM (should update queued DM instead), got %d DMs", len(slack.sentDirectMessages)) } - // Verify the sent DM has the new state - if len(slack.sentDirectMessages) > 0 { - // Check that state was updated to awaiting_review - savedInfo, exists := store.DMMessage(context.Background(), "U123", "https://github.com/owner/repo/pull/1") - if !exists { - t.Error("Expected DM state to be saved") - } else if savedInfo.LastState != "awaiting_review" { - t.Errorf("Expected LastState 'awaiting_review', got '%s'", savedInfo.LastState) - } + // Verify the queued DM still exists and was updated with new state + pendingDMs, err := store.PendingDMs(context.Background(), time.Now().Add(1*time.Hour)) + if err != nil { + t.Fatal("Failed to get pending DMs:", err) + } + if len(pendingDMs) != 1 { + t.Fatalf("Expected 1 queued DM after state change, got %d", len(pendingDMs)) + } + if pendingDMs[0].PRState != "awaiting_review" { + t.Errorf("Expected queued DM to have state 'awaiting_review', got '%s'", pendingDMs[0].PRState) } } @@ -946,3 +946,305 @@ func TestSendPRNotification_RapidStateChanges(t *testing.T) { t.Errorf("Expected final state 'awaiting_review', got '%s'", savedInfo.LastState) } } + +// TestSendPRNotification_QueuedDMCancelledWhenUserNoLongerBlocked tests that a queued DM +// is cancelled if the PR state changes such that the user no longer has actions to take. +func TestSendPRNotification_QueuedDMCancelledWhenUserNoLongerBlocked(t *testing.T) { + store := &mockStateStore{} + slack := &mockSlackClient{ + isUserInChannelFunc: func(ctx context.Context, channelID, userID string) bool { + return true // User is in channel, so DM should be queued + }, + } + config := &mockConfigManager{ + dmDelay: 30, // 30 minute delay + } + + c := &Coordinator{ + stateStore: store, + slack: slack, + configManager: config, + userMapper: &mockUserMapper{}, + } + + // First call: Queue a DM for the user (user needs to review) + checkResult1 := newCheckResponse("awaiting_review") + req1 := dmNotificationRequest{ + CheckResult: checkResult1, + UserID: "U123", + ChannelID: "C123", + ChannelName: "general", + Owner: "owner", + Repo: "repo", + PRNumber: 1, + PRTitle: "Test PR", + PRAuthor: "author", + PRURL: "https://github.com/owner/repo/pull/1", + } + + err := c.sendPRNotification(context.Background(), req1) + if err != nil { + t.Fatalf("First call failed: %v", err) + } + + // Verify DM was queued + pendingDMs, err := store.PendingDMs(context.Background(), time.Now().Add(1*time.Hour)) + if err != nil { + t.Fatal("Failed to get pending DMs:", err) + } + if len(pendingDMs) != 1 { + t.Fatalf("Expected 1 queued DM, got %d", len(pendingDMs)) + } + + // Second call: PR state changes such that user no longer has actions (no NextAction) + // This simulates: user reviewed, PR now needs author to fix tests + checkResult2 := newCheckResponse("tests_broken") + checkResult2.Analysis.NextAction = nil // No actions needed (or empty map) + req2 := req1 + req2.CheckResult = checkResult2 + + err = c.sendPRNotification(context.Background(), req2) + if err != nil { + t.Fatalf("Second call failed: %v", err) + } + + // CRITICAL: The queued DM should be CANCELLED (not sent early, not updated) + pendingDMs, err = store.PendingDMs(context.Background(), time.Now().Add(1*time.Hour)) + if err != nil { + t.Fatal("Failed to get pending DMs:", err) + } + if len(pendingDMs) != 0 { + t.Errorf("Expected queued DM to be cancelled (0 pending), got %d pending DMs", len(pendingDMs)) + } + + // No immediate DM should be sent + if len(slack.sentDirectMessages) != 0 { + t.Errorf("Expected no immediate DM when cancelling queued DM, got %d DMs", len(slack.sentDirectMessages)) + } +} + +// TestSendPRNotification_QueuedDMUpdatedWhenUserLeavesChannel tests that a queued DM +// is properly updated when the user leaves the channel (even though immediate send would now apply). +func TestSendPRNotification_QueuedDMUpdatedWhenUserLeavesChannel(t *testing.T) { + userInChannel := true // Track whether user is in channel + store := &mockStateStore{} + slack := &mockSlackClient{ + isUserInChannelFunc: func(ctx context.Context, channelID, userID string) bool { + return userInChannel + }, + } + config := &mockConfigManager{ + dmDelay: 30, // 30 minute delay + } + + c := &Coordinator{ + stateStore: store, + slack: slack, + configManager: config, + userMapper: &mockUserMapper{}, + } + + // First call: Queue a DM (delay enabled, user in channel) + checkResult1 := newCheckResponse("awaiting_review") + req1 := dmNotificationRequest{ + CheckResult: checkResult1, + UserID: "U123", + ChannelID: "C123", + ChannelName: "general", + Owner: "owner", + Repo: "repo", + PRNumber: 1, + PRTitle: "Test PR", + PRAuthor: "author", + PRURL: "https://github.com/owner/repo/pull/1", + } + + err := c.sendPRNotification(context.Background(), req1) + if err != nil { + t.Fatalf("First call failed: %v", err) + } + + // Verify DM was queued + pendingDMs, err := store.PendingDMs(context.Background(), time.Now().Add(1*time.Hour)) + if err != nil { + t.Fatal("Failed to get pending DMs:", err) + } + if len(pendingDMs) != 1 { + t.Fatalf("Expected 1 queued DM, got %d", len(pendingDMs)) + } + + // User leaves channel (or gets kicked) + userInChannel = false + + // Second call: State changes, user NOT in channel anymore + // Even though immediate send would now apply, the queued DM should be updated (not sent early) + checkResult2 := newCheckResponse("tests_broken") + req2 := req1 + req2.CheckResult = checkResult2 + + err = c.sendPRNotification(context.Background(), req2) + if err != nil { + t.Fatalf("Second call failed: %v", err) + } + + // CRITICAL: The queued DM should be UPDATED (not sent early, even though user left channel) + // This ensures we respect the original delay decision + pendingDMs, err = store.PendingDMs(context.Background(), time.Now().Add(1*time.Hour)) + if err != nil { + t.Fatal("Failed to get pending DMs:", err) + } + if len(pendingDMs) != 1 { + t.Errorf("Expected 1 queued DM (updated, not sent early), got %d pending DMs", len(pendingDMs)) + } + if pendingDMs[0].PRState != "tests_broken" { + t.Errorf("Expected queued DM state 'tests_broken', got '%s'", pendingDMs[0].PRState) + } + + // No immediate DM should be sent + if len(slack.sentDirectMessages) != 0 { + t.Errorf("Expected no immediate DM (should update queued DM), got %d DMs", len(slack.sentDirectMessages)) + } +} + +// TestSendPRNotification_PreviouslySentDMUpdatedEvenWhenUserNoLongerBlocked tests that +// a previously sent DM is updated when the PR state changes, even if the user no longer +// has actions to take. +func TestSendPRNotification_PreviouslySentDMUpdatedEvenWhenUserNoLongerBlocked(t *testing.T) { + store := &mockStateStore{} + slack := &mockSlackClient{} + config := &mockConfigManager{ + dmDelay: 0, // No delay - send immediately + } + + c := &Coordinator{ + stateStore: store, + slack: slack, + configManager: config, + userMapper: &mockUserMapper{}, + } + + // First call: Send a DM (user needs to review) + checkResult1 := newCheckResponse("awaiting_review") + req1 := dmNotificationRequest{ + CheckResult: checkResult1, + UserID: "U123", + ChannelID: "", + ChannelName: "", + Owner: "owner", + Repo: "repo", + PRNumber: 1, + PRTitle: "Test PR", + PRAuthor: "author", + PRURL: "https://github.com/owner/repo/pull/1", + } + + err := c.sendPRNotification(context.Background(), req1) + if err != nil { + t.Fatalf("First call failed: %v", err) + } + + // Verify DM was sent + if len(slack.sentDirectMessages) != 1 { + t.Fatalf("Expected 1 sent DM, got %d", len(slack.sentDirectMessages)) + } + + // Second call: State changes, user no longer has actions (no NextAction) + // This simulates: user reviewed, PR now needs author to fix tests + checkResult2 := newCheckResponse("tests_broken") + checkResult2.Analysis.NextAction = nil // No actions needed + req2 := req1 + req2.CheckResult = checkResult2 + + err = c.sendPRNotification(context.Background(), req2) + if err != nil { + t.Fatalf("Second call failed: %v", err) + } + + // CRITICAL: The previously sent DM should be UPDATED (not cancelled, not skipped) + // Even though user no longer has actions, we update to reflect new state + if len(slack.updatedMessages) != 1 { + t.Errorf("Expected 1 DM update, got %d updates", len(slack.updatedMessages)) + } + + // Verify state was saved with new state + savedInfo, exists := store.DMMessage(context.Background(), "U123", "https://github.com/owner/repo/pull/1") + if !exists { + t.Error("Expected DM state to be saved") + } else if savedInfo.LastState != "tests_broken" { + t.Errorf("Expected LastState 'tests_broken', got '%s'", savedInfo.LastState) + } +} + +// TestSendPRNotification_QueuedDMBehaviorAcrossRestarts tests that queued DMs persist +// across restarts and are properly handled when the bot restarts. +func TestSendPRNotification_QueuedDMBehaviorAcrossRestarts(t *testing.T) { + // Simulate a queued DM that exists in the datastore (from before restart) + existingQueuedDM := &state.PendingDM{ + ID: "dm-123", + UserID: "U123", + PRURL: "https://github.com/owner/repo/pull/1", + PROwner: "owner", + PRRepo: "repo", + PRNumber: 1, + PRTitle: "Test PR", + PRAuthor: "author", + PRState: "awaiting_review", + SendAfter: time.Now().Add(10 * time.Minute), + QueuedAt: time.Now().Add(-20 * time.Minute), + } + + store := &mockStateStore{ + pendingDMs: []*state.PendingDM{existingQueuedDM}, + } + slack := &mockSlackClient{} + config := &mockConfigManager{ + dmDelay: 30, + } + + c := &Coordinator{ + stateStore: store, + slack: slack, + configManager: config, + userMapper: &mockUserMapper{}, + } + + // After restart, we get a new webhook event for the same PR + // State has changed from awaiting_review to tests_broken + checkResult := newCheckResponse("tests_broken") + req := dmNotificationRequest{ + CheckResult: checkResult, + UserID: "U123", + ChannelID: "C123", + ChannelName: "general", + Owner: "owner", + Repo: "repo", + PRNumber: 1, + PRTitle: "Test PR", + PRAuthor: "author", + PRURL: "https://github.com/owner/repo/pull/1", + } + + err := c.sendPRNotification(context.Background(), req) + if err != nil { + t.Fatalf("Call after restart failed: %v", err) + } + + // CRITICAL: Should find the existing queued DM and update it (not send immediately) + pendingDMs, err := store.PendingDMs(context.Background(), time.Now().Add(1*time.Hour)) + if err != nil { + t.Fatal("Failed to get pending DMs:", err) + } + if len(pendingDMs) != 1 { + t.Fatalf("Expected 1 queued DM after restart, got %d", len(pendingDMs)) + } + + // Verify the queued DM has updated state + if pendingDMs[0].PRState != "tests_broken" { + t.Errorf("Expected queued DM to have state 'tests_broken', got '%s'", pendingDMs[0].PRState) + } + + // No immediate DM should be sent + if len(slack.sentDirectMessages) != 0 { + t.Errorf("Expected no immediate DM (should update queued DM), got %d DMs", len(slack.sentDirectMessages)) + } +}