Skip to content

Commit a4fbca7

Browse files
Thomas StrombergThomas Stromberg
authored andcommitted
Update states in DMs
1 parent 8f0fcd6 commit a4fbca7

File tree

9 files changed

+516
-118
lines changed

9 files changed

+516
-118
lines changed

cmd/server/main.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,14 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi
159159
// Tokens are fetched from GSM based on team_id from org configs.
160160
slackManager := slack.NewManager(cfg.SlackSigningSecret)
161161

162-
// Initialize state store (Datastore + JSON fallback).
162+
// Initialize state store (in-memory + Datastore or JSON for persistence).
163163
var stateStore interface {
164164
Thread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool)
165165
SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error
166166
LastDM(userID, prURL string) (time.Time, bool)
167167
RecordDM(userID, prURL string, sentAt time.Time) error
168+
DMMessage(userID, prURL string) (state.DMInfo, bool)
169+
SaveDMMessage(userID, prURL string, info state.DMInfo) error
168170
LastDigest(userID, date string) (time.Time, bool)
169171
RecordDigest(userID, date string, sentAt time.Time) error
170172
WasProcessed(eventKey string) bool
@@ -196,35 +198,39 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi
196198
}
197199

198200
if datastoreDB != "" && projectID != "" {
199-
slog.Info("initializing Cloud Datastore for persistent state",
201+
slog.Info("initializing Cloud Datastore for persistent state (with in-memory cache)",
200202
"project_id", projectID,
201-
"database", datastoreDB)
203+
"database", datastoreDB,
204+
"cache", "in-memory")
202205
var err error
203206
stateStore, err = state.NewDatastoreStore(ctx, projectID, datastoreDB)
204207
if err != nil {
205208
// FATAL: If DATASTORE is explicitly configured, fail startup on initialization errors.
206-
// This prevents silent fallback to JSON-only mode which causes duplicate messages
209+
// This prevents silent fallback to memory-only mode which causes duplicate messages
207210
// during rolling deployments (no cross-instance event deduplication).
208211
slog.Error("FATAL: failed to initialize Cloud Datastore - DATASTORE variable is set but initialization failed",
209212
"project_id", projectID,
210213
"database", datastoreDB,
211-
"error", err)
214+
"error", err,
215+
"note", "Set DATASTORE='' to use JSON files instead")
212216
cancel()
213217
return 1
214218
}
215-
slog.Info("successfully initialized Cloud Datastore",
219+
slog.Info("successfully initialized Cloud Datastore with in-memory cache",
216220
"project_id", projectID,
217-
"database", datastoreDB)
221+
"database", datastoreDB,
222+
"mode", "hybrid: in-memory + Datastore")
218223
} else {
219224
var reason string
220225
if datastoreDB == "" {
221226
reason = "DATASTORE not set"
222227
} else {
223228
reason = "GCP_PROJECT not set and could not auto-detect"
224229
}
225-
slog.Info("using JSON files for state storage",
230+
slog.Info("using JSON files for persistent state (with in-memory cache)",
226231
"path", "os.UserCacheDir()/slacker/state",
227-
"reason", reason)
232+
"reason", reason,
233+
"mode", "hybrid: in-memory + JSON files")
228234
var err error
229235
stateStore, err = state.NewJSONStore()
230236
if err != nil {
@@ -241,6 +247,10 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi
241247
}
242248
}()
243249

250+
// Set state store on Slack manager for DM message tracking
251+
slackManager.SetStateStore(stateStore)
252+
slog.Info("configured Slack manager with state store for DM tracking")
253+
244254
// Initialize notification manager for multi-workspace notifications.
245255
notifier := notify.New(slackManager, configManager)
246256

@@ -672,6 +682,8 @@ func runBotCoordinators(
672682
SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error
673683
LastDM(userID, prURL string) (time.Time, bool)
674684
RecordDM(userID, prURL string, sentAt time.Time) error
685+
DMMessage(userID, prURL string) (state.DMInfo, bool)
686+
SaveDMMessage(userID, prURL string, info state.DMInfo) error
675687
LastDigest(userID, date string) (time.Time, bool)
676688
RecordDigest(userID, date string, sentAt time.Time) error
677689
WasProcessed(eventKey string) bool

internal/bot/bot.go

Lines changed: 141 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -793,6 +793,86 @@ func (*Coordinator) extractBlockedUsersFromTurnclient(checkResult *turn.CheckRes
793793
return blockedUsers
794794
}
795795

796+
// updateDMMessagesForPR updates DM messages for all blocked users on a PR.
797+
func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, checkResult *turn.CheckResponse, owner, repo string, prNumber int, title, author, prState, prURL string) {
798+
if checkResult == nil || len(checkResult.Analysis.NextAction) == 0 {
799+
slog.Debug("no blocked users, skipping DM updates",
800+
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber))
801+
return
802+
}
803+
804+
// Format the DM message (same format as initial send)
805+
prefix := notify.PrefixForState(prState)
806+
var action string
807+
switch prState {
808+
case "tests_broken":
809+
action = "fix tests"
810+
case "awaiting_review":
811+
action = "review"
812+
case "changes_requested":
813+
action = "address feedback"
814+
case "approved":
815+
action = "merge"
816+
default:
817+
action = "attention needed"
818+
}
819+
820+
message := fmt.Sprintf(
821+
"%s %s <%s|%s#%d> · %s → %s",
822+
prefix,
823+
title,
824+
prURL,
825+
repo,
826+
prNumber,
827+
author,
828+
action,
829+
)
830+
831+
// Update DM for each blocked user
832+
updatedCount := 0
833+
skippedCount := 0
834+
domain := c.configManager.Domain(owner)
835+
836+
for githubUser := range checkResult.Analysis.NextAction {
837+
// Skip _system user
838+
if githubUser == "_system" {
839+
continue
840+
}
841+
842+
// Map GitHub user to Slack user
843+
slackUserID, err := c.userMapper.SlackHandle(ctx, githubUser, owner, domain)
844+
if err != nil || slackUserID == "" {
845+
slog.Debug("no Slack mapping for GitHub user, skipping DM update",
846+
"github_user", githubUser,
847+
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
848+
"error", err)
849+
skippedCount++
850+
continue
851+
}
852+
853+
// Update the DM message
854+
if err := c.slack.UpdateDMMessage(ctx, slackUserID, prURL, message); err != nil {
855+
slog.Debug("failed to update DM message",
856+
"user", slackUserID,
857+
"github_user", githubUser,
858+
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
859+
"error", err,
860+
"reason", "DM may not exist or too old")
861+
skippedCount++
862+
} else {
863+
updatedCount++
864+
}
865+
}
866+
867+
if updatedCount > 0 {
868+
slog.Info("updated DM messages for PR state change",
869+
"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
870+
"pr_state", prState,
871+
"updated", updatedCount,
872+
"skipped", skippedCount)
873+
}
874+
}
875+
796876
// formatNextActions formats NextAction map into a compact string like "fix tests: user1, user2; review: user3".
797877
// It groups users by action kind and formats each action as "action_name: user1, user2".
798878
// Multiple actions are separated by semicolons.
@@ -888,10 +968,23 @@ func (c *Coordinator) processChannelsInParallel(
888968
var validChannels []string
889969
for _, channelName := range channels {
890970
channelID := c.slack.ResolveChannelID(ctx, channelName)
971+
972+
// Check if channel resolution failed (returns original name if not found)
973+
if channelID == channelName || (channelName != "" && channelName[0] == '#' && channelID == channelName[1:]) {
974+
slog.Warn("could not resolve channel - may not exist or bot lacks permissions",
975+
"workspace", c.workspaceName,
976+
logFieldPR, fmt.Sprintf(prFormatString, prCtx.Owner, prCtx.Repo, prCtx.Number),
977+
"channel", channelName,
978+
"action_required", "verify channel exists and bot has access")
979+
continue
980+
}
981+
891982
if c.slack.IsBotInChannel(ctx, channelID) {
892983
validChannels = append(validChannels, channelName)
893984
} else {
894985
slog.Warn("skipping channel - bot not a member",
986+
"workspace", c.workspaceName,
987+
logFieldPR, fmt.Sprintf(prFormatString, prCtx.Owner, prCtx.Repo, prCtx.Number),
895988
"channel", channelName,
896989
"channel_id", channelID,
897990
"action_required", "invite bot to channel")
@@ -960,6 +1053,16 @@ func (c *Coordinator) processPRForChannel(
9601053
// Resolve channel name to ID for API calls
9611054
channelID := c.slack.ResolveChannelID(ctx, channelName)
9621055

1056+
// Check if channel resolution failed
1057+
if channelID == channelName || (channelName != "" && channelName[0] == '#' && channelID == channelName[1:]) {
1058+
slog.Warn("could not resolve channel for PR processing",
1059+
"workspace", c.workspaceName,
1060+
logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber),
1061+
"channel", channelName,
1062+
"action_required", "verify channel exists and bot has access")
1063+
return
1064+
}
1065+
9631066
// For display purposes, show both name and ID
9641067
var channelDisplay string
9651068
switch {
@@ -1025,30 +1128,28 @@ func (c *Coordinator) processPRForChannel(
10251128
blockedUsers := c.extractBlockedUsersFromTurnclient(checkResult)
10261129
domain := c.configManager.Domain(owner)
10271130
if len(blockedUsers) > 0 {
1028-
// Run email lookups in background to avoid blocking message delivery
1029-
// SECURITY NOTE: Use detached context to complete email lookups even during shutdown.
1030-
// Operations bounded by 15-second timeout. This ensures reasonably fast shutdown while
1031-
// completing active lookups for accurate DM delivery (most lookups hit cache instantly,
1032-
// but occasional cold lookups can take 10+ seconds).
1033-
lookupCtx, lookupCancel := context.WithTimeout(context.WithoutCancel(ctx), 15*time.Second)
1034-
go func() {
1035-
defer lookupCancel()
1036-
for _, githubUser := range blockedUsers {
1037-
// Map GitHub username to Slack user ID
1038-
slackUserID, err := c.userMapper.SlackHandle(lookupCtx, githubUser, owner, domain)
1039-
if err == nil && slackUserID != "" {
1040-
// Track with channelID - this will only update on FIRST call per user/PR
1041-
c.notifier.Tracker.UpdateUserPRChannelTag(workspaceID, slackUserID, channelID, owner, repo, prNumber)
1042-
slog.Debug("tracked user tag in channel (async)",
1043-
"workspace", workspaceID,
1044-
"github_user", githubUser,
1045-
"slack_user", slackUserID,
1046-
"channel", channelDisplay,
1047-
"channel_id", channelID,
1048-
"pr", fmt.Sprintf(prFormatString, owner, repo, prNumber))
1049-
}
1131+
// Record tags for blocked users synchronously to prevent race with DM sending
1132+
// This must complete BEFORE DM notifications check tag info
1133+
// Note: Most lookups hit cache and are instant; occasional cold lookups may delay slightly
1134+
// but this is necessary for correct DM delay logic
1135+
lookupCtx, lookupCancel := context.WithTimeout(ctx, 5*time.Second)
1136+
defer lookupCancel()
1137+
1138+
for _, githubUser := range blockedUsers {
1139+
// Map GitHub username to Slack user ID
1140+
slackUserID, err := c.userMapper.SlackHandle(lookupCtx, githubUser, owner, domain)
1141+
if err == nil && slackUserID != "" {
1142+
// Track with channelID - this will only update on FIRST call per user/PR
1143+
c.notifier.Tracker.UpdateUserPRChannelTag(workspaceID, slackUserID, channelID, owner, repo, prNumber)
1144+
slog.Debug("tracked user tag in channel",
1145+
"workspace", workspaceID,
1146+
"github_user", githubUser,
1147+
"slack_user", slackUserID,
1148+
"channel", channelDisplay,
1149+
"channel_id", channelID,
1150+
"pr", fmt.Sprintf(prFormatString, owner, repo, prNumber))
10501151
}
1051-
}()
1152+
}
10521153
}
10531154

10541155
// Build what the message SHOULD be based on current PR state
@@ -1109,6 +1210,9 @@ func (c *Coordinator) processPRForChannel(
11091210
"channel_id", channelID,
11101211
"thread_ts", threadTS,
11111212
"pr_state", prState)
1213+
1214+
// Also update DM messages for blocked users
1215+
c.updateDMMessagesForPR(ctx, checkResult, owner, repo, prNumber, event.PullRequest.Title, event.PullRequest.User.Login, prState, event.PullRequest.HTMLURL)
11121216
}
11131217
} else {
11141218
slog.Debug("message already matches expected content, no update needed",
@@ -1291,10 +1395,21 @@ func (c *Coordinator) createPRThread(ctx context.Context, channel, owner, repo s
12911395

12921396
// Resolve channel name to ID for consistent API calls
12931397
resolvedChannel := c.slack.ResolveChannelID(ctx, channel)
1294-
if resolvedChannel != channel {
1295-
slog.Debug("resolved channel for thread creation", "original", channel, "resolved", resolvedChannel)
1398+
1399+
// Check if channel resolution failed (returns original name if not found)
1400+
if resolvedChannel == channel || (channel != "" && channel[0] == '#' && resolvedChannel == channel[1:]) {
1401+
// Only warn if it's not already a channel ID
1402+
if channel == "" || channel[0] != 'C' {
1403+
slog.Warn("could not resolve channel for thread creation",
1404+
"workspace", c.workspaceName,
1405+
logFieldPR, fmt.Sprintf(prFormatString, owner, repo, number),
1406+
"channel", channel,
1407+
"action_required", "verify channel exists and bot has access")
1408+
return "", "", fmt.Errorf("could not resolve channel: %s", channel)
1409+
}
1410+
slog.Debug("channel is already a channel ID", "channel_id", channel)
12961411
} else {
1297-
slog.Debug("channel resolution did not change value", "channel", channel, "might_be_channel_id_already", resolvedChannel[0] == 'C')
1412+
slog.Debug("resolved channel for thread creation", "original", channel, "resolved", resolvedChannel)
12981413
}
12991414

13001415
// Create thread with resolved channel ID - post immediately without waiting for user lookups

internal/bot/polling.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -267,10 +267,17 @@ func (c *Coordinator) updateClosedPRThread(ctx context.Context, pr *github.PRSna
267267
n := 0
268268
for _, ch := range channels {
269269
id := c.slack.ResolveChannelID(ctx, ch)
270-
if id == "" {
271-
slog.Debug("could not resolve channel ID for closed PR thread update",
272-
"channel_name", ch,
273-
"pr", prKey)
270+
271+
// Check if channel resolution failed (returns original name if not found)
272+
if id == ch || (ch != "" && ch[0] == '#' && id == ch[1:]) {
273+
slog.Warn("could not resolve channel for closed PR thread update",
274+
"workspace", c.workspaceName,
275+
"pr", prKey,
276+
"owner", pr.Owner,
277+
"repo", pr.Repo,
278+
"number", pr.Number,
279+
"channel", ch,
280+
"action_required", "verify channel exists and bot has access")
274281
continue
275282
}
276283

internal/notify/notify.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,8 @@ func (m *Manager) NotifyUser(ctx context.Context, workspaceID, userID, channelID
297297
}
298298

299299
// Send DM to user.
300-
if err := slackClient.SendDirectMessage(ctx, userID, message); err != nil {
300+
dmChannelID, messageTS, err := slackClient.SendDirectMessage(ctx, userID, message)
301+
if err != nil {
301302
slog.Error("failed to send DM notification",
302303
"user", userID,
303304
"pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number),
@@ -309,12 +310,23 @@ func (m *Manager) NotifyUser(ctx context.Context, workspaceID, userID, channelID
309310
// Update last DM notification time.
310311
m.Tracker.UpdateDMNotification(workspaceID, userID)
311312

313+
// Save DM message info for future updates
314+
if err := slackClient.SaveDMMessageInfo(ctx, userID, pr.HTMLURL, dmChannelID, messageTS, message); err != nil {
315+
slog.Warn("failed to save DM message info",
316+
"user", userID,
317+
"pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number),
318+
"error", err,
319+
"impact", "DM won't be updated on state changes")
320+
}
321+
312322
slog.Info("successfully sent DM notification",
313323
"user", userID,
314324
"pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number),
315325
"pr_author", pr.Author,
316326
"pr_state", pr.State,
317327
"action_required", action,
318-
"notification_updated", true)
328+
"notification_updated", true,
329+
"dm_channel_id", dmChannelID,
330+
"message_ts", messageTS)
319331
return nil
320332
}

0 commit comments

Comments
 (0)