Skip to content

Commit f7097ed

Browse files
authored
Merge pull request #79 from codeGROOVE-dev/reliable
fix channel dupe bug across restarts
2 parents 3085992 + 27055ed commit f7097ed

File tree

8 files changed

+402
-315
lines changed

8 files changed

+402
-315
lines changed

pkg/bot/bot.go

Lines changed: 95 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,28 @@ type messageUpdateParams struct {
5555
PRNumber int
5656
}
5757

58+
// threadCreationParams groups parameters for thread creation/lookup operations.
59+
//
60+
//nolint:govet // fieldalignment - struct layout optimized for readability over minimal padding
61+
type threadCreationParams struct {
62+
PullRequest struct {
63+
CreatedAt time.Time `json:"created_at"`
64+
User struct {
65+
Login string `json:"login"`
66+
} `json:"user"`
67+
HTMLURL string `json:"html_url"`
68+
Title string `json:"title"`
69+
Number int `json:"number"`
70+
}
71+
CheckResult *turn.CheckResponse
72+
ChannelID string
73+
ChannelName string
74+
Owner string
75+
Repo string
76+
PRState string
77+
PRNumber int
78+
}
79+
5880
// Coordinator coordinates between GitHub, Slack, and notifications for a single org.
5981
//
6082
//nolint:govet // Field order optimized for logical grouping over memory alignment
@@ -176,39 +198,30 @@ func (c *Coordinator) saveThread(ctx context.Context, owner, repo string, number
176198
// Returns (threadTS, wasNewlyCreated, currentMessageText, error).
177199
//
178200
//nolint:revive // Four return values needed to track thread state and creation status
179-
func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner, repo string, prNumber int, prState string, pullRequest struct {
180-
CreatedAt time.Time `json:"created_at"`
181-
User struct {
182-
Login string `json:"login"`
183-
} `json:"user"`
184-
HTMLURL string `json:"html_url"`
185-
Title string `json:"title"`
186-
Number int `json:"number"`
187-
}, checkResult *turn.CheckResponse,
188-
) (threadTS string, wasNewlyCreated bool, currentMessageText string, err error) {
189-
cacheKey := fmt.Sprintf("%s/%s#%d:%s", owner, repo, prNumber, channelID)
201+
func (c *Coordinator) findOrCreatePRThread(ctx context.Context, params threadCreationParams) (threadTS string, wasNewlyCreated bool, currentMessageText string, err error) {
202+
cacheKey := fmt.Sprintf("%s/%s#%d:%s", params.Owner, params.Repo, params.PRNumber, params.ChannelID)
190203

191204
slog.Debug("finding or creating PR thread",
192205
"pr", cacheKey,
193-
logFieldChannel, channelID,
194-
"pr_state", prState)
206+
logFieldChannel, params.ChannelID,
207+
"pr_state", params.PRState)
195208

196209
// Try to find existing thread
197-
threadTS, messageText, found := c.findPRThread(ctx, cacheKey, channelID, owner, repo, prNumber, prState, pullRequest)
210+
threadTS, messageText, found := c.findPRThread(ctx, cacheKey, params.ChannelID, params.Owner, params.Repo, params.PRNumber, params.PRState, params.PullRequest)
198211
if found {
199212
return threadTS, false, messageText, nil
200213
}
201214

202215
// Thread not found - create new one with concurrent creation prevention
203-
threadInfo, wasCreated, err := c.createPRThreadWithLocking(ctx, channelID, owner, repo, prNumber, prState, pullRequest, checkResult)
216+
threadInfo, wasCreated, err := c.createPRThreadWithLocking(ctx, params)
204217
if err != nil {
205218
return "", false, "", err
206219
}
207220

208221
return threadInfo.ThreadTS, wasCreated, threadInfo.MessageText, nil
209222
}
210223

211-
// findPRThread searches for an existing PR thread in cache and Slack.
224+
// findPRThread searches for an existing PR thread in cache, datastore, and Slack.
212225
// Returns (threadTS, messageText, found).
213226
func (c *Coordinator) findPRThread(
214227
ctx context.Context, cacheKey, channelID, owner, repo string, prNumber int, prState string,
@@ -222,9 +235,9 @@ func (c *Coordinator) findPRThread(
222235
Number int `json:"number"`
223236
},
224237
) (threadTS, messageText string, found bool) {
225-
// Check cache first
238+
// Check in-memory cache first (fastest)
226239
if threadInfo, exists := c.threadCache.Get(cacheKey); exists {
227-
slog.Debug("found PR thread in cache",
240+
slog.Debug("found PR thread in memory cache",
228241
"pr", cacheKey,
229242
"thread_ts", threadInfo.ThreadTS,
230243
logFieldChannel, channelID,
@@ -233,7 +246,20 @@ func (c *Coordinator) findPRThread(
233246
return threadInfo.ThreadTS, threadInfo.MessageText, true
234247
}
235248

236-
// Search Slack for existing thread
249+
// Check datastore (survives restarts, shared across instances)
250+
if threadInfo, exists := c.stateStore.Thread(ctx, owner, repo, prNumber, channelID); exists {
251+
slog.Info("found PR thread in datastore (cache miss - warming cache)",
252+
"pr", cacheKey,
253+
"thread_ts", threadInfo.ThreadTS,
254+
logFieldChannel, channelID,
255+
"note", "this prevents duplicate thread creation after restarts/deployments")
256+
257+
// Warm the cache with datastore value
258+
c.threadCache.Set(cacheKey, threadInfo)
259+
return threadInfo.ThreadTS, threadInfo.MessageText, true
260+
}
261+
262+
// Search Slack for existing thread (slowest, last resort)
237263
prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", owner, repo, prNumber)
238264
searchFrom := pullRequest.CreatedAt
239265
if searchFrom.IsZero() || time.Since(searchFrom) > 30*24*time.Hour {
@@ -274,19 +300,9 @@ func (c *Coordinator) findPRThread(
274300
// Returns (threadInfo, wasCreated, error).
275301
// wasCreated is true if a new thread was created, false if an existing thread was found.
276302
func (c *Coordinator) createPRThreadWithLocking(
277-
ctx context.Context, channelID, owner, repo string, prNumber int, prState string,
278-
pullRequest struct {
279-
CreatedAt time.Time `json:"created_at"`
280-
User struct {
281-
Login string `json:"login"`
282-
} `json:"user"`
283-
HTMLURL string `json:"html_url"`
284-
Title string `json:"title"`
285-
Number int `json:"number"`
286-
},
287-
checkResult *turn.CheckResponse,
303+
ctx context.Context, params threadCreationParams,
288304
) (threadInfo cache.ThreadInfo, wasCreated bool, err error) {
289-
cacheKey := fmt.Sprintf("%s/%s#%d:%s", owner, repo, prNumber, channelID)
305+
cacheKey := fmt.Sprintf("%s/%s#%d:%s", params.Owner, params.Repo, params.PRNumber, params.ChannelID)
290306
// Prevent concurrent creation within this instance
291307
if !c.threadCache.MarkCreating(cacheKey) {
292308
// Wait for another goroutine to finish
@@ -313,53 +329,53 @@ func (c *Coordinator) createPRThreadWithLocking(
313329
defer c.threadCache.UnmarkCreating(cacheKey)
314330

315331
// Cross-instance race prevention check
316-
prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", owner, repo, prNumber)
332+
prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", params.Owner, params.Repo, params.PRNumber)
317333
time.Sleep(100 * time.Millisecond)
318-
crossInstanceTS, crossInstanceText := c.searchForPRThread(ctx, channelID, prURL, pullRequest.CreatedAt)
334+
crossInstanceTS, crossInstanceText := c.searchForPRThread(ctx, params.ChannelID, prURL, params.PullRequest.CreatedAt)
319335
if crossInstanceTS != "" {
320336
slog.Info("found thread created by another instance (cross-instance race avoided)",
321337
"pr", cacheKey,
322338
"thread_ts", crossInstanceTS,
323-
logFieldChannel, channelID,
339+
logFieldChannel, params.ChannelID,
324340
"current_message_preview", crossInstanceText[:min(100, len(crossInstanceText))],
325341
"note", "this prevented duplicate thread creation during rolling deployment")
326342

327343
info := cache.ThreadInfo{
328344
ThreadTS: crossInstanceTS,
329-
ChannelID: channelID,
330-
LastState: prState,
345+
ChannelID: params.ChannelID,
346+
LastState: params.PRState,
331347
MessageText: crossInstanceText,
332348
}
333-
c.saveThread(ctx, owner, repo, prNumber, channelID, info)
349+
c.saveThread(ctx, params.Owner, params.Repo, params.PRNumber, params.ChannelID, info)
334350
return info, false, nil
335351
}
336352

337353
// Create new thread
338354
slog.Info("creating new PR thread",
339355
"pr", cacheKey,
340-
logFieldChannel, channelID,
341-
"pr_state", prState,
342-
"pr_created_at", pullRequest.CreatedAt.Format(time.RFC3339))
356+
logFieldChannel, params.ChannelID,
357+
"pr_state", params.PRState,
358+
"pr_created_at", params.PullRequest.CreatedAt.Format(time.RFC3339))
343359

344-
newThreadTS, newMessageText, err := c.createPRThread(ctx, channelID, owner, repo, prNumber, prState, pullRequest, checkResult)
360+
newThreadTS, newMessageText, err := c.createPRThread(ctx, params)
345361
if err != nil {
346362
return cache.ThreadInfo{}, false, fmt.Errorf("failed to create PR thread: %w", err)
347363
}
348364

349365
// Save the new thread
350366
info := cache.ThreadInfo{
351367
ThreadTS: newThreadTS,
352-
ChannelID: channelID,
353-
LastState: prState,
368+
ChannelID: params.ChannelID,
369+
LastState: params.PRState,
354370
MessageText: newMessageText,
355371
}
356-
c.saveThread(ctx, owner, repo, prNumber, channelID, info)
372+
c.saveThread(ctx, params.Owner, params.Repo, params.PRNumber, params.ChannelID, info)
357373

358374
slog.Info("created and cached new PR thread",
359375
"pr", cacheKey,
360376
"thread_ts", newThreadTS,
361-
logFieldChannel, channelID,
362-
"initial_state", prState,
377+
logFieldChannel, params.ChannelID,
378+
"initial_state", params.PRState,
363379
"message_preview", newMessageText[:min(100, len(newMessageText))],
364380
"creation_successful", true,
365381
"note", "if you see duplicate threads, check if another instance created one during the same time window")
@@ -1082,9 +1098,16 @@ func (c *Coordinator) processPRForChannel(
10821098
Number: event.PullRequest.Number,
10831099
CreatedAt: event.PullRequest.CreatedAt,
10841100
}
1085-
threadTS, wasNewlyCreated, currentText, err := c.findOrCreatePRThread(
1086-
ctx, channelID, owner, repo, prNumber, prState, pullRequestStruct, checkResult,
1087-
)
1101+
threadTS, wasNewlyCreated, currentText, err := c.findOrCreatePRThread(ctx, threadCreationParams{
1102+
ChannelID: channelID,
1103+
ChannelName: channelName,
1104+
Owner: owner,
1105+
Repo: repo,
1106+
PRNumber: prNumber,
1107+
PRState: prState,
1108+
PullRequest: pullRequestStruct,
1109+
CheckResult: checkResult,
1110+
})
10881111
if err != nil {
10891112
slog.Error("failed to find or create PR thread",
10901113
"workspace", c.workspaceName,
@@ -1511,67 +1534,40 @@ func (c *Coordinator) handlePullRequestReviewFromSprinkler(
15111534
// createPRThread creates a new thread in Slack for a PR.
15121535
// Critical performance optimization: Posts thread immediately WITHOUT user mentions,
15131536
// then updates asynchronously once email lookups complete (which take 13-20 seconds each).
1514-
func (c *Coordinator) createPRThread(ctx context.Context, channel, owner, repo string, number int, _ string, pr struct {
1515-
CreatedAt time.Time `json:"created_at"`
1516-
User struct {
1517-
Login string `json:"login"`
1518-
} `json:"user"`
1519-
HTMLURL string `json:"html_url"`
1520-
Title string `json:"title"`
1521-
Number int `json:"number"`
1522-
}, checkResult *turn.CheckResponse,
1523-
) (threadTS string, messageText string, err error) {
1537+
func (c *Coordinator) createPRThread(ctx context.Context, params threadCreationParams) (threadTS string, messageText string, err error) {
15241538
// Format initial message WITHOUT user mentions (fast path)
1525-
// Format: :emoji: Title repo#123 · author
1526-
domain := c.configManager.Domain(owner)
1527-
params := notify.MessageParams{
1528-
CheckResult: checkResult,
1529-
Owner: owner,
1530-
Repo: repo,
1531-
PRNumber: number,
1532-
Title: pr.Title,
1533-
Author: pr.User.Login,
1534-
HTMLURL: pr.HTMLURL,
1539+
// Format: :emoji: Title #123 or repo#123 · author
1540+
domain := c.configManager.Domain(params.Owner)
1541+
msgParams := notify.MessageParams{
1542+
CheckResult: params.CheckResult,
1543+
Owner: params.Owner,
1544+
Repo: params.Repo,
1545+
PRNumber: params.PRNumber,
1546+
Title: params.PullRequest.Title,
1547+
Author: params.PullRequest.User.Login,
1548+
HTMLURL: params.PullRequest.HTMLURL,
15351549
Domain: domain,
1550+
ChannelName: params.ChannelName,
15361551
UserMapper: c.userMapper,
15371552
}
1538-
initialText := notify.FormatChannelMessageBase(ctx, params)
1539-
1540-
// Resolve channel name to ID for consistent API calls
1541-
resolvedChannel := c.slack.ResolveChannelID(ctx, channel)
1542-
1543-
// Check if channel resolution failed (returns original name if not found)
1544-
if resolvedChannel == channel || (channel != "" && channel[0] == '#' && resolvedChannel == channel[1:]) {
1545-
// Only warn if it's not already a channel ID
1546-
if channel == "" || channel[0] != 'C' {
1547-
slog.Warn("could not resolve channel for thread creation",
1548-
"workspace", c.workspaceName,
1549-
logFieldPR, fmt.Sprintf(prFormatString, owner, repo, number),
1550-
"channel", channel,
1551-
"action_required", "verify channel exists and bot has access")
1552-
return "", "", fmt.Errorf("could not resolve channel: %s", channel)
1553-
}
1554-
slog.Debug("channel is already a channel ID", "channel_id", channel)
1555-
} else {
1556-
slog.Debug("resolved channel for thread creation", "original", channel, "resolved", resolvedChannel)
1557-
}
1553+
initialText := notify.FormatChannelMessageBase(ctx, msgParams)
15581554

15591555
// Create thread with resolved channel ID - post immediately without waiting for user lookups
1560-
threadTS, err = c.slack.PostThread(ctx, resolvedChannel, initialText, nil)
1556+
threadTS, err = c.slack.PostThread(ctx, params.ChannelID, initialText, nil)
15611557
if err != nil {
15621558
return "", "", fmt.Errorf("failed to post thread: %w", err)
15631559
}
15641560

15651561
slog.Info("thread created immediately (async user mention enrichment pending)",
1566-
logFieldPR, fmt.Sprintf(prFormatString, owner, repo, number),
1567-
logFieldChannel, resolvedChannel,
1562+
logFieldPR, fmt.Sprintf(prFormatString, params.Owner, params.Repo, params.PRNumber),
1563+
logFieldChannel, params.ChannelID,
15681564
"thread_ts", threadTS,
15691565
"message_preview", initialText[:min(100, len(initialText))],
15701566
"will_update_with_mentions", true)
15711567

15721568
// Asynchronously add user mentions once email lookups complete
15731569
// This avoids blocking thread creation on slow email lookups (13-20 seconds each)
1574-
if checkResult != nil && len(checkResult.Analysis.NextAction) > 0 {
1570+
if params.CheckResult != nil && len(params.CheckResult.Analysis.NextAction) > 0 {
15751571
// SECURITY NOTE: Use detached context to complete message enrichment even during shutdown.
15761572
// Operations bounded by 60-second timeout, allowing time for:
15771573
// - GitHub email lookup via gh-mailto (~5-10s with retries/timeouts)
@@ -1581,17 +1577,17 @@ func (c *Coordinator) createPRThread(ctx context.Context, channel, owner, repo s
15811577
enrichCtx, enrichCancel := context.WithTimeout(context.WithoutCancel(ctx), 60*time.Second)
15821578
// Capture variables to avoid data race
15831579
capturedThreadTS := threadTS
1584-
capturedOwner := owner
1585-
capturedRepo := repo
1586-
capturedNumber := number
1587-
capturedChannel := resolvedChannel
1580+
capturedOwner := params.Owner
1581+
capturedRepo := params.Repo
1582+
capturedNumber := params.PRNumber
1583+
capturedChannel := params.ChannelID
15881584
capturedInitialText := initialText
1589-
capturedParams := params
1585+
capturedMsgParams := msgParams
15901586
go func() {
15911587
defer enrichCancel()
15921588

15931589
// Perform email lookups in background
1594-
nextActionsSuffix := notify.FormatNextActionsSuffix(enrichCtx, capturedParams)
1590+
nextActionsSuffix := notify.FormatNextActionsSuffix(enrichCtx, capturedMsgParams)
15951591
if nextActionsSuffix != "" {
15961592
// Update message with user mentions
15971593
enrichedText := capturedInitialText + nextActionsSuffix

pkg/bot/bot_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ func TestSaveThread(t *testing.T) {
230230
}
231231

232232
// Check persistent storage
233-
persistedKey := "thread:testorg/testrepo#42:C123456"
233+
persistedKey := "testorg/testrepo#42:C123456"
234234
persistedInfo, ok := mockState.threads[persistedKey]
235235
if !ok {
236236
t.Error("expected thread to be in persistent storage")

pkg/bot/coordinator_test_helpers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (m *mockStateStore) SaveThread(ctx context.Context, owner, repo string, num
4949
if m.saveThreadErr != nil {
5050
return m.saveThreadErr
5151
}
52-
key := fmt.Sprintf("thread:%s/%s#%d:%s", owner, repo, number, channelID)
52+
key := fmt.Sprintf("%s/%s#%d:%s", owner, repo, number, channelID)
5353
if m.threads == nil {
5454
m.threads = make(map[string]cache.ThreadInfo)
5555
}

0 commit comments

Comments
 (0)