From df8c9a5d8c908a8060323b5ea5dd57e39a5dff18 Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Wed, 29 Oct 2025 18:39:40 -0400 Subject: [PATCH 1/6] Add PR reminder feature --- cmd/server/main.go | 39 +--- go.mod | 1 + go.sum | 2 + pkg/bot/integration_test.go | 9 +- pkg/home/fetcher_test.go | 12 ++ pkg/notify/daily_digest_test.go | 2 +- pkg/notify/daily_test.go | 2 +- pkg/notify/format_test.go | 4 +- pkg/notify/notify.go | 242 ++++++++++++++++++++-- pkg/notify/notify_test.go | 2 +- pkg/notify/notify_user_test.go | 354 +++++++++++++++++++++++++++++++- pkg/notify/run_test.go | 23 ++- pkg/state/datastore.go | 129 ++++++++++++ pkg/state/datastore_test.go | 261 +++++++++++++++++++++++ pkg/state/json.go | 67 +++++- pkg/state/json_test.go | 287 ++++++++++++++++++++++++++ pkg/state/memory.go | 39 ++++ pkg/state/memory_test.go | 231 +++++++++++++++++++++ pkg/state/store.go | 25 +++ 19 files changed, 1664 insertions(+), 67 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index ffadab1..5052e29 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -162,24 +162,7 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi slackManager := slack.NewManager(cfg.SlackSigningSecret) // Initialize state store (in-memory + Datastore or JSON for persistence). - //nolint:interfacebloat // Interface mirrors state.Store for local type safety - var stateStore interface { - Thread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool) - SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error - LastDM(userID, prURL string) (time.Time, bool) - RecordDM(userID, prURL string, sentAt time.Time) error - DMMessage(userID, prURL string) (state.DMInfo, bool) - SaveDMMessage(userID, prURL string, info state.DMInfo) error - ListDMUsers(prURL string) []string - LastDigest(userID, date string) (time.Time, bool) - RecordDigest(userID, date string, sentAt time.Time) error - WasProcessed(eventKey string) bool - MarkProcessed(eventKey string, ttl time.Duration) error - LastNotification(prURL string) time.Time - RecordNotification(prURL string, notifiedAt time.Time) error - Cleanup() error - Close() error - } + var stateStore state.Store // Check if Datastore should be used via DATASTORE= // Examples: @@ -256,7 +239,7 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi slog.Info("configured Slack manager with state store for DM tracking") // Initialize notification manager for multi-workspace notifications. - notifier := notify.New(notify.WrapSlackManager(slackManager), configManager) + notifier := notify.New(notify.WrapSlackManager(slackManager), configManager, stateStore) // Initialize event router for multi-workspace event handling. eventRouter := slack.NewEventRouter(slackManager) @@ -679,23 +662,7 @@ func runBotCoordinators( githubManager *github.Manager, configManager *config.Manager, notifier *notify.Manager, - stateStore interface { - Thread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool) - SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error - LastDM(userID, prURL string) (time.Time, bool) - RecordDM(userID, prURL string, sentAt time.Time) error - DMMessage(userID, prURL string) (state.DMInfo, bool) - SaveDMMessage(userID, prURL string, info state.DMInfo) error - ListDMUsers(prURL string) []string - LastDigest(userID, date string) (time.Time, bool) - RecordDigest(userID, date string, sentAt time.Time) error - WasProcessed(eventKey string) bool - MarkProcessed(eventKey string, ttl time.Duration) error - LastNotification(prURL string) time.Time - RecordNotification(prURL string, notifiedAt time.Time) error - Cleanup() error - Close() error - }, + stateStore state.Store, sprinklerURL string, ) error { cm := &coordinatorManager{ diff --git a/go.mod b/go.mod index 63fa9ef..d162a23 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/cloudflare/circl v1.6.1 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/go-querystring v1.1.0 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect diff --git a/go.sum b/go.sum index 00dbba4..67aa2b6 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/google/go-github/v50 v50.2.0 h1:j2FyongEHlO9nxXLc+LP3wuBSVU9mVxfpdYUe github.com/google/go-github/v50 v50.2.0/go.mod h1:VBY8FB6yPIjrtKhozXv4FQupxKLS6H4m6xFZlT43q8Q= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= diff --git a/pkg/bot/integration_test.go b/pkg/bot/integration_test.go index f9e40fc..9705279 100644 --- a/pkg/bot/integration_test.go +++ b/pkg/bot/integration_test.go @@ -10,6 +10,7 @@ import ( "github.com/codeGROOVE-dev/slacker/pkg/notify" "github.com/codeGROOVE-dev/slacker/pkg/slack" "github.com/codeGROOVE-dev/slacker/pkg/slacktest" + "github.com/codeGROOVE-dev/slacker/pkg/state" "github.com/codeGROOVE-dev/slacker/pkg/usermapping" "github.com/codeGROOVE-dev/turnclient/pkg/turn" slackapi "github.com/slack-go/slack" @@ -231,7 +232,10 @@ func TestDMDelayLogicIntegration(t *testing.T) { dmDelay: 65, // 65 minute delay } - notifier := notify.New(notify.WrapSlackManager(slackManager), configMgr) + // Create in-memory store for pending DMs + store := state.NewMemoryStore() + + notifier := notify.New(notify.WrapSlackManager(slackManager), configMgr, store) prInfo := notify.PRInfo{ Owner: "test", @@ -291,7 +295,8 @@ func TestDMDelayLogicIntegration(t *testing.T) { // Create fresh tracker with initialized maps notifier.Tracker = ¬ify.NotificationTracker{} // Initialize the tracker by creating a new notifier - notifier = notify.New(notify.WrapSlackManager(slackManager), configMgr) + store = state.NewMemoryStore() // Reset store as well + notifier = notify.New(notify.WrapSlackManager(slackManager), configMgr, store) // Setup test scenario if tt.setupFunc != nil { diff --git a/pkg/home/fetcher_test.go b/pkg/home/fetcher_test.go index e18d677..43bb2dc 100644 --- a/pkg/home/fetcher_test.go +++ b/pkg/home/fetcher_test.go @@ -260,6 +260,18 @@ func (m *mockStateStore) Cleanup() error { return nil } +func (m *mockStateStore) QueuePendingDM(dm state.PendingDM) error { + return nil +} + +func (m *mockStateStore) GetPendingDMs(before time.Time) ([]state.PendingDM, error) { + return nil, nil +} + +func (m *mockStateStore) RemovePendingDM(id string) error { + return nil +} + func (m *mockStateStore) Close() error { return nil } diff --git a/pkg/notify/daily_digest_test.go b/pkg/notify/daily_digest_test.go index f30f2b3..b040419 100644 --- a/pkg/notify/daily_digest_test.go +++ b/pkg/notify/daily_digest_test.go @@ -603,7 +603,7 @@ func TestNewDailyDigestScheduler_FactoryWorks(t *testing.T) { mockConfigMgr := &mockConfigProvider{} mockState := &mockStateProvider{} mockSlack := &mockSlackManagerWithClient{} - manager := New(mockSlack, mockConfigMgr) + manager := New(mockSlack, mockConfigMgr, &mockStore{}) scheduler := NewDailyDigestScheduler(manager, mockGitHubMgr, mockConfigMgr, mockState, mockSlack) diff --git a/pkg/notify/daily_test.go b/pkg/notify/daily_test.go index 312efa8..61fb19f 100644 --- a/pkg/notify/daily_test.go +++ b/pkg/notify/daily_test.go @@ -472,7 +472,7 @@ func TestNewDailyDigestScheduler_WithInterfaces(t *testing.T) { mockConfigMgr := &mockConfigProvider{} mockState := &mockStateProvider{} mockSlack := &mockSlackManagerWithClient{} - manager := New(mockSlack, mockConfigMgr) + manager := New(mockSlack, mockConfigMgr, &mockStore{}) scheduler := NewDailyDigestScheduler(manager, mockGitHubMgr, mockConfigMgr, mockState, mockSlack) diff --git a/pkg/notify/format_test.go b/pkg/notify/format_test.go index 8dcb064..ba9a05b 100644 --- a/pkg/notify/format_test.go +++ b/pkg/notify/format_test.go @@ -584,7 +584,7 @@ func TestNew(t *testing.T) { mockConfig := &mockConfigManager{} // Call New - it should not panic - manager := New(nil, mockConfig) + manager := New(nil, mockConfig, &mockStore{}) if manager == nil { t.Fatal("expected non-nil manager") @@ -604,7 +604,7 @@ func TestNewDailyDigestScheduler(t *testing.T) { mockConfig := &mockConfigManager{} mockState := &mockStateProvider{} mockSlack := &mockSlackManager{} - manager := New(nil, mockConfig) + manager := New(nil, mockConfig, &mockStore{}) scheduler := NewDailyDigestScheduler(manager, nil, mockConfig, mockState, mockSlack) diff --git a/pkg/notify/notify.go b/pkg/notify/notify.go index 552ac96..bbd562e 100644 --- a/pkg/notify/notify.go +++ b/pkg/notify/notify.go @@ -3,12 +3,15 @@ package notify import ( "context" + "encoding/json" "fmt" "log/slog" "strings" "time" + "github.com/codeGROOVE-dev/slacker/pkg/state" "github.com/codeGROOVE-dev/turnclient/pkg/turn" + "github.com/google/uuid" ) // Constants for notification defaults. @@ -260,24 +263,33 @@ func formatNextActionsInternal(ctx context.Context, nextActions map[string]turn. return strings.Join(parts, "; ") } +// Store interface for persistent DM queue management. +type Store interface { + QueuePendingDM(dm state.PendingDM) error + GetPendingDMs(before time.Time) ([]state.PendingDM, error) + RemovePendingDM(id string) error +} + // Manager handles user notifications across multiple workspaces. type Manager struct { slackManager SlackManager Tracker *NotificationTracker configManager ConfigManager + store Store } // New creates a new notification manager. -func New(slackManager SlackManager, configManager ConfigManager) *Manager { +func New(slackManager SlackManager, configManager ConfigManager, store Store) *Manager { return &Manager{ - slackManager: slackManager, + slackManager: slackManager, + configManager: configManager, + store: store, Tracker: &NotificationTracker{ lastDM: make(map[string]time.Time), lastDaily: make(map[string]time.Time), lastChannelNotification: make(map[string]time.Time), lastUserPRChannelTag: make(map[string]TagInfo), }, - configManager: configManager, } } @@ -295,15 +307,10 @@ func (m *Manager) Run(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case <-ticker.C: - // Check if any users need notifications. - // This would iterate through workspaces and users. - // For now, we'll implement a simplified version. - slog.Debug("checking for pending notifications") - // In production, this would: - // 1. Iterate through all workspaces - // 2. For each workspace, check all users with pending PRs - // 3. Apply notification logic based on preferences - // 4. Send notifications as needed + // Check for pending DMs that should be sent now + if err := m.processPendingDMs(ctx); err != nil { + slog.Error("failed to process pending DMs", "error", err) + } case <-cleanupTicker.C: // Clean up entries older than 7 days // This keeps recent data for rate limiting while preventing unbounded growth @@ -313,6 +320,171 @@ func (m *Manager) Run(ctx context.Context) error { } } +// processPendingDMs checks for pending DMs that should be sent and sends them. +func (m *Manager) processPendingDMs(ctx context.Context) error { + now := time.Now() + pendingDMs, err := m.store.GetPendingDMs(now) + if err != nil { + return fmt.Errorf("failed to get pending DMs: %w", err) + } + + if len(pendingDMs) == 0 { + return nil + } + + slog.Info("processing pending DMs", "count", len(pendingDMs)) + + for _, dm := range pendingDMs { + // Deserialize NextActions + var nextAction map[string]turn.Action + if err := json.Unmarshal([]byte(dm.NextActions), &nextAction); err != nil { + slog.Error("failed to deserialize next actions for pending DM", + "dm_id", dm.ID, + "user", dm.UserID, + "pr", fmt.Sprintf("%s/%s#%d", dm.PROwner, dm.PRRepo, dm.PRNumber), + "error", err) + nextAction = make(map[string]turn.Action) + } + + // Reconstruct PRInfo from pending DM + prInfo := PRInfo{ + Owner: dm.PROwner, + Repo: dm.PRRepo, + Title: dm.PRTitle, + Author: dm.PRAuthor, + State: dm.PRState, + HTMLURL: dm.PRURL, + Number: dm.PRNumber, + WorkflowState: dm.WorkflowState, + NextAction: nextAction, + } + + // Send the DM (bypassing the deferral logic by not passing tagInfo) + // We call the actual DM sending logic directly + if err := m.sendDMNow(ctx, dm.WorkspaceID, dm.UserID, dm.ChannelID, dm.ChannelName, prInfo); err != nil { + slog.Error("failed to send pending DM", + "dm_id", dm.ID, + "user", dm.UserID, + "pr", fmt.Sprintf("%s/%s#%d", dm.PROwner, dm.PRRepo, dm.PRNumber), + "error", err) + // Continue processing other DMs even if one fails + continue + } + + // Remove from queue after successful send + if err := m.store.RemovePendingDM(dm.ID); err != nil { + slog.Error("failed to remove pending DM from queue", + "dm_id", dm.ID, + "user", dm.UserID, + "pr", fmt.Sprintf("%s/%s#%d", dm.PROwner, dm.PRRepo, dm.PRNumber), + "error", err) + // Don't return error - the DM was sent successfully + } else { + slog.Info("sent and removed pending DM", + "dm_id", dm.ID, + "user", dm.UserID, + "pr", fmt.Sprintf("%s/%s#%d", dm.PROwner, dm.PRRepo, dm.PRNumber), + "queued_at", dm.QueuedAt, + "send_after", dm.SendAfter, + "delay", now.Sub(dm.QueuedAt)) + } + } + + return nil +} + +// sendDMNow sends a DM immediately, bypassing deferral logic. +// This is used by the scheduler to send queued DMs. +func (m *Manager) sendDMNow(ctx context.Context, workspaceID, userID, channelID, channelName string, pr PRInfo) error { + // Get the Slack client for this workspace + slackClient, err := m.slackManager.Client(ctx, workspaceID) + if err != nil { + return fmt.Errorf("failed to get Slack client: %w", err) + } + + // Check anti-spam protection + lastDM := m.Tracker.LastDMNotification(workspaceID, userID) + timeSinceLastDM := time.Since(lastDM) + antiSpamDelay := 1 * time.Minute + + if timeSinceLastDM < antiSpamDelay { + slog.Info("skipping DM - anti-spam protection active", + "user", userID, + "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number), + "time_since_last_dm", timeSinceLastDM, + "time_until_next_allowed", antiSpamDelay-timeSinceLastDM) + return nil + } + + // Check if user is active + isActive := slackClient.IsUserActive(ctx, userID) + if !isActive { + slog.Info("deferring DM - user not active on Slack", + "user", userID, + "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number)) + // Re-queue for later (add 10 minutes to send time) + // TODO: Implement re-queuing logic if needed + return nil + } + + // Format notification message + var prefix string + if pr.WorkflowState != "" { + prefix = PrefixForAnalysis(pr.WorkflowState, pr.NextAction) + } else { + prefix = PrefixForState(pr.State) + } + + // Format: :emoji: Title · author → action + var action string + switch pr.State { + case "newly_published": + action = "newly published" + case "tests_broken": + action = "fix tests" + case "awaiting_review": + action = "review" + case "changes_requested": + action = "address feedback" + case "approved": + action = "merge" + default: + // Derive action from NextAction if available + if len(pr.NextAction) > 0 { + action = strings.ReplaceAll(PrimaryAction(pr.NextAction), "_", " ") + } + } + + message := fmt.Sprintf("%s %s <%s|%s/%s#%d>", + prefix, + pr.Title, + pr.HTMLURL, + pr.Owner, + pr.Repo, + pr.Number) + + if action != "" { + message += fmt.Sprintf(" · %s → %s", pr.Author, action) + } + + // Send DM + _, _, err = slackClient.SendDirectMessage(ctx, userID, message) + if err != nil { + return fmt.Errorf("failed to send DM: %w", err) + } + + // Track that we sent this DM + m.Tracker.UpdateDMNotification(workspaceID, userID) + + slog.Info("sent deferred DM", + "user", userID, + "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number), + "channel_id", channelID, + "channel_name", channelName) + + return nil +} + // PRInfo contains the minimal information needed to notify about a PR. // //nolint:govet // fieldalignment optimization would reduce clarity @@ -580,13 +752,55 @@ func (m *Manager) NotifyUser(ctx context.Context, workspaceID, userID, channelID delayDuration := time.Duration(delayMins) * time.Minute if timeSinceTag < delayDuration { - slog.Info("deferring DM - user was tagged in channel recently", + // Queue this DM to be sent later + sendAfter := tagInfo.Timestamp.Add(delayDuration) + + // Serialize NextAction map to JSON + nextActionsJSON, err := json.Marshal(pr.NextAction) + if err != nil { + slog.Error("failed to serialize next actions for pending DM", + "user", userID, + "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number), + "error", err) + nextActionsJSON = []byte("{}") + } + + pendingDM := state.PendingDM{ + ID: uuid.New().String(), + WorkspaceID: workspaceID, + UserID: userID, + PROwner: pr.Owner, + PRRepo: pr.Repo, + PRNumber: pr.Number, + PRURL: pr.HTMLURL, + PRTitle: pr.Title, + PRAuthor: pr.Author, + PRState: pr.State, + WorkflowState: pr.WorkflowState, + NextActions: string(nextActionsJSON), + ChannelID: taggedChannelID, + ChannelName: channelName, + QueuedAt: time.Now(), + SendAfter: sendAfter, + } + + if err := m.store.QueuePendingDM(pendingDM); err != nil { + slog.Error("failed to queue pending DM", + "user", userID, + "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number), + "error", err) + return fmt.Errorf("failed to queue pending DM: %w", err) + } + + slog.Info("queued DM for later delivery", "user", userID, "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number), "channel_id", taggedChannelID, "time_since_tag", timeSinceTag, "configured_delay", delayDuration, - "time_until_dm", delayDuration-timeSinceTag) + "send_after", sendAfter, + "time_until_dm", delayDuration-timeSinceTag, + "dm_id", pendingDM.ID) return nil } diff --git a/pkg/notify/notify_test.go b/pkg/notify/notify_test.go index e5c05be..2cff175 100644 --- a/pkg/notify/notify_test.go +++ b/pkg/notify/notify_test.go @@ -18,7 +18,7 @@ func TestNotifyManagerRun(t *testing.T) { mockSlackMgr := &mockSlackManager{} mockConfigMgr := &mockConfigManager{} - manager := New(mockSlackMgr, mockConfigMgr) + manager := New(mockSlackMgr, mockConfigMgr, &mockStore{}) // Create a context with short timeout ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) diff --git a/pkg/notify/notify_user_test.go b/pkg/notify/notify_user_test.go index be45fa8..48438f2 100644 --- a/pkg/notify/notify_user_test.go +++ b/pkg/notify/notify_user_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/codeGROOVE-dev/slacker/pkg/state" slackapi "github.com/slack-go/slack" ) @@ -197,16 +198,18 @@ func TestNotifyUser_DelayedDM_UserInChannel(t *testing.T) { mockSlackMgr := &mockSlackManagerWithClient{client: mockClient} mockConfigMgr := &mockConfigManager{} + mockSt := &mockStore{} manager := &Manager{ - slackManager: mockSlackMgr, + slackManager: mockSlackMgr, + configManager: mockConfigMgr, + store: mockSt, Tracker: &NotificationTracker{ lastDM: make(map[string]time.Time), lastDaily: make(map[string]time.Time), lastChannelNotification: make(map[string]time.Time), lastUserPRChannelTag: make(map[string]TagInfo), }, - configManager: mockConfigMgr, } // User was tagged in channel 30 minutes ago (less than 65 minute delay) @@ -313,16 +316,18 @@ func TestNotifyUser_DelayElapsed(t *testing.T) { mockSlackMgr := &mockSlackManagerWithClient{client: mockClient} mockConfigMgr := &mockConfigManager{} + mockSt := &mockStore{} manager := &Manager{ - slackManager: mockSlackMgr, + slackManager: mockSlackMgr, + configManager: mockConfigMgr, + store: mockSt, Tracker: &NotificationTracker{ lastDM: make(map[string]time.Time), lastDaily: make(map[string]time.Time), lastChannelNotification: make(map[string]time.Time), lastUserPRChannelTag: make(map[string]TagInfo), }, - configManager: mockConfigMgr, } // User was tagged 70 minutes ago (more than 65 minute delay) @@ -448,3 +453,344 @@ func TestNotifyUser_SendDirectMessageError(t *testing.T) { t.Error("expected error from SendDirectMessage failure") } } + +// mockStoreCustomizable allows customizing store behavior for testing. +type mockStoreCustomizable struct { + queuePendingDMFunc func(dm state.PendingDM) error + getPendingDMsFunc func(before time.Time) ([]state.PendingDM, error) + removePendingDMFunc func(id string) error +} + +func (m *mockStoreCustomizable) QueuePendingDM(dm state.PendingDM) error { + if m.queuePendingDMFunc != nil { + return m.queuePendingDMFunc(dm) + } + return nil +} + +func (m *mockStoreCustomizable) GetPendingDMs(before time.Time) ([]state.PendingDM, error) { + if m.getPendingDMsFunc != nil { + return m.getPendingDMsFunc(before) + } + return nil, nil +} + +func (m *mockStoreCustomizable) RemovePendingDM(id string) error { + if m.removePendingDMFunc != nil { + return m.removePendingDMFunc(id) + } + return nil +} + +// TestProcessPendingDMs tests the processPendingDMs function. +func TestProcessPendingDMs(t *testing.T) { + dmsSent := make([]string, 0) + + mockClient := &mockSlackClient{ + isUserActiveFunc: func(ctx context.Context, userID string) bool { + return true // All users active + }, + sendDirectMessageFunc: func(ctx context.Context, userID, text string) (string, string, error) { + dmsSent = append(dmsSent, userID) + return "D123", "1234567890.123456", nil + }, + } + + mockSlackMgr := &mockSlackManagerWithClient{client: mockClient} + mockSt := &mockStoreCustomizable{ + getPendingDMsFunc: func(before time.Time) ([]state.PendingDM, error) { + now := time.Now() + // Return 2 DMs that are ready to send + return []state.PendingDM{ + { + ID: "dm-001", + WorkspaceID: "T123", + UserID: "U001", + PROwner: "test-org", + PRRepo: "test-repo", + PRNumber: 123, + PRURL: "https://github.com/test-org/test-repo/pull/123", + PRTitle: "Test PR 1", + SendAfter: now.Add(-5 * time.Minute), // Ready to send + }, + { + ID: "dm-002", + WorkspaceID: "T123", + UserID: "U002", + PROwner: "test-org", + PRRepo: "test-repo", + PRNumber: 456, + PRURL: "https://github.com/test-org/test-repo/pull/456", + PRTitle: "Test PR 2", + SendAfter: now.Add(-10 * time.Minute), // Ready to send + }, + }, nil + }, + removePendingDMFunc: func(id string) error { + return nil // Successfully removed + }, + } + + manager := &Manager{ + slackManager: mockSlackMgr, + store: mockSt, + Tracker: &NotificationTracker{ + lastDM: make(map[string]time.Time), + lastDaily: make(map[string]time.Time), + lastChannelNotification: make(map[string]time.Time), + lastUserPRChannelTag: make(map[string]TagInfo), + }, + configManager: &mockConfigManager{}, + } + + ctx := context.Background() + err := manager.processPendingDMs(ctx) + if err != nil { + t.Fatalf("unexpected error processing pending DMs: %v", err) + } + + // Verify both DMs were sent + if len(dmsSent) != 2 { + t.Errorf("expected 2 DMs to be sent, got %d", len(dmsSent)) + } + + // Verify correct users received DMs + userMap := make(map[string]bool) + for _, userID := range dmsSent { + userMap[userID] = true + } + if !userMap["U001"] { + t.Error("expected U001 to receive DM") + } + if !userMap["U002"] { + t.Error("expected U002 to receive DM") + } +} + +// TestProcessPendingDMs_EmptyQueue tests processPendingDMs with no pending DMs. +func TestProcessPendingDMs_EmptyQueue(t *testing.T) { + mockSt := &mockStoreCustomizable{ + getPendingDMsFunc: func(before time.Time) ([]state.PendingDM, error) { + return []state.PendingDM{}, nil // No pending DMs + }, + } + + manager := &Manager{ + store: mockSt, + Tracker: &NotificationTracker{ + lastDM: make(map[string]time.Time), + lastDaily: make(map[string]time.Time), + lastChannelNotification: make(map[string]time.Time), + lastUserPRChannelTag: make(map[string]TagInfo), + }, + } + + ctx := context.Background() + err := manager.processPendingDMs(ctx) + if err != nil { + t.Fatalf("unexpected error with empty queue: %v", err) + } +} + +// TestProcessPendingDMs_StoreError tests error handling when store fails. +func TestProcessPendingDMs_StoreError(t *testing.T) { + mockSt := &mockStoreCustomizable{ + getPendingDMsFunc: func(before time.Time) ([]state.PendingDM, error) { + return nil, errors.New("database error") + }, + } + + manager := &Manager{ + store: mockSt, + } + + ctx := context.Background() + err := manager.processPendingDMs(ctx) + if err == nil { + t.Error("expected error when store fails") + } +} + +// TestSendDMNow tests the sendDMNow function. +func TestSendDMNow(t *testing.T) { + dmSent := false + var sentMessage string + + mockClient := &mockSlackClient{ + isUserActiveFunc: func(ctx context.Context, userID string) bool { + return true + }, + sendDirectMessageFunc: func(ctx context.Context, userID, text string) (string, string, error) { + dmSent = true + sentMessage = text + return "D123", "1234567890.123456", nil + }, + } + + mockSlackMgr := &mockSlackManagerWithClient{client: mockClient} + + manager := &Manager{ + slackManager: mockSlackMgr, + Tracker: &NotificationTracker{ + lastDM: make(map[string]time.Time), + lastDaily: make(map[string]time.Time), + lastChannelNotification: make(map[string]time.Time), + lastUserPRChannelTag: make(map[string]TagInfo), + }, + configManager: &mockConfigManager{}, + } + + ctx := context.Background() + pr := PRInfo{ + Owner: "test-org", + Repo: "test-repo", + Number: 123, + HTMLURL: "https://github.com/test-org/test-repo/pull/123", + Title: "Test PR", + WorkflowState: "awaiting_review", + } + + err := manager.sendDMNow(ctx, "T123", "U001", "C123", "test-channel", pr) + if err != nil { + t.Fatalf("unexpected error sending DM: %v", err) + } + + if !dmSent { + t.Error("expected DM to be sent") + } + + // Verify message contains PR info + if sentMessage == "" { + t.Error("expected non-empty message") + } +} + +// TestSendDMNow_UserInactive tests sendDMNow skips inactive users. +func TestSendDMNow_UserInactive(t *testing.T) { + dmSent := false + + mockClient := &mockSlackClient{ + isUserActiveFunc: func(ctx context.Context, userID string) bool { + return false // User inactive + }, + sendDirectMessageFunc: func(ctx context.Context, userID, text string) (string, string, error) { + dmSent = true + return "D123", "1234567890.123456", nil + }, + } + + mockSlackMgr := &mockSlackManagerWithClient{client: mockClient} + + manager := &Manager{ + slackManager: mockSlackMgr, + Tracker: &NotificationTracker{ + lastDM: make(map[string]time.Time), + lastDaily: make(map[string]time.Time), + lastChannelNotification: make(map[string]time.Time), + lastUserPRChannelTag: make(map[string]TagInfo), + }, + } + + ctx := context.Background() + pr := PRInfo{ + Owner: "test-org", + Repo: "test-repo", + Number: 123, + HTMLURL: "https://github.com/test-org/test-repo/pull/123", + } + + err := manager.sendDMNow(ctx, "T123", "U001", "C123", "test-channel", pr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // DM should NOT be sent (user inactive) + if dmSent { + t.Error("DM should not be sent to inactive user") + } +} + +// TestSendDMNow_AntiSpam tests sendDMNow respects anti-spam limits. +func TestSendDMNow_AntiSpam(t *testing.T) { + dmSent := false + + mockClient := &mockSlackClient{ + isUserActiveFunc: func(ctx context.Context, userID string) bool { + return true + }, + sendDirectMessageFunc: func(ctx context.Context, userID, text string) (string, string, error) { + dmSent = true + return "D123", "1234567890.123456", nil + }, + } + + mockSlackMgr := &mockSlackManagerWithClient{client: mockClient} + + manager := &Manager{ + slackManager: mockSlackMgr, + Tracker: &NotificationTracker{ + lastDM: make(map[string]time.Time), + lastDaily: make(map[string]time.Time), + lastChannelNotification: make(map[string]time.Time), + lastUserPRChannelTag: make(map[string]TagInfo), + }, + } + + // Record a recent DM (30 seconds ago) + manager.Tracker.lastDM["T123:U001"] = time.Now().Add(-30 * time.Second) + + ctx := context.Background() + pr := PRInfo{ + Owner: "test-org", + Repo: "test-repo", + Number: 123, + HTMLURL: "https://github.com/test-org/test-repo/pull/123", + } + + err := manager.sendDMNow(ctx, "T123", "U001", "C123", "test-channel", pr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // DM should NOT be sent (anti-spam protection) + if dmSent { + t.Error("DM should not be sent due to anti-spam protection (< 1 minute since last DM)") + } +} + +// TestSendDMNow_SlackError tests error handling when Slack API fails. +func TestSendDMNow_SlackError(t *testing.T) { + mockClient := &mockSlackClient{ + isUserActiveFunc: func(ctx context.Context, userID string) bool { + return true + }, + sendDirectMessageFunc: func(ctx context.Context, userID, text string) (string, string, error) { + return "", "", errors.New("slack API error") + }, + } + + mockSlackMgr := &mockSlackManagerWithClient{client: mockClient} + + manager := &Manager{ + slackManager: mockSlackMgr, + Tracker: &NotificationTracker{ + lastDM: make(map[string]time.Time), + lastDaily: make(map[string]time.Time), + lastChannelNotification: make(map[string]time.Time), + lastUserPRChannelTag: make(map[string]TagInfo), + }, + } + + ctx := context.Background() + pr := PRInfo{ + Owner: "test-org", + Repo: "test-repo", + Number: 123, + HTMLURL: "https://github.com/test-org/test-repo/pull/123", + } + + err := manager.sendDMNow(ctx, "T123", "U001", "C123", "test-channel", pr) + if err == nil { + t.Error("expected error when Slack API fails") + } +} diff --git a/pkg/notify/run_test.go b/pkg/notify/run_test.go index a777535..09d4d94 100644 --- a/pkg/notify/run_test.go +++ b/pkg/notify/run_test.go @@ -4,8 +4,25 @@ import ( "context" "testing" "time" + + "github.com/codeGROOVE-dev/slacker/pkg/state" ) +// mockStore implements Store interface for testing. +type mockStore struct{} + +func (m *mockStore) QueuePendingDM(dm state.PendingDM) error { + return nil +} + +func (m *mockStore) GetPendingDMs(before time.Time) ([]state.PendingDM, error) { + return nil, nil +} + +func (m *mockStore) RemovePendingDM(id string) error { + return nil +} + // TestRun_CleanupTicker tests that Run calls Tracker.Cleanup periodically. func TestRun_CleanupTicker(t *testing.T) { cleanupCalled := false @@ -57,8 +74,9 @@ func TestRun_CleanupTicker(t *testing.T) { func TestRun_ContextCancellation(t *testing.T) { mockSlackMgr := &mockSlackManager{} mockConfigMgr := &mockConfigManager{} + mockSt := &mockStore{} - manager := New(mockSlackMgr, mockConfigMgr) + manager := New(mockSlackMgr, mockConfigMgr, mockSt) ctx, cancel := context.WithCancel(context.Background()) @@ -77,8 +95,9 @@ func TestRun_ContextCancellation(t *testing.T) { func TestRun_TickerFires(t *testing.T) { mockSlackMgr := &mockSlackManager{} mockConfigMgr := &mockConfigManager{} + mockSt := &mockStore{} - manager := New(mockSlackMgr, mockConfigMgr) + manager := New(mockSlackMgr, mockConfigMgr, mockSt) // Run for a short time to allow ticker to fire ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond) diff --git a/pkg/state/datastore.go b/pkg/state/datastore.go index 26b2bbb..e10df28 100644 --- a/pkg/state/datastore.go +++ b/pkg/state/datastore.go @@ -26,6 +26,7 @@ const ( kindDigest = "SlackerDigest" kindEvent = "SlackerEvent" kindNotify = "SlackerNotification" + kindPendingDM = "SlackerPendingDM" ) // ErrAlreadyProcessed indicates an event was already processed by another instance. @@ -76,6 +77,25 @@ type notifyEntity struct { PRURL string `datastore:"pr_url"` } +// Pending DM entity. +type pendingDMEntity struct { + WorkspaceID string `datastore:"workspace_id"` + UserID string `datastore:"user_id"` + PROwner string `datastore:"pr_owner"` + PRRepo string `datastore:"pr_repo"` + PRNumber int `datastore:"pr_number"` + PRURL string `datastore:"pr_url"` + PRTitle string `datastore:"pr_title,noindex"` + PRAuthor string `datastore:"pr_author"` + PRState string `datastore:"pr_state"` + WorkflowState string `datastore:"workflow_state"` + NextActions string `datastore:"next_actions,noindex"` + ChannelID string `datastore:"channel_id"` + ChannelName string `datastore:"channel_name"` + QueuedAt time.Time `datastore:"queued_at"` + SendAfter time.Time `datastore:"send_after"` +} + // NewDatastoreStore creates a new Datastore-backed store with in-memory cache. // The databaseID parameter specifies which Datastore database to use (e.g., "slacker", "(default)"). func NewDatastoreStore(ctx context.Context, projectID, databaseID string) (*DatastoreStore, error) { @@ -660,6 +680,115 @@ func (s *DatastoreStore) RecordNotification(prURL string, notifiedAt time.Time) return nil } +// QueuePendingDM adds a pending DM to both memory and Datastore. +func (s *DatastoreStore) QueuePendingDM(dm PendingDM) error { + // Always update memory cache + if err := s.memory.QueuePendingDM(dm); err != nil { + return err + } + + // Skip Datastore if disabled + if s.disabled || s.ds == nil { + return nil + } + + ctx := context.Background() + key := datastore.NameKey(kindPendingDM, dm.ID, nil) + entity := pendingDMEntity{ + WorkspaceID: dm.WorkspaceID, + UserID: dm.UserID, + PROwner: dm.PROwner, + PRRepo: dm.PRRepo, + PRNumber: dm.PRNumber, + PRURL: dm.PRURL, + PRTitle: dm.PRTitle, + PRAuthor: dm.PRAuthor, + PRState: dm.PRState, + WorkflowState: dm.WorkflowState, + NextActions: dm.NextActions, + ChannelID: dm.ChannelID, + ChannelName: dm.ChannelName, + QueuedAt: dm.QueuedAt, + SendAfter: dm.SendAfter, + } + + _, err := s.ds.Put(ctx, key, &entity) + return err +} + +// GetPendingDMs returns all pending DMs that should be sent. +// Reads from memory cache first, falls back to Datastore if empty. +func (s *DatastoreStore) GetPendingDMs(before time.Time) ([]PendingDM, error) { + // Try memory first + dms, err := s.memory.GetPendingDMs(before) + if err == nil && len(dms) > 0 { + return dms, nil + } + + // Skip Datastore if disabled + if s.disabled || s.ds == nil { + return dms, nil + } + + // Query Datastore for pending DMs + ctx := context.Background() + query := datastore.NewQuery(kindPendingDM). + Filter("send_after <=", before). + Limit(100) + + var entities []pendingDMEntity + keys, err := s.ds.GetAll(ctx, query, &entities) + if err != nil { + slog.Warn("failed to query pending DMs from Datastore", "error", err) + return dms, nil // Return memory results even if Datastore fails + } + + // Convert entities to PendingDM structs and update memory cache + result := make([]PendingDM, 0, len(entities)) + for i, entity := range entities { + dm := PendingDM{ + ID: keys[i].Name, + WorkspaceID: entity.WorkspaceID, + UserID: entity.UserID, + PROwner: entity.PROwner, + PRRepo: entity.PRRepo, + PRNumber: entity.PRNumber, + PRURL: entity.PRURL, + PRTitle: entity.PRTitle, + PRAuthor: entity.PRAuthor, + PRState: entity.PRState, + WorkflowState: entity.WorkflowState, + NextActions: entity.NextActions, + ChannelID: entity.ChannelID, + ChannelName: entity.ChannelName, + QueuedAt: entity.QueuedAt, + SendAfter: entity.SendAfter, + } + result = append(result, dm) + // Update memory cache + _ = s.memory.QueuePendingDM(dm) + } + + return result, nil +} + +// RemovePendingDM removes a pending DM from both memory and Datastore. +func (s *DatastoreStore) RemovePendingDM(id string) error { + // Always remove from memory + if err := s.memory.RemovePendingDM(id); err != nil { + return err + } + + // Skip Datastore if disabled + if s.disabled || s.ds == nil { + return nil + } + + ctx := context.Background() + key := datastore.NameKey(kindPendingDM, id, nil) + return s.ds.Delete(ctx, key) +} + // Cleanup removes old data from both stores. func (s *DatastoreStore) Cleanup() error { // Always cleanup memory diff --git a/pkg/state/datastore_test.go b/pkg/state/datastore_test.go index 117b8d9..567188c 100644 --- a/pkg/state/datastore_test.go +++ b/pkg/state/datastore_test.go @@ -541,3 +541,264 @@ func TestDatastoreStore_MemoryFirstFallback(t *testing.T) { time.Sleep(500 * time.Millisecond) store.Close() } + +func TestDatastoreStore_PendingDMOperations(t *testing.T) { + client, cleanup := datastore.NewMockClient(t) + defer cleanup() + + store := &DatastoreStore{ + ds: client, + memory: NewMemoryStore(), + disabled: false, + } + defer store.Close() + + // Test retrieval when no pending DMs exist + pending, err := store.GetPendingDMs(time.Now()) + if err != nil { + t.Fatalf("unexpected error getting pending DMs: %v", err) + } + if len(pending) != 0 { + t.Errorf("expected 0 pending DMs, got %d", len(pending)) + } + + // Queue a DM that should be sent now + now := time.Now().Truncate(time.Millisecond) + dm1 := PendingDM{ + ID: "dm-001", + WorkspaceID: "T123", + UserID: "U001", + PROwner: "test-org", + PRRepo: "test-repo", + PRNumber: 123, + PRURL: "https://github.com/test-org/test-repo/pull/123", + PRTitle: "Test PR", + PRAuthor: "author", + PRState: "open", + WorkflowState: "awaiting_review", + NextActions: `{"U001":{"kind":"review"}}`, + ChannelID: "C123", + ChannelName: "test-channel", + QueuedAt: now.Add(-10 * time.Minute), + SendAfter: now.Add(-5 * time.Minute), // 5 minutes ago - ready to send + } + + err = store.QueuePendingDM(dm1) + if err != nil { + t.Fatalf("unexpected error queueing DM: %v", err) + } + + // Queue a DM that should be sent in the future + dm2 := PendingDM{ + ID: "dm-002", + WorkspaceID: "T123", + UserID: "U002", + PROwner: "test-org", + PRRepo: "test-repo", + PRNumber: 456, + PRURL: "https://github.com/test-org/test-repo/pull/456", + PRTitle: "Another PR", + PRAuthor: "author2", + PRState: "open", + WorkflowState: "tests_broken", + NextActions: `{"U002":{"kind":"fix"}}`, + ChannelID: "C456", + ChannelName: "another-channel", + QueuedAt: now, + SendAfter: now.Add(10 * time.Minute), // 10 minutes from now - not ready yet + } + + err = store.QueuePendingDM(dm2) + if err != nil { + t.Fatalf("unexpected error queueing second DM: %v", err) + } + + // Get pending DMs from memory cache (fast path) + pending, err = store.GetPendingDMs(now) + if err != nil { + t.Fatalf("unexpected error getting pending DMs: %v", err) + } + + // Only dm1 should be returned (dm2 is in the future) + if len(pending) != 1 { + t.Fatalf("expected 1 pending DM from memory, got %d", len(pending)) + } + + if pending[0].ID != "dm-001" { + t.Errorf("expected DM ID dm-001, got %s", pending[0].ID) + } + + // Give async Datastore writes time to complete + time.Sleep(200 * time.Millisecond) + + // Clear memory cache to test Datastore retrieval + store.memory = NewMemoryStore() + + // Get pending DMs from Datastore + // Note: The mock Datastore may return all DMs regardless of filter + // In production, the filter would work correctly + future := now.Add(15 * time.Minute) + pending, err = store.GetPendingDMs(future) + if err != nil { + t.Fatalf("unexpected error getting pending DMs from Datastore: %v", err) + } + + // Should get both DMs from Datastore + if len(pending) < 2 { + t.Fatalf("expected at least 2 pending DMs from Datastore, got %d", len(pending)) + } + + // Verify both DMs are present + dmIDs := make(map[string]bool) + for _, dm := range pending { + dmIDs[dm.ID] = true + } + if !dmIDs["dm-001"] { + t.Error("expected dm-001 to be in Datastore") + } + if !dmIDs["dm-002"] { + t.Error("expected dm-002 to be in Datastore") + } + + // Remove dm-001 + err = store.RemovePendingDM("dm-001") + if err != nil { + t.Fatalf("unexpected error removing DM: %v", err) + } + + // Give async Datastore delete time to complete + time.Sleep(200 * time.Millisecond) + + // Clear memory again to force Datastore query + store.memory = NewMemoryStore() + + // Now only dm-002 should remain in Datastore (query in future to catch it) + futureLater := now.Add(15 * time.Minute) + pending, err = store.GetPendingDMs(futureLater) + if err != nil { + t.Fatalf("unexpected error getting pending DMs after removal: %v", err) + } + + if len(pending) != 1 { + t.Fatalf("expected 1 pending DM after removal, got %d", len(pending)) + } + + if pending[0].ID != "dm-002" { + t.Errorf("expected remaining DM to be dm-002, got %s", pending[0].ID) + } + + // Remove non-existent DM should not error + err = store.RemovePendingDM("dm-999") + if err != nil { + t.Errorf("unexpected error removing non-existent DM: %v", err) + } +} + +func TestDatastoreStore_PendingDMDisabledMode(t *testing.T) { + // Create store in disabled mode (no Datastore client) + store := &DatastoreStore{ + ds: nil, + memory: NewMemoryStore(), + disabled: true, + } + defer store.Close() + + now := time.Now() + + // Queue DM in memory-only mode + dm := PendingDM{ + ID: "dm-001", + UserID: "U001", + PRURL: "https://github.com/test/repo/pull/123", + SendAfter: now.Add(-5 * time.Minute), + } + + err := store.QueuePendingDM(dm) + if err != nil { + t.Fatalf("unexpected error queueing DM in disabled mode: %v", err) + } + + // Get pending DMs from memory + pending, err := store.GetPendingDMs(now) + if err != nil { + t.Fatalf("unexpected error getting pending DMs in disabled mode: %v", err) + } + + if len(pending) != 1 { + t.Fatalf("expected 1 pending DM in disabled mode, got %d", len(pending)) + } + + // Remove DM + err = store.RemovePendingDM("dm-001") + if err != nil { + t.Fatalf("unexpected error removing DM in disabled mode: %v", err) + } + + // Verify removed + pending, err = store.GetPendingDMs(now) + if err != nil { + t.Fatalf("unexpected error getting pending DMs after removal: %v", err) + } + + if len(pending) != 0 { + t.Errorf("expected 0 pending DMs after removal, got %d", len(pending)) + } +} + +func TestDatastoreStore_PendingDMCleanup(t *testing.T) { + client, cleanup := datastore.NewMockClient(t) + defer cleanup() + + store := &DatastoreStore{ + ds: client, + memory: NewMemoryStore(), + disabled: false, + } + defer store.Close() + + now := time.Now().Truncate(time.Millisecond) + oldTime := now.Add(-100 * 24 * time.Hour) // 100 days ago + + // Add an old pending DM (>90 days) + oldDM := PendingDM{ + ID: "old-dm", + UserID: "U001", + PRURL: "https://github.com/test/repo/pull/1", + QueuedAt: oldTime, + SendAfter: oldTime, + } + store.QueuePendingDM(oldDM) + + // Add a recent pending DM + recentDM := PendingDM{ + ID: "recent-dm", + UserID: "U002", + PRURL: "https://github.com/test/repo/pull/2", + QueuedAt: now, + SendAfter: now.Add(10 * time.Minute), + } + store.QueuePendingDM(recentDM) + + // Give async writes time to complete + time.Sleep(200 * time.Millisecond) + + // Run cleanup + err := store.Cleanup() + if err != nil { + t.Fatalf("unexpected error during cleanup: %v", err) + } + + // Verify old DM was removed from memory + pending, err := store.GetPendingDMs(now.Add(24 * time.Hour)) + if err != nil { + t.Fatalf("unexpected error getting pending DMs: %v", err) + } + + if len(pending) != 1 { + t.Fatalf("expected 1 pending DM after cleanup, got %d", len(pending)) + } + + if pending[0].ID != "recent-dm" { + t.Errorf("expected recent-dm to remain, got %s", pending[0].ID) + } +} diff --git a/pkg/state/json.go b/pkg/state/json.go index 3055068..bceacdb 100644 --- a/pkg/state/json.go +++ b/pkg/state/json.go @@ -24,7 +24,8 @@ type JSONStore struct { digests map[string]time.Time events map[string]time.Time notifications map[string]time.Time - modified bool // Track if we need to save + pendingDMs map[string]PendingDM // Pending DMs to be sent + modified bool // Track if we need to save } // NewJSONStore creates a new JSON-based state store. @@ -47,6 +48,7 @@ func NewJSONStore() (*JSONStore, error) { digests: make(map[string]time.Time), events: make(map[string]time.Time), notifications: make(map[string]time.Time), + pendingDMs: make(map[string]PendingDM), modified: false, } @@ -325,13 +327,23 @@ func (s *JSONStore) Cleanup() error { } } - if cleanedThreads+cleanedDMs+cleanedDMMessages+cleanedDigests+cleanedEvents > 0 { + // Clean up old pending DMs (>7 days or already past send time by >1 day) + cleanedPendingDMs := 0 + for key, dm := range s.pendingDMs { + if now.Sub(dm.QueuedAt) > 7*24*time.Hour || now.Sub(dm.SendAfter) > 24*time.Hour { + delete(s.pendingDMs, key) + cleanedPendingDMs++ + } + } + + if cleanedThreads+cleanedDMs+cleanedDMMessages+cleanedDigests+cleanedEvents+cleanedPendingDMs > 0 { slog.Info("cleaned up old state", "threads", cleanedThreads, "dms", cleanedDMs, "dm_messages", cleanedDMMessages, "digests", cleanedDigests, - "events", cleanedEvents) + "events", cleanedEvents, + "pending_dms", cleanedPendingDMs) s.modified = true return s.save() } @@ -357,6 +369,7 @@ type persistentState struct { Digests map[string]time.Time `json:"digests"` Events map[string]time.Time `json:"events"` Notifications map[string]time.Time `json:"notifications"` + PendingDMs map[string]PendingDM `json:"pending_dms"` } // save persists state to disk. @@ -373,6 +386,7 @@ func (s *JSONStore) save() error { Digests: s.digests, Events: s.events, Notifications: s.notifications, + PendingDMs: s.pendingDMs, } data, err := json.MarshalIndent(state, "", " ") @@ -422,6 +436,7 @@ func (s *JSONStore) load() error { s.digests = state.Digests s.events = state.Events s.notifications = state.Notifications + s.pendingDMs = state.PendingDMs if s.threads == nil { s.threads = make(map[string]ThreadInfo) @@ -441,6 +456,9 @@ func (s *JSONStore) load() error { if s.notifications == nil { s.notifications = make(map[string]time.Time) } + if s.pendingDMs == nil { + s.pendingDMs = make(map[string]PendingDM) + } slog.Info("loaded state from disk", "threads", len(s.threads), @@ -448,7 +466,48 @@ func (s *JSONStore) load() error { "dm_messages", len(s.dmMessages), "digests", len(s.digests), "events", len(s.events), - "notifications", len(s.notifications)) + "notifications", len(s.notifications), + "pending_dms", len(s.pendingDMs)) return nil } + +// QueuePendingDM adds a DM to the pending queue. +func (s *JSONStore) QueuePendingDM(dm PendingDM) error { + s.mu.Lock() + defer s.mu.Unlock() + s.pendingDMs[dm.ID] = dm + s.modified = true + // Best-effort persistence to JSON file for restart recovery + if err := s.save(); err != nil { + slog.Error("failed to persist pending DM to JSON file", "dm_id", dm.ID, "error", err) + } + return nil +} + +// GetPendingDMs returns all pending DMs that should be sent (SendAfter <= before). +func (s *JSONStore) GetPendingDMs(before time.Time) ([]PendingDM, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var result []PendingDM + for _, dm := range s.pendingDMs { + if !dm.SendAfter.After(before) { + result = append(result, dm) + } + } + return result, nil +} + +// RemovePendingDM removes a pending DM from the queue. +func (s *JSONStore) RemovePendingDM(id string) error { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.pendingDMs, id) + s.modified = true + // Best-effort persistence to JSON file for restart recovery + if err := s.save(); err != nil { + slog.Error("failed to persist pending DM removal to JSON file", "dm_id", id, "error", err) + } + return nil +} diff --git a/pkg/state/json_test.go b/pkg/state/json_test.go index ab30a6a..177008a 100644 --- a/pkg/state/json_test.go +++ b/pkg/state/json_test.go @@ -315,3 +315,290 @@ func TestJSONStore_SaveLoad_RoundTrip(t *testing.T) { t.Errorf("expected 1 digest, got %d", len(store2.digests)) } } + +func TestJSONStore_PendingDMOperations(t *testing.T) { + tempDir, err := os.MkdirTemp("", "slacker-state-test-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + store := &JSONStore{ + baseDir: tempDir, + threads: make(map[string]ThreadInfo), + dms: make(map[string]time.Time), + dmMessages: make(map[string]DMInfo), + digests: make(map[string]time.Time), + events: make(map[string]time.Time), + notifications: make(map[string]time.Time), + pendingDMs: make(map[string]PendingDM), + } + + // Test retrieval when no pending DMs exist + pending, err := store.GetPendingDMs(time.Now()) + if err != nil { + t.Fatalf("unexpected error getting pending DMs: %v", err) + } + if len(pending) != 0 { + t.Errorf("expected 0 pending DMs, got %d", len(pending)) + } + + // Queue a DM that should be sent now + now := time.Now() + dm1 := PendingDM{ + ID: "dm-001", + WorkspaceID: "T123", + UserID: "U001", + PROwner: "test-org", + PRRepo: "test-repo", + PRNumber: 123, + PRURL: "https://github.com/test-org/test-repo/pull/123", + PRTitle: "Test PR", + PRAuthor: "author", + PRState: "open", + WorkflowState: "awaiting_review", + NextActions: `{"U001":{"kind":"review"}}`, + ChannelID: "C123", + ChannelName: "test-channel", + QueuedAt: now.Add(-10 * time.Minute), + SendAfter: now.Add(-5 * time.Minute), // 5 minutes ago - ready to send + } + + err = store.QueuePendingDM(dm1) + if err != nil { + t.Fatalf("unexpected error queueing DM: %v", err) + } + + // Queue a DM that should be sent in the future + dm2 := PendingDM{ + ID: "dm-002", + WorkspaceID: "T123", + UserID: "U002", + PROwner: "test-org", + PRRepo: "test-repo", + PRNumber: 456, + PRURL: "https://github.com/test-org/test-repo/pull/456", + PRTitle: "Another PR", + PRAuthor: "author2", + PRState: "open", + WorkflowState: "tests_broken", + NextActions: `{"U002":{"kind":"fix"}}`, + ChannelID: "C456", + ChannelName: "another-channel", + QueuedAt: now, + SendAfter: now.Add(10 * time.Minute), // 10 minutes from now - not ready yet + } + + err = store.QueuePendingDM(dm2) + if err != nil { + t.Fatalf("unexpected error queueing second DM: %v", err) + } + + // Get pending DMs that are ready to send + pending, err = store.GetPendingDMs(now) + if err != nil { + t.Fatalf("unexpected error getting pending DMs: %v", err) + } + + // Only dm1 should be returned (dm2 is in the future) + if len(pending) != 1 { + t.Fatalf("expected 1 pending DM, got %d", len(pending)) + } + + if pending[0].ID != "dm-001" { + t.Errorf("expected DM ID dm-001, got %s", pending[0].ID) + } + if pending[0].UserID != "U001" { + t.Errorf("expected UserID U001, got %s", pending[0].UserID) + } + if pending[0].PRNumber != 123 { + t.Errorf("expected PRNumber 123, got %d", pending[0].PRNumber) + } + + // Get pending DMs 15 minutes from now - both should be ready + future := now.Add(15 * time.Minute) + pending, err = store.GetPendingDMs(future) + if err != nil { + t.Fatalf("unexpected error getting future pending DMs: %v", err) + } + + if len(pending) != 2 { + t.Fatalf("expected 2 pending DMs in future, got %d", len(pending)) + } + + // Remove one DM + err = store.RemovePendingDM("dm-001") + if err != nil { + t.Fatalf("unexpected error removing DM: %v", err) + } + + // Now only dm2 should remain + pending, err = store.GetPendingDMs(future) + if err != nil { + t.Fatalf("unexpected error getting pending DMs after removal: %v", err) + } + + if len(pending) != 1 { + t.Fatalf("expected 1 pending DM after removal, got %d", len(pending)) + } + + if pending[0].ID != "dm-002" { + t.Errorf("expected remaining DM to be dm-002, got %s", pending[0].ID) + } + + // Remove non-existent DM should not error + err = store.RemovePendingDM("dm-999") + if err != nil { + t.Errorf("unexpected error removing non-existent DM: %v", err) + } +} + +func TestJSONStore_PendingDMPersistence(t *testing.T) { + tempDir, err := os.MkdirTemp("", "slacker-state-test-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + // Create first store instance + store1 := &JSONStore{ + baseDir: tempDir, + threads: make(map[string]ThreadInfo), + dms: make(map[string]time.Time), + dmMessages: make(map[string]DMInfo), + digests: make(map[string]time.Time), + events: make(map[string]time.Time), + notifications: make(map[string]time.Time), + pendingDMs: make(map[string]PendingDM), + } + + // Queue some pending DMs + now := time.Now() + dm1 := PendingDM{ + ID: "dm-001", + UserID: "U001", + PRURL: "https://github.com/test/repo/pull/123", + PRTitle: "Test PR", + SendAfter: now.Add(5 * time.Minute), + } + dm2 := PendingDM{ + ID: "dm-002", + UserID: "U002", + PRURL: "https://github.com/test/repo/pull/456", + PRTitle: "Another PR", + SendAfter: now.Add(10 * time.Minute), + } + + store1.QueuePendingDM(dm1) + store1.QueuePendingDM(dm2) + + // Save to disk (happens automatically in QueuePendingDM via modified flag) + err = store1.save() + if err != nil { + t.Fatalf("unexpected error saving: %v", err) + } + + // Create second store instance (simulates restart) + store2 := &JSONStore{ + baseDir: tempDir, + threads: make(map[string]ThreadInfo), + dms: make(map[string]time.Time), + dmMessages: make(map[string]DMInfo), + digests: make(map[string]time.Time), + events: make(map[string]time.Time), + notifications: make(map[string]time.Time), + pendingDMs: make(map[string]PendingDM), + } + + // Load from disk + err = store2.load() + if err != nil { + t.Fatalf("unexpected error loading: %v", err) + } + + // Verify pending DMs persisted + future := now.Add(15 * time.Minute) + pending, err := store2.GetPendingDMs(future) + if err != nil { + t.Fatalf("unexpected error getting pending DMs: %v", err) + } + + if len(pending) != 2 { + t.Fatalf("expected 2 pending DMs after reload, got %d", len(pending)) + } + + // Verify the data matches + dmMap := make(map[string]PendingDM) + for _, dm := range pending { + dmMap[dm.ID] = dm + } + + if dmMap["dm-001"].UserID != "U001" { + t.Errorf("expected UserID U001 for dm-001, got %s", dmMap["dm-001"].UserID) + } + if dmMap["dm-002"].UserID != "U002" { + t.Errorf("expected UserID U002 for dm-002, got %s", dmMap["dm-002"].UserID) + } +} + +func TestJSONStore_PendingDMCleanup(t *testing.T) { + tempDir, err := os.MkdirTemp("", "slacker-state-test-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tempDir) + + store := &JSONStore{ + baseDir: tempDir, + threads: make(map[string]ThreadInfo), + dms: make(map[string]time.Time), + dmMessages: make(map[string]DMInfo), + digests: make(map[string]time.Time), + events: make(map[string]time.Time), + notifications: make(map[string]time.Time), + pendingDMs: make(map[string]PendingDM), + } + + now := time.Now() + oldTime := now.Add(-100 * 24 * time.Hour) // 100 days ago + + // Add an old pending DM (>90 days) + oldDM := PendingDM{ + ID: "old-dm", + UserID: "U001", + PRURL: "https://github.com/test/repo/pull/1", + QueuedAt: oldTime, + SendAfter: oldTime, + } + store.QueuePendingDM(oldDM) + + // Add a recent pending DM + recentDM := PendingDM{ + ID: "recent-dm", + UserID: "U002", + PRURL: "https://github.com/test/repo/pull/2", + QueuedAt: now, + SendAfter: now.Add(10 * time.Minute), + } + store.QueuePendingDM(recentDM) + + // Run cleanup + err = store.Cleanup() + if err != nil { + t.Fatalf("unexpected error during cleanup: %v", err) + } + + // Verify old DM was removed + pending, err := store.GetPendingDMs(now.Add(24 * time.Hour)) + if err != nil { + t.Fatalf("unexpected error getting pending DMs: %v", err) + } + + if len(pending) != 1 { + t.Fatalf("expected 1 pending DM after cleanup, got %d", len(pending)) + } + + if pending[0].ID != "recent-dm" { + t.Errorf("expected recent-dm to remain, got %s", pending[0].ID) + } +} diff --git a/pkg/state/memory.go b/pkg/state/memory.go index e796dc7..7268c32 100644 --- a/pkg/state/memory.go +++ b/pkg/state/memory.go @@ -18,6 +18,7 @@ type MemoryStore struct { digests map[string]time.Time events map[string]time.Time notifications map[string]time.Time + pendingDMs map[string]PendingDM // Pending DMs to be sent } // NewMemoryStore creates a new in-memory-only state store. @@ -29,6 +30,7 @@ func NewMemoryStore() *MemoryStore { digests: make(map[string]time.Time), events: make(map[string]time.Time), notifications: make(map[string]time.Time), + pendingDMs: make(map[string]PendingDM), } } @@ -200,6 +202,43 @@ func (s *MemoryStore) Cleanup() error { } } + // Clean up old pending DMs (>7 days or already past send time by >1 day) + for key, dm := range s.pendingDMs { + if now.Sub(dm.QueuedAt) > 7*24*time.Hour || now.Sub(dm.SendAfter) > 24*time.Hour { + delete(s.pendingDMs, key) + } + } + + return nil +} + +// QueuePendingDM adds a DM to the pending queue. +func (s *MemoryStore) QueuePendingDM(dm PendingDM) error { + s.mu.Lock() + defer s.mu.Unlock() + s.pendingDMs[dm.ID] = dm + return nil +} + +// GetPendingDMs returns all pending DMs that should be sent (SendAfter <= before). +func (s *MemoryStore) GetPendingDMs(before time.Time) ([]PendingDM, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var result []PendingDM + for _, dm := range s.pendingDMs { + if !dm.SendAfter.After(before) { + result = append(result, dm) + } + } + return result, nil +} + +// RemovePendingDM removes a pending DM from the queue. +func (s *MemoryStore) RemovePendingDM(id string) error { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.pendingDMs, id) return nil } diff --git a/pkg/state/memory_test.go b/pkg/state/memory_test.go index a96fee7..62e3f46 100644 --- a/pkg/state/memory_test.go +++ b/pkg/state/memory_test.go @@ -1,6 +1,7 @@ package state import ( + "fmt" "testing" "time" ) @@ -347,3 +348,233 @@ func TestClose(t *testing.T) { t.Errorf("unexpected error closing store: %v", err) } } + +func TestPendingDMOperations(t *testing.T) { + store := NewMemoryStore() + + // Test retrieval when no pending DMs exist + pending, err := store.GetPendingDMs(time.Now()) + if err != nil { + t.Fatalf("unexpected error getting pending DMs: %v", err) + } + if len(pending) != 0 { + t.Errorf("expected 0 pending DMs, got %d", len(pending)) + } + + // Queue a DM that should be sent now + now := time.Now() + dm1 := PendingDM{ + ID: "dm-001", + WorkspaceID: "T123", + UserID: "U001", + PROwner: "test-org", + PRRepo: "test-repo", + PRNumber: 123, + PRURL: "https://github.com/test-org/test-repo/pull/123", + PRTitle: "Test PR", + PRAuthor: "author", + PRState: "open", + WorkflowState: "awaiting_review", + NextActions: `{"U001":{"kind":"review"}}`, + ChannelID: "C123", + ChannelName: "test-channel", + QueuedAt: now.Add(-10 * time.Minute), + SendAfter: now.Add(-5 * time.Minute), // 5 minutes ago - ready to send + } + + err = store.QueuePendingDM(dm1) + if err != nil { + t.Fatalf("unexpected error queueing DM: %v", err) + } + + // Queue a DM that should be sent in the future + dm2 := PendingDM{ + ID: "dm-002", + WorkspaceID: "T123", + UserID: "U002", + PROwner: "test-org", + PRRepo: "test-repo", + PRNumber: 456, + PRURL: "https://github.com/test-org/test-repo/pull/456", + PRTitle: "Another PR", + PRAuthor: "author2", + PRState: "open", + WorkflowState: "tests_broken", + NextActions: `{"U002":{"kind":"fix"}}`, + ChannelID: "C456", + ChannelName: "another-channel", + QueuedAt: now, + SendAfter: now.Add(10 * time.Minute), // 10 minutes from now - not ready yet + } + + err = store.QueuePendingDM(dm2) + if err != nil { + t.Fatalf("unexpected error queueing second DM: %v", err) + } + + // Get pending DMs that are ready to send + pending, err = store.GetPendingDMs(now) + if err != nil { + t.Fatalf("unexpected error getting pending DMs: %v", err) + } + + // Only dm1 should be returned (dm2 is in the future) + if len(pending) != 1 { + t.Fatalf("expected 1 pending DM, got %d", len(pending)) + } + + if pending[0].ID != "dm-001" { + t.Errorf("expected DM ID dm-001, got %s", pending[0].ID) + } + if pending[0].UserID != "U001" { + t.Errorf("expected UserID U001, got %s", pending[0].UserID) + } + if pending[0].PRNumber != 123 { + t.Errorf("expected PRNumber 123, got %d", pending[0].PRNumber) + } + + // Get pending DMs 15 minutes from now - both should be ready + future := now.Add(15 * time.Minute) + pending, err = store.GetPendingDMs(future) + if err != nil { + t.Fatalf("unexpected error getting future pending DMs: %v", err) + } + + if len(pending) != 2 { + t.Fatalf("expected 2 pending DMs in future, got %d", len(pending)) + } + + // Remove one DM + err = store.RemovePendingDM("dm-001") + if err != nil { + t.Fatalf("unexpected error removing DM: %v", err) + } + + // Now only dm2 should remain + pending, err = store.GetPendingDMs(future) + if err != nil { + t.Fatalf("unexpected error getting pending DMs after removal: %v", err) + } + + if len(pending) != 1 { + t.Fatalf("expected 1 pending DM after removal, got %d", len(pending)) + } + + if pending[0].ID != "dm-002" { + t.Errorf("expected remaining DM to be dm-002, got %s", pending[0].ID) + } + + // Remove non-existent DM should not error + err = store.RemovePendingDM("dm-999") + if err != nil { + t.Errorf("unexpected error removing non-existent DM: %v", err) + } +} + +func TestPendingDMCleanup(t *testing.T) { + store := NewMemoryStore() + + now := time.Now() + oldTime := now.Add(-100 * 24 * time.Hour) // 100 days ago + + // Add an old pending DM (>90 days) + oldDM := PendingDM{ + ID: "old-dm", + UserID: "U001", + PRURL: "https://github.com/test/repo/pull/1", + QueuedAt: oldTime, + SendAfter: oldTime, + } + store.QueuePendingDM(oldDM) + + // Add a recent pending DM + recentDM := PendingDM{ + ID: "recent-dm", + UserID: "U002", + PRURL: "https://github.com/test/repo/pull/2", + QueuedAt: now, + SendAfter: now.Add(10 * time.Minute), + } + store.QueuePendingDM(recentDM) + + // Run cleanup + err := store.Cleanup() + if err != nil { + t.Fatalf("unexpected error during cleanup: %v", err) + } + + // Verify old DM was removed + pending, err := store.GetPendingDMs(now.Add(24 * time.Hour)) + if err != nil { + t.Fatalf("unexpected error getting pending DMs: %v", err) + } + + if len(pending) != 1 { + t.Fatalf("expected 1 pending DM after cleanup, got %d", len(pending)) + } + + if pending[0].ID != "recent-dm" { + t.Errorf("expected recent-dm to remain, got %s", pending[0].ID) + } +} + +func TestPendingDMConcurrency(t *testing.T) { + store := NewMemoryStore() + + now := time.Now() + + // Queue multiple DMs concurrently + done := make(chan bool, 3) + + for i := 0; i < 3; i++ { + go func(index int) { + dm := PendingDM{ + ID: fmt.Sprintf("dm-%d", index), + UserID: fmt.Sprintf("U%03d", index), + PRURL: fmt.Sprintf("https://github.com/test/repo/pull/%d", index), + QueuedAt: now, + SendAfter: now.Add(-1 * time.Minute), + } + store.QueuePendingDM(dm) + done <- true + }(i) + } + + // Wait for all goroutines + for i := 0; i < 3; i++ { + <-done + } + + // Get all pending DMs + pending, err := store.GetPendingDMs(now) + if err != nil { + t.Fatalf("unexpected error getting pending DMs: %v", err) + } + + if len(pending) != 3 { + t.Fatalf("expected 3 pending DMs, got %d", len(pending)) + } + + // Remove DMs concurrently + for i := 0; i < 3; i++ { + go func(index int) { + store.RemovePendingDM(fmt.Sprintf("dm-%d", index)) + done <- true + }(i) + } + + // Wait for all removals + for i := 0; i < 3; i++ { + <-done + } + + // Verify all removed + pending, err = store.GetPendingDMs(now) + if err != nil { + t.Fatalf("unexpected error getting pending DMs after removal: %v", err) + } + + if len(pending) != 0 { + t.Errorf("expected 0 pending DMs after concurrent removal, got %d", len(pending)) + } +} diff --git a/pkg/state/store.go b/pkg/state/store.go index 78c2d79..1140a17 100644 --- a/pkg/state/store.go +++ b/pkg/state/store.go @@ -24,6 +24,26 @@ type DMInfo struct { MessageText string `json:"message_text"` // Current message text } +// PendingDM represents a DM scheduled to be sent later. +type PendingDM struct { + ID string `json:"id"` // Unique ID for this pending DM + WorkspaceID string `json:"workspace_id"` // Slack workspace ID + UserID string `json:"user_id"` // Slack user ID to DM + PROwner string `json:"pr_owner"` // GitHub PR owner + PRRepo string `json:"pr_repo"` // GitHub PR repo + PRNumber int `json:"pr_number"` // GitHub PR number + PRURL string `json:"pr_url"` // GitHub PR URL + PRTitle string `json:"pr_title"` // PR title + PRAuthor string `json:"pr_author"` // PR author + PRState string `json:"pr_state"` // Deprecated simplified state + WorkflowState string `json:"workflow_state"` // Workflow state from turnclient + NextActions string `json:"next_actions"` // Serialized NextAction map (JSON) + ChannelID string `json:"channel_id"` // Channel where user was tagged + ChannelName string `json:"channel_name"` // Channel name + QueuedAt time.Time `json:"queued_at"` // When this DM was queued + SendAfter time.Time `json:"send_after"` // Send DM after this time +} + // Store provides persistent storage for bot state. // Implementations must be safe for concurrent use. // @@ -54,6 +74,11 @@ type Store interface { LastNotification(prURL string) time.Time RecordNotification(prURL string, notifiedAt time.Time) error + // Pending DM queue - schedule DMs to be sent later + QueuePendingDM(dm PendingDM) error + GetPendingDMs(before time.Time) ([]PendingDM, error) + RemovePendingDM(id string) error + // Cleanup old data Cleanup() error From 2aec85aceb79be10be46bde3521e0cf9c4b0a0ff Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Wed, 29 Oct 2025 19:30:42 -0400 Subject: [PATCH 2/6] fix DM config loading --- pkg/config/config.go | 17 ++++++ pkg/config/config_test.go | 106 ++++++++++++++++++++++++++++++++++++++ pkg/notify/notify.go | 10 +++- pkg/notify/prefix_test.go | 8 +++ 4 files changed, 140 insertions(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 8c8e058..cc37d49 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -374,6 +374,7 @@ func (m *Manager) LoadConfig(ctx context.Context, org string) error { "team_id": config.Global.TeamID, "email_domain": config.Global.EmailDomain, "daily_reminders": config.Global.DailyReminders, + "reminder_dm_delay": config.Global.ReminderDMDelay, "total_channels": len(config.Channels), "muted_channels": muted, "wildcard_channels": wildcard, @@ -575,20 +576,36 @@ func (m *Manager) ReminderDMDelay(org, channel string) int { config, exists := m.configs[org] if !exists { + slog.Debug("no config for org - using default delay", + logFieldOrg, org, + "default_delay_mins", defaultReminderDMDelayMinutes) return defaultReminderDMDelayMinutes } // Check for channel-specific override if channelConfig, ok := config.Channels[channel]; ok { if channelConfig.ReminderDMDelay != nil { + slog.Debug("using channel-specific reminder delay", + logFieldOrg, org, + "channel", channel, + "delay_mins", *channelConfig.ReminderDMDelay) return *channelConfig.ReminderDMDelay } } // Return global setting (or default if not set) if config.Global.ReminderDMDelay > 0 { + slog.Debug("using global reminder delay", + logFieldOrg, org, + "channel", channel, + "delay_mins", config.Global.ReminderDMDelay) return config.Global.ReminderDMDelay } + slog.Debug("global delay is 0 or unset - using default", + logFieldOrg, org, + "channel", channel, + "global_value", config.Global.ReminderDMDelay, + "default_delay_mins", defaultReminderDMDelayMinutes) return defaultReminderDMDelayMinutes } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 55d62c3..bdb6ac7 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -1071,6 +1071,112 @@ channels: [1, 2, 3] } } +func TestManager_LoadConfigCodeGROOVEProdConfig(t *testing.T) { + // Test with actual production config from codeGROOVE-dev/.codeGROOVE/slack.yaml + prodYAML := `global: + team_id: T09CJ7X7T7Y + email_domain: codegroove.dev + reminder_dm_delay: 1 + +channels: + goose: + mute: true + + all-codegroove: + repos: + - "*" + + social: + repos: + - goose + - sprinkler + - slacker +` + + handler := func(w http.ResponseWriter, r *http.Request) { + content := base64.StdEncoding.EncodeToString([]byte(prodYAML)) + encoding := "base64" + response := github.RepositoryContent{ + Type: github.String("file"), + Content: &content, + Encoding: &encoding, + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + t.Errorf("failed to encode response: %v", err) + } + } + + client, server := createTestGitHubClient(handler) + defer server.Close() + + m := New() + m.SetGitHubClient("codeGROOVE-dev", client) + + ctx := context.Background() + err := m.LoadConfig(ctx, "codeGROOVE-dev") + if err != nil { + t.Fatalf("unexpected error loading production config: %v", err) + } + + // Verify config was loaded correctly + cfg, exists := m.Config("codeGROOVE-dev") + if !exists { + t.Fatal("expected config to exist after loading") + } + if cfg.Global.TeamID != "T09CJ7X7T7Y" { + t.Errorf("expected TeamID T09CJ7X7T7Y, got %q", cfg.Global.TeamID) + } + if cfg.Global.EmailDomain != "codegroove.dev" { + t.Errorf("expected email domain codegroove.dev, got %q", cfg.Global.EmailDomain) + } + if cfg.Global.ReminderDMDelay != 1 { + t.Errorf("expected reminder delay 1 minute, got %d", cfg.Global.ReminderDMDelay) + } + if len(cfg.Channels) != 3 { + t.Errorf("expected 3 channels, got %d", len(cfg.Channels)) + } + + // Verify goose channel is muted + gooseChannel, exists := cfg.Channels["goose"] + if !exists { + t.Error("expected goose channel to exist") + } + if !gooseChannel.Mute { + t.Error("expected goose channel to be muted") + } + + // Verify all-codegroove has wildcard + allChannel, exists := cfg.Channels["all-codegroove"] + if !exists { + t.Error("expected all-codegroove channel to exist") + } + if len(allChannel.Repos) != 1 || allChannel.Repos[0] != "*" { + t.Errorf("expected all-codegroove to have wildcard repo, got %v", allChannel.Repos) + } + + // Verify social channel repos + socialChannel, exists := cfg.Channels["social"] + if !exists { + t.Error("expected social channel to exist") + } + expectedRepos := []string{"goose", "sprinkler", "slacker"} + if len(socialChannel.Repos) != len(expectedRepos) { + t.Errorf("expected %d repos in social channel, got %d", len(expectedRepos), len(socialChannel.Repos)) + } + for i, repo := range expectedRepos { + if i >= len(socialChannel.Repos) || socialChannel.Repos[i] != repo { + t.Errorf("expected repo %q at index %d in social channel, got %v", repo, i, socialChannel.Repos) + } + } + + // Verify ReminderDMDelay returns correct value + delay := m.ReminderDMDelay("codeGROOVE-dev", "social") + if delay != 1 { + t.Errorf("expected ReminderDMDelay to return 1 minute, got %d", delay) + } +} + func TestManager_LoadConfigEmptyContent(t *testing.T) { handler := func(w http.ResponseWriter, r *http.Request) { // Return a response with nil Content diff --git a/pkg/notify/notify.go b/pkg/notify/notify.go index bbd562e..59580f5 100644 --- a/pkg/notify/notify.go +++ b/pkg/notify/notify.go @@ -145,7 +145,15 @@ func emojiFromWorkflowState(workflowState string, nextActions map[string]turn.Ac return ":test_tube:", "?st=tests_running" case string(turn.StateTestedWaitingForAssignment): - // Waiting for reviewers to be assigned + // Check if tests are actually broken despite "TESTED" state + // This can happen if new commits pushed after tests passed + if len(nextActions) > 0 { + action := PrimaryAction(nextActions) + if action == string(turn.ActionFixTests) { + return ":cockroach:", "?st=tests_broken" + } + } + // Tests passed, waiting for reviewers to be assigned return ":shrug:", "?st=awaiting_assignment" case string(turn.StateAssignedWaitingForReview): diff --git a/pkg/notify/prefix_test.go b/pkg/notify/prefix_test.go index 5110704..241f487 100644 --- a/pkg/notify/prefix_test.go +++ b/pkg/notify/prefix_test.go @@ -60,6 +60,14 @@ func TestPrefixForAnalysis(t *testing.T) { }, expectedEmoji: ":shrug:", }, + { + name: "TESTED_WAITING_FOR_ASSIGNMENT with broken tests shows :cockroach:", + workflowState: string(turn.StateTestedWaitingForAssignment), + nextAction: map[string]turn.Action{ + "author": {Kind: turn.ActionFixTests}, + }, + expectedEmoji: ":cockroach:", + }, { name: "ASSIGNED_WAITING_FOR_REVIEW shows :hourglass:", workflowState: string(turn.StateAssignedWaitingForReview), From 3f892156c97c97e2029da78a1e70661da4a3231f Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Wed, 29 Oct 2025 19:41:41 -0400 Subject: [PATCH 3/6] Remove 80% minimum for test coverage --- Makefile | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/Makefile b/Makefile index 0aad1e0..00d35f2 100644 --- a/Makefile +++ b/Makefile @@ -21,30 +21,6 @@ test: @echo "" @echo "Coverage by package:" @go test -coverprofile=coverage.out -covermode=atomic ./... 2>&1 | grep -E "coverage:" | awk '{print $$2 "\t" $$5}' | column -t - @echo "" - @echo "Checking for packages below 80% coverage..." - @failed=0; \ - packages=$$(go list ./... | grep -v "/cmd/"); \ - for pkg in $$packages; do \ - output=$$(go test -coverprofile=/dev/null "$$pkg" 2>&1); \ - if echo "$$output" | grep -q "\[no test files\]"; then \ - continue; \ - fi; \ - coverage=$$(echo "$$output" | grep "coverage:" | awk '{print $$5}' | sed 's/%//'); \ - if [ -n "$$coverage" ] && [ "$$coverage" != "statements" ]; then \ - pkg_short=$$(echo "$$pkg" | sed 's|github.com/codeGROOVE-dev/slacker/||'); \ - if [ "$$(echo "$$coverage < 80.0" | bc -l 2>/dev/null || echo 0)" -eq 1 ]; then \ - echo "❌ FAIL: $$pkg_short has $$coverage% coverage (minimum: 80%)"; \ - failed=1; \ - fi; \ - fi; \ - done; \ - if [ $$failed -eq 1 ]; then \ - echo ""; \ - echo "Coverage check failed. All packages must have at least 80% coverage."; \ - exit 1; \ - fi - @echo "✅ All packages meet 80% coverage threshold" # Format code fmt: From 805c60303dd83d1a4fffe5281b71a13d3f327e68 Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Wed, 29 Oct 2025 19:52:09 -0400 Subject: [PATCH 4/6] Deterministic output --- pkg/notify/notify.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/notify/notify.go b/pkg/notify/notify.go index 59580f5..9af178b 100644 --- a/pkg/notify/notify.go +++ b/pkg/notify/notify.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "log/slog" + "sort" "strings" "time" @@ -255,6 +256,9 @@ func formatNextActionsInternal(ctx context.Context, nextActions map[string]turn. // Convert snake_case to space-separated words actionName := strings.ReplaceAll(actionKind, "_", " ") + // Sort users for deterministic output + sort.Strings(users) + // Format user mentions (will be empty if only _system was assigned) userMentions := userMapper.FormatUserMentions(ctx, users, owner, domain) From bbb1fd80e54cc02c0ddd530e8507d57d224273cc Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Wed, 29 Oct 2025 20:14:12 -0400 Subject: [PATCH 5/6] simplify the make test rule --- Makefile | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 00d35f2..1de657f 100644 --- a/Makefile +++ b/Makefile @@ -16,11 +16,7 @@ build-registrar: # Run tests with race detection and coverage test: - @echo "Running tests with race detection and coverage..." - @go test -v -race -coverprofile=coverage.out -covermode=atomic ./... - @echo "" - @echo "Coverage by package:" - @go test -coverprofile=coverage.out -covermode=atomic ./... 2>&1 | grep -E "coverage:" | awk '{print $$2 "\t" $$5}' | column -t + go test -v -race -cover ./... # Format code fmt: From fccd1f2d7ff05afbd8bdb61bec617a509a9757a3 Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Wed, 29 Oct 2025 21:25:14 -0400 Subject: [PATCH 6/6] upgrade to go stable --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 03510bb..b72f45b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,7 +18,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v5 with: - go-version: '1.23' + go-version: 'stable' cache: true - name: Install dependencies