Skip to content

Commit 4574295

Browse files
Thomas StrombergThomas Stromberg
authored andcommitted
update status of DM'd PRs
1 parent 067ed15 commit 4574295

File tree

5 files changed

+166
-38
lines changed

5 files changed

+166
-38
lines changed

cmd/server/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi
167167
RecordDM(userID, prURL string, sentAt time.Time) error
168168
DMMessage(userID, prURL string) (state.DMInfo, bool)
169169
SaveDMMessage(userID, prURL string, info state.DMInfo) error
170+
ListDMUsers(prURL string) []string
170171
LastDigest(userID, date string) (time.Time, bool)
171172
RecordDigest(userID, date string, sentAt time.Time) error
172173
WasProcessed(eventKey string) bool
@@ -684,6 +685,7 @@ func runBotCoordinators(
684685
RecordDM(userID, prURL string, sentAt time.Time) error
685686
DMMessage(userID, prURL string) (state.DMInfo, bool)
686687
SaveDMMessage(userID, prURL string, info state.DMInfo) error
688+
ListDMUsers(prURL string) []string
687689
LastDigest(userID, date string) (time.Time, bool)
688690
RecordDigest(userID, date string, sentAt time.Time) error
689691
WasProcessed(eventKey string) bool

internal/bot/bot.go

Lines changed: 87 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ type StateStore interface {
102102
SaveThread(owner, repo string, number int, channelID string, info ThreadInfo) error
103103
LastDM(userID, prURL string) (time.Time, bool)
104104
RecordDM(userID, prURL string, sentAt time.Time) error
105+
ListDMUsers(prURL string) []string
105106
WasProcessed(eventKey string) bool
106107
MarkProcessed(eventKey string, ttl time.Duration) error
107108
LastNotification(prURL string) time.Time
@@ -793,18 +794,82 @@ func (*Coordinator) extractBlockedUsersFromTurnclient(checkResult *turn.CheckRes
793794
return blockedUsers
794795
}
795796

796-
// updateDMMessagesForPR updates DM messages for all blocked users on a PR.
797-
func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, checkResult *turn.CheckResponse, owner, repo string, prNumber int, title, author, prState, prURL string) {
798-
if checkResult == nil || len(checkResult.Analysis.NextAction) == 0 {
799-
slog.Debug("no blocked users, skipping DM updates",
800-
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber))
801-
return
797+
// prUpdateInfo groups PR information for DM updates.
798+
type prUpdateInfo struct {
799+
checkRes *turn.CheckResponse
800+
owner string
801+
repo string
802+
title string
803+
author string
804+
state string
805+
url string
806+
number int
807+
}
808+
809+
// updateDMMessagesForPR updates DM messages for all relevant users on a PR.
810+
// For merged/closed PRs, updates all users who previously received DMs.
811+
// For other states, updates users in NextAction (currently blocked).
812+
func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, pr prUpdateInfo) {
813+
owner, repo, prNumber := pr.owner, pr.repo, pr.number
814+
prState, prURL := pr.state, pr.url
815+
checkResult := pr.checkRes
816+
// Determine which users to update based on PR state
817+
var slackUserIDs []string
818+
819+
// For terminal states (merged/closed), update all users who received DMs
820+
if prState == "merged" || prState == "closed" {
821+
slackUserIDs = c.stateStore.ListDMUsers(prURL)
822+
if len(slackUserIDs) == 0 {
823+
slog.Debug("no DM recipients found for merged/closed PR",
824+
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
825+
"pr_state", prState)
826+
return
827+
}
828+
slog.Info("updating DMs for merged/closed PR",
829+
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
830+
"pr_state", prState,
831+
"dm_recipients", len(slackUserIDs))
832+
} else {
833+
// For other states, update only users who are currently blocked
834+
if checkResult == nil || len(checkResult.Analysis.NextAction) == 0 {
835+
slog.Debug("no blocked users, skipping DM updates",
836+
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber))
837+
return
838+
}
839+
840+
// Map GitHub users to Slack users
841+
domain := c.configManager.Domain(owner)
842+
for githubUser := range checkResult.Analysis.NextAction {
843+
if githubUser == "_system" {
844+
continue
845+
}
846+
847+
slackUserID, err := c.userMapper.SlackHandle(ctx, githubUser, owner, domain)
848+
if err != nil || slackUserID == "" {
849+
slog.Debug("no Slack mapping for GitHub user, skipping",
850+
"github_user", githubUser,
851+
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
852+
"error", err)
853+
continue
854+
}
855+
slackUserIDs = append(slackUserIDs, slackUserID)
856+
}
857+
858+
if len(slackUserIDs) == 0 {
859+
slog.Debug("no Slack users found for blocked GitHub users",
860+
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber))
861+
return
862+
}
802863
}
803864

804865
// Format the DM message (same format as initial send)
805866
prefix := notify.PrefixForState(prState)
806867
var action string
807868
switch prState {
869+
case "merged":
870+
action = "merged"
871+
case "closed":
872+
action = "closed"
808873
case "tests_broken":
809874
action = "fix tests"
810875
case "awaiting_review":
@@ -820,41 +885,22 @@ func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, checkResult *tu
820885
message := fmt.Sprintf(
821886
"%s %s <%s|%s#%d> · %s → %s",
822887
prefix,
823-
title,
888+
pr.title,
824889
prURL,
825890
repo,
826891
prNumber,
827-
author,
892+
pr.author,
828893
action,
829894
)
830895

831-
// Update DM for each blocked user
896+
// Update DM for each user
832897
updatedCount := 0
833898
skippedCount := 0
834-
domain := c.configManager.Domain(owner)
835-
836-
for githubUser := range checkResult.Analysis.NextAction {
837-
// Skip _system user
838-
if githubUser == "_system" {
839-
continue
840-
}
841-
842-
// Map GitHub user to Slack user
843-
slackUserID, err := c.userMapper.SlackHandle(ctx, githubUser, owner, domain)
844-
if err != nil || slackUserID == "" {
845-
slog.Debug("no Slack mapping for GitHub user, skipping DM update",
846-
"github_user", githubUser,
847-
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
848-
"error", err)
849-
skippedCount++
850-
continue
851-
}
852899

853-
// Update the DM message
900+
for _, slackUserID := range slackUserIDs {
854901
if err := c.slack.UpdateDMMessage(ctx, slackUserID, prURL, message); err != nil {
855902
slog.Debug("failed to update DM message",
856903
"user", slackUserID,
857-
"github_user", githubUser,
858904
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
859905
"error", err,
860906
"reason", "DM may not exist or too old")
@@ -869,7 +915,8 @@ func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, checkResult *tu
869915
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
870916
"pr_state", prState,
871917
"updated", updatedCount,
872-
"skipped", skippedCount)
918+
"skipped", skippedCount,
919+
"total_recipients", len(slackUserIDs))
873920
}
874921
}
875922

@@ -1212,7 +1259,16 @@ func (c *Coordinator) processPRForChannel(
12121259
"pr_state", prState)
12131260

12141261
// Also update DM messages for blocked users
1215-
c.updateDMMessagesForPR(ctx, checkResult, owner, repo, prNumber, event.PullRequest.Title, event.PullRequest.User.Login, prState, event.PullRequest.HTMLURL)
1262+
c.updateDMMessagesForPR(ctx, prUpdateInfo{
1263+
owner: owner,
1264+
repo: repo,
1265+
number: prNumber,
1266+
title: event.PullRequest.Title,
1267+
author: event.PullRequest.User.Login,
1268+
state: prState,
1269+
url: event.PullRequest.HTMLURL,
1270+
checkRes: checkResult,
1271+
})
12161272
}
12171273
} else {
12181274
slog.Debug("message already matches expected content, no update needed",

internal/state/datastore.go

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"log/slog"
7+
"strings"
78
"time"
89

910
"cloud.google.com/go/datastore"
@@ -317,13 +318,7 @@ func (s *DatastoreStore) DMMessage(userID, prURL string) (DMInfo, bool) {
317318
}
318319

319320
// Found in Datastore - update memory cache and return
320-
result := DMInfo{
321-
ChannelID: entity.ChannelID,
322-
MessageTS: entity.MessageTS,
323-
MessageText: entity.MessageText,
324-
UpdatedAt: entity.UpdatedAt,
325-
SentAt: entity.SentAt,
326-
}
321+
result := DMInfo(entity)
327322

328323
// Update memory cache async
329324
go func() {
@@ -372,6 +367,59 @@ func (s *DatastoreStore) SaveDMMessage(userID, prURL string, info DMInfo) error
372367
return nil
373368
}
374369

370+
// ListDMUsers returns all user IDs who have received DMs for a given PR.
371+
// Queries both memory cache and Datastore to ensure data persists across restarts.
372+
func (s *DatastoreStore) ListDMUsers(prURL string) []string {
373+
// Check memory cache first (fast path)
374+
users := s.memory.ListDMUsers(prURL)
375+
if len(users) > 0 || s.disabled || s.ds == nil {
376+
return users
377+
}
378+
379+
// PERFORMANCE NOTE: Datastore queries are expensive and lack substring/regex filtering.
380+
// We must fetch all DM message keys and filter client-side. This is acceptable because:
381+
// 1. Uses KeysOnly() to minimize data transfer (no entity bodies)
382+
// 2. Bounded by reasonable limit (1000 - PRs rarely have >100 participants)
383+
// 3. Only runs on cache miss (typically at startup)
384+
// 4. Results populate memory cache for future fast lookups
385+
//
386+
// Alternative considered: Ancestor queries require schema change (breaking existing data)
387+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
388+
defer cancel()
389+
390+
query := datastore.NewQuery(kindDMMessage).KeysOnly().Limit(1000)
391+
keys, err := s.ds.GetAll(ctx, query, nil)
392+
if err != nil {
393+
slog.Warn("failed to query Datastore for DM users",
394+
"pr_url", prURL,
395+
"error", err,
396+
"fallback", "returning empty list")
397+
return nil
398+
}
399+
400+
// Filter keys matching this PR URL
401+
// Key format: "dm:{userID}:{prURL}"
402+
suffix := ":" + prURL
403+
filtered := make([]string, 0, min(len(keys), 50)) // Most PRs have <50 DM recipients
404+
405+
for _, key := range keys {
406+
if strings.HasSuffix(key.Name, suffix) {
407+
// Extract userID from key
408+
parts := strings.SplitN(key.Name, ":", 3)
409+
if len(parts) == 3 {
410+
filtered = append(filtered, parts[1])
411+
}
412+
}
413+
}
414+
415+
slog.Debug("queried Datastore for DM users",
416+
"pr_url", prURL,
417+
"users_found", len(filtered),
418+
"keys_scanned", len(keys))
419+
420+
return filtered
421+
}
422+
375423
// LastDigest retrieves last digest time.
376424
func (s *DatastoreStore) LastDigest(userID, date string) (time.Time, bool) {
377425
// Check memory first

internal/state/json.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,27 @@ func (s *JSONStore) SaveDMMessage(userID, prURL string, info DMInfo) error {
164164
return s.save()
165165
}
166166

167+
// ListDMUsers returns all user IDs who have received DMs for a given PR.
168+
func (s *JSONStore) ListDMUsers(prURL string) []string {
169+
s.mu.RLock()
170+
defer s.mu.RUnlock()
171+
172+
var users []string
173+
suffix := ":" + prURL
174+
175+
for key := range s.dmMessages {
176+
if strings.HasSuffix(key, suffix) {
177+
// Extract userID from key format "dm:{userID}:{prURL}"
178+
parts := strings.SplitN(key, ":", 3)
179+
if len(parts) == 3 {
180+
users = append(users, parts[1])
181+
}
182+
}
183+
}
184+
185+
return users
186+
}
187+
167188
// LastDigest retrieves the last digest timestamp for a user and date.
168189
func (s *JSONStore) LastDigest(userID, date string) (time.Time, bool) {
169190
s.mu.RLock()

internal/state/store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type Store interface {
3838
// DM message tracking - store DM message info for updating
3939
DMMessage(userID, prURL string) (DMInfo, bool)
4040
SaveDMMessage(userID, prURL string, info DMInfo) error
41+
ListDMUsers(prURL string) []string
4142

4243
// Daily digest tracking - one per user per day
4344
LastDigest(userID, date string) (time.Time, bool)

0 commit comments

Comments
 (0)