Skip to content

Commit 839979b

Browse files
Thomas Stromberg
authored and committed
fix dupe DM delivery
1 parent 4419d1d commit 839979b

File tree

3 files changed

+278
-1
lines changed

3 files changed

+278
-1
lines changed

pkg/bot/bot.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ type Coordinator struct {
7171
threadCache *cache.ThreadCache // In-memory cache for fast lookups
7272
commitPRCache *cache.CommitPRCache // Maps commit SHAs to PR numbers for check events
7373
eventSemaphore chan struct{} // Limits concurrent event processing (prevents overwhelming APIs)
74+
dmLocks sync.Map // Per-user-PR locks to prevent duplicate DMs (key: "userID:prURL")
7475
}
7576

7677
// StateStore interface for persistent state - allows dependency injection for testing.

pkg/bot/dm.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"fmt"
77
"log/slog"
8+
"sync"
89
"time"
910

1011
"github.com/codeGROOVE-dev/slacker/pkg/notify"
@@ -32,12 +33,21 @@ type dmNotificationRequest struct {
3233
// Updates to existing DMs happen immediately (no delay).
3334
// New DMs respect reminder_dm_delay (queue for later if user in channel).
3435
func (c *Coordinator) sendPRNotification(ctx context.Context, req dmNotificationRequest) error {
36+
// Lock per user+PR to prevent concurrent goroutines from sending duplicate DMs
37+
lockKey := req.UserID + ":" + req.PRURL
38+
lockValue, _ := c.dmLocks.LoadOrStore(lockKey, &sync.Mutex{})
39+
mu := lockValue.(*sync.Mutex) //nolint:errcheck,revive // Type assertion always succeeds - we control what's stored
40+
mu.Lock()
41+
defer mu.Unlock()
42+
3543
prState := derivePRState(req.CheckResult)
3644

3745
// Get last notification from datastore
3846
lastNotif, exists := c.stateStore.DMMessage(ctx, req.UserID, req.PRURL)
3947

4048
// Idempotency: skip if state unchanged
49+
// The per-user-PR lock above ensures no race conditions from concurrent calls
50+
// This check ensures we only send/update when the PR state actually changes
4151
if exists && lastNotif.LastState == prState {
4252
slog.Debug("DM skipped - state unchanged",
4353
"user", req.UserID,
@@ -282,7 +292,28 @@ func (c *Coordinator) queueDMForUser(ctx context.Context, req dmNotificationRequ
282292
}
283293

284294
// Queue to state store - the notify scheduler will process it
285-
return c.stateStore.QueuePendingDM(ctx, dm)
295+
if err := c.stateStore.QueuePendingDM(ctx, dm); err != nil {
296+
return err
297+
}
298+
299+
// Save DM state immediately (with placeholder) so subsequent updates know about it
300+
// This prevents duplicate DMs when multiple webhook events arrive concurrently
301+
now := time.Now()
302+
if err := c.stateStore.SaveDMMessage(ctx, req.UserID, req.PRURL, state.DMInfo{
303+
SentAt: now,
304+
UpdatedAt: now,
305+
ChannelID: "", // Will be filled in when actually sent
306+
MessageTS: "", // Will be filled in when actually sent
307+
MessageText: "",
308+
LastState: prState,
309+
}); err != nil {
310+
slog.Warn("failed to save DM state after queueing",
311+
"user", req.UserID,
312+
"pr", req.PRURL,
313+
"error", err)
314+
}
315+
316+
return nil
286317
}
287318

288319
// generateUUID creates a simple UUID for pending DM tracking.

pkg/bot/dm_simplified_test.go

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -701,3 +701,248 @@ func TestSendDMNotificationsToBlockedUsers(t *testing.T) {
701701
t.Error("Expected DMs sent to U111 and U222")
702702
}
703703
}
704+
705+
// TestSendPRNotification_ConcurrentCallsNoDuplicates tests that concurrent calls don't send duplicate DMs.
706+
func TestSendPRNotification_ConcurrentCallsNoDuplicates(t *testing.T) {
707+
store := &mockStateStore{}
708+
slack := &mockSlackClient{}
709+
config := &mockConfigManager{
710+
dmDelay: 0, // No delay for simplicity
711+
}
712+
713+
c := &Coordinator{
714+
stateStore: store,
715+
slack: slack,
716+
configManager: config,
717+
userMapper: &mockUserMapper{},
718+
}
719+
720+
checkResult := newCheckResponse("awaiting_review")
721+
req := dmNotificationRequest{
722+
CheckResult: checkResult,
723+
UserID: "U123",
724+
ChannelID: "",
725+
ChannelName: "",
726+
Owner: "owner",
727+
Repo: "repo",
728+
PRNumber: 1,
729+
PRTitle: "Test PR",
730+
PRAuthor: "author",
731+
PRURL: "https://github.com/owner/repo/pull/1",
732+
}
733+
734+
// Call sendPRNotification multiple times concurrently (simulating concurrent webhook events)
735+
const concurrentCalls = 10
736+
errChan := make(chan error, concurrentCalls)
737+
738+
for range concurrentCalls {
739+
go func() {
740+
errChan <- c.sendPRNotification(context.Background(), req)
741+
}()
742+
}
743+
744+
// Collect errors
745+
for range concurrentCalls {
746+
if err := <-errChan; err != nil {
747+
t.Errorf("Unexpected error from concurrent call: %v", err)
748+
}
749+
}
750+
751+
// Verify only ONE DM was actually sent (not 10!)
752+
if len(slack.sentDirectMessages) != 1 {
753+
t.Errorf("Expected exactly 1 DM sent despite %d concurrent calls, got %d", concurrentCalls, len(slack.sentDirectMessages))
754+
}
755+
}
756+
757+
// TestSendPRNotification_QueuedThenUpdated tests that queued DMs don't create duplicates on updates.
758+
func TestSendPRNotification_QueuedThenUpdated(t *testing.T) {
759+
store := &mockStateStore{}
760+
slack := &mockSlackClient{
761+
isUserInChannelFunc: func(ctx context.Context, channelID, userID string) bool {
762+
return true // User is in channel, so DM will be queued
763+
},
764+
}
765+
config := &mockConfigManager{
766+
dmDelay: 30, // 30 minute delay
767+
workspace: "test-workspace",
768+
}
769+
770+
c := &Coordinator{
771+
stateStore: store,
772+
slack: slack,
773+
configManager: config,
774+
userMapper: &mockUserMapper{},
775+
}
776+
777+
// First call: Queue a DM (tests_running state)
778+
checkResult1 := newCheckResponse("tests_running")
779+
req1 := dmNotificationRequest{
780+
CheckResult: checkResult1,
781+
UserID: "U123",
782+
ChannelID: "C123",
783+
ChannelName: "general",
784+
Owner: "owner",
785+
Repo: "repo",
786+
PRNumber: 1,
787+
PRTitle: "Test PR",
788+
PRAuthor: "author",
789+
PRURL: "https://github.com/owner/repo/pull/1",
790+
}
791+
792+
err := c.sendPRNotification(context.Background(), req1)
793+
if err != nil {
794+
t.Fatalf("First call failed: %v", err)
795+
}
796+
797+
// Verify DM was queued
798+
if len(store.pendingDMs) != 1 {
799+
t.Fatalf("Expected 1 queued DM, got %d", len(store.pendingDMs))
800+
}
801+
802+
// Verify no DM sent yet
803+
if len(slack.sentDirectMessages) != 0 {
804+
t.Fatalf("Expected no immediate DM, got %d", len(slack.sentDirectMessages))
805+
}
806+
807+
// Second call immediately after: PR state changes to awaiting_review
808+
// This simulates a legitimate state change that should be processed
809+
checkResult2 := newCheckResponse("awaiting_review")
810+
req2 := req1 // Copy
811+
req2.CheckResult = checkResult2
812+
req2.ChannelID = "" // No channel info this time
813+
814+
err = c.sendPRNotification(context.Background(), req2)
815+
if err != nil {
816+
t.Fatalf("Second call failed: %v", err)
817+
}
818+
819+
// The state CHANGED, so this should send a new DM (queued DM isn't sent yet)
820+
// This is correct behavior - legitimate state changes should always be processed
821+
if len(slack.sentDirectMessages) != 1 {
822+
t.Errorf("Expected 1 DM sent for legitimate state change, got %d DMs", len(slack.sentDirectMessages))
823+
}
824+
825+
// Verify the sent DM has the new state
826+
if len(slack.sentDirectMessages) > 0 {
827+
// Check that state was updated to awaiting_review
828+
savedInfo, exists := store.DMMessage(context.Background(), "U123", "https://github.com/owner/repo/pull/1")
829+
if !exists {
830+
t.Error("Expected DM state to be saved")
831+
} else if savedInfo.LastState != "awaiting_review" {
832+
t.Errorf("Expected LastState 'awaiting_review', got '%s'", savedInfo.LastState)
833+
}
834+
}
835+
}
836+
837+
// TestUpdateDMMessagesForPR_MultipleConcurrentCalls tests that concurrent update calls don't send duplicates.
838+
func TestUpdateDMMessagesForPR_MultipleConcurrentCalls(t *testing.T) {
839+
store := &mockStateStore{
840+
dmUsers: map[string][]string{
841+
"https://github.com/owner/repo/pull/1": {"U123"},
842+
},
843+
}
844+
slack := &mockSlackClient{}
845+
846+
c := &Coordinator{
847+
stateStore: store,
848+
slack: slack,
849+
configManager: &mockConfigManager{domain: "example.com", dmDelay: 0},
850+
userMapper: &mockUserMapper{},
851+
}
852+
853+
checkResult := newCheckResponse("awaiting_review")
854+
info := prUpdateInfo{
855+
CheckResult: checkResult,
856+
Owner: "owner",
857+
Repo: "repo",
858+
PRNumber: 1,
859+
PRTitle: "Test PR",
860+
PRAuthor: "author",
861+
PRState: "awaiting_review",
862+
PRURL: "https://github.com/owner/repo/pull/1",
863+
}
864+
865+
// Call updateDMMessagesForPR multiple times concurrently (simulating multiple webhook events)
866+
const concurrentCalls = 6 // This is how many duplicates the user got
867+
doneChan := make(chan bool, concurrentCalls)
868+
869+
for range concurrentCalls {
870+
go func() {
871+
c.updateDMMessagesForPR(context.Background(), info)
872+
doneChan <- true
873+
}()
874+
}
875+
876+
// Wait for all to complete
877+
for range concurrentCalls {
878+
<-doneChan
879+
}
880+
881+
// Verify only ONE DM was actually sent (not 6!)
882+
if len(slack.sentDirectMessages) != 1 {
883+
t.Errorf("Expected exactly 1 DM despite %d concurrent calls, got %d DMs", concurrentCalls, len(slack.sentDirectMessages))
884+
}
885+
}
886+
887+
// TestSendPRNotification_RapidStateChanges tests that rapid legitimate state changes all get processed.
888+
func TestSendPRNotification_RapidStateChanges(t *testing.T) {
889+
store := &mockStateStore{}
890+
slack := &mockSlackClient{}
891+
config := &mockConfigManager{
892+
dmDelay: 0, // No delay for simplicity
893+
}
894+
895+
c := &Coordinator{
896+
stateStore: store,
897+
slack: slack,
898+
configManager: config,
899+
userMapper: &mockUserMapper{},
900+
}
901+
902+
// Send 5 notifications with DIFFERENT states in rapid succession (within 30 seconds)
903+
states := []string{"tests_running", "awaiting_review", "approved", "changes_requested", "awaiting_review"}
904+
905+
for i, state := range states {
906+
checkResult := newCheckResponse(state)
907+
req := dmNotificationRequest{
908+
CheckResult: checkResult,
909+
UserID: "U123",
910+
ChannelID: "",
911+
ChannelName: "",
912+
Owner: "owner",
913+
Repo: "repo",
914+
PRNumber: 1,
915+
PRTitle: "Test PR",
916+
PRAuthor: "author",
917+
PRURL: "https://github.com/owner/repo/pull/1",
918+
}
919+
920+
err := c.sendPRNotification(context.Background(), req)
921+
if err != nil {
922+
t.Errorf("Call %d with state %s failed: %v", i+1, state, err)
923+
}
924+
925+
// Verify state was saved
926+
savedInfo, exists := store.DMMessage(context.Background(), "U123", "https://github.com/owner/repo/pull/1")
927+
if !exists {
928+
t.Errorf("After call %d: Expected DM state to be saved", i+1)
929+
} else if savedInfo.LastState != state {
930+
t.Errorf("After call %d: Expected LastState '%s', got '%s'", i+1, state, savedInfo.LastState)
931+
}
932+
}
933+
934+
// First call sends DM, next 4 calls update it (or send new if update fails)
935+
// We should have either 1 DM with 4 updates, or up to 5 DMs if all "updates" became new sends
936+
// The key is: we should have processed all 5 state changes, not skipped any
937+
totalOperations := len(slack.sentDirectMessages) + len(slack.updatedMessages)
938+
if totalOperations < 4 { // At minimum: 1 send + 3 updates (one state appears twice)
939+
t.Errorf("Expected at least 4 DM operations for 5 state changes (one duplicate), got %d sends + %d updates = %d total",
940+
len(slack.sentDirectMessages), len(slack.updatedMessages), totalOperations)
941+
}
942+
943+
// Verify final state is correct
944+
savedInfo, _ := store.DMMessage(context.Background(), "U123", "https://github.com/owner/repo/pull/1")
945+
if savedInfo.LastState != "awaiting_review" {
946+
t.Errorf("Expected final state 'awaiting_review', got '%s'", savedInfo.LastState)
947+
}
948+
}

0 commit comments

Comments
 (0)