Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 95 additions & 99 deletions pkg/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,28 @@ type messageUpdateParams struct {
PRNumber int
}

// threadCreationParams groups parameters for thread creation/lookup operations.
// It is passed by value to findOrCreatePRThread, createPRThreadWithLocking and
// createPRThread in place of a long positional argument list.
//
//nolint:govet // fieldalignment - struct layout optimized for readability over minimal padding
type threadCreationParams struct {
// PullRequest carries the pull request details used when searching for or
// posting a thread. It is an anonymous struct, so values assigned to it must
// have exactly this field layout and these tags.
PullRequest struct {
// CreatedAt is used as the starting point when searching Slack for an existing thread.
CreatedAt time.Time `json:"created_at"`
User struct {
// Login is used as the author in the initial channel message.
Login string `json:"login"`
} `json:"user"`
// HTMLURL is forwarded to the message formatter.
HTMLURL string `json:"html_url"`
// Title is forwarded to the message formatter.
Title string `json:"title"`
Number int `json:"number"`
}
// CheckResult is forwarded to the message formatter; it may be nil, in which
// case no asynchronous next-action enrichment is started after posting.
CheckResult *turn.CheckResponse
// ChannelID is the channel the thread is searched for and posted in; it is
// also part of the thread cache key.
ChannelID string
// ChannelName is forwarded to the message formatter alongside ChannelID.
ChannelName string
// Owner and Repo identify the repository; with PRNumber they form the cache
// key and the PR URL, and Owner selects the configured domain.
Owner string
Repo string
// PRState is recorded as the thread's LastState when the thread is saved.
PRState string
// PRNumber is the pull request number used in the cache key and PR URL.
PRNumber int
}

// Coordinator coordinates between GitHub, Slack, and notifications for a single org.
//
//nolint:govet // Field order optimized for logical grouping over memory alignment
Expand Down Expand Up @@ -176,39 +198,30 @@ func (c *Coordinator) saveThread(ctx context.Context, owner, repo string, number
// Returns (threadTS, wasNewlyCreated, currentMessageText, error).
//
//nolint:revive // Four return values needed to track thread state and creation status
func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner, repo string, prNumber int, prState string, pullRequest struct {
CreatedAt time.Time `json:"created_at"`
User struct {
Login string `json:"login"`
} `json:"user"`
HTMLURL string `json:"html_url"`
Title string `json:"title"`
Number int `json:"number"`
}, checkResult *turn.CheckResponse,
) (threadTS string, wasNewlyCreated bool, currentMessageText string, err error) {
cacheKey := fmt.Sprintf("%s/%s#%d:%s", owner, repo, prNumber, channelID)
func (c *Coordinator) findOrCreatePRThread(ctx context.Context, params threadCreationParams) (threadTS string, wasNewlyCreated bool, currentMessageText string, err error) {
cacheKey := fmt.Sprintf("%s/%s#%d:%s", params.Owner, params.Repo, params.PRNumber, params.ChannelID)

slog.Debug("finding or creating PR thread",
"pr", cacheKey,
logFieldChannel, channelID,
"pr_state", prState)
logFieldChannel, params.ChannelID,
"pr_state", params.PRState)

// Try to find existing thread
threadTS, messageText, found := c.findPRThread(ctx, cacheKey, channelID, owner, repo, prNumber, prState, pullRequest)
threadTS, messageText, found := c.findPRThread(ctx, cacheKey, params.ChannelID, params.Owner, params.Repo, params.PRNumber, params.PRState, params.PullRequest)
if found {
return threadTS, false, messageText, nil
}

// Thread not found - create new one with concurrent creation prevention
threadInfo, wasCreated, err := c.createPRThreadWithLocking(ctx, channelID, owner, repo, prNumber, prState, pullRequest, checkResult)
threadInfo, wasCreated, err := c.createPRThreadWithLocking(ctx, params)
if err != nil {
return "", false, "", err
}

return threadInfo.ThreadTS, wasCreated, threadInfo.MessageText, nil
}

// findPRThread searches for an existing PR thread in cache and Slack.
// findPRThread searches for an existing PR thread in cache, datastore, and Slack.
// Returns (threadTS, messageText, found).
func (c *Coordinator) findPRThread(
ctx context.Context, cacheKey, channelID, owner, repo string, prNumber int, prState string,
Expand All @@ -222,9 +235,9 @@ func (c *Coordinator) findPRThread(
Number int `json:"number"`
},
) (threadTS, messageText string, found bool) {
// Check cache first
// Check in-memory cache first (fastest)
if threadInfo, exists := c.threadCache.Get(cacheKey); exists {
slog.Debug("found PR thread in cache",
slog.Debug("found PR thread in memory cache",
"pr", cacheKey,
"thread_ts", threadInfo.ThreadTS,
logFieldChannel, channelID,
Expand All @@ -233,7 +246,20 @@ func (c *Coordinator) findPRThread(
return threadInfo.ThreadTS, threadInfo.MessageText, true
}

// Search Slack for existing thread
// Check datastore (survives restarts, shared across instances)
if threadInfo, exists := c.stateStore.Thread(ctx, owner, repo, prNumber, channelID); exists {
slog.Info("found PR thread in datastore (cache miss - warming cache)",
"pr", cacheKey,
"thread_ts", threadInfo.ThreadTS,
logFieldChannel, channelID,
"note", "this prevents duplicate thread creation after restarts/deployments")

// Warm the cache with datastore value
c.threadCache.Set(cacheKey, threadInfo)
return threadInfo.ThreadTS, threadInfo.MessageText, true
}

// Search Slack for existing thread (slowest, last resort)
prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", owner, repo, prNumber)
searchFrom := pullRequest.CreatedAt
if searchFrom.IsZero() || time.Since(searchFrom) > 30*24*time.Hour {
Expand Down Expand Up @@ -274,19 +300,9 @@ func (c *Coordinator) findPRThread(
// Returns (threadInfo, wasCreated, error).
// wasCreated is true if a new thread was created, false if an existing thread was found.
func (c *Coordinator) createPRThreadWithLocking(
ctx context.Context, channelID, owner, repo string, prNumber int, prState string,
pullRequest struct {
CreatedAt time.Time `json:"created_at"`
User struct {
Login string `json:"login"`
} `json:"user"`
HTMLURL string `json:"html_url"`
Title string `json:"title"`
Number int `json:"number"`
},
checkResult *turn.CheckResponse,
ctx context.Context, params threadCreationParams,
) (threadInfo cache.ThreadInfo, wasCreated bool, err error) {
cacheKey := fmt.Sprintf("%s/%s#%d:%s", owner, repo, prNumber, channelID)
cacheKey := fmt.Sprintf("%s/%s#%d:%s", params.Owner, params.Repo, params.PRNumber, params.ChannelID)
// Prevent concurrent creation within this instance
if !c.threadCache.MarkCreating(cacheKey) {
// Wait for another goroutine to finish
Expand All @@ -313,53 +329,53 @@ func (c *Coordinator) createPRThreadWithLocking(
defer c.threadCache.UnmarkCreating(cacheKey)

// Cross-instance race prevention check
prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", owner, repo, prNumber)
prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", params.Owner, params.Repo, params.PRNumber)
time.Sleep(100 * time.Millisecond)
crossInstanceTS, crossInstanceText := c.searchForPRThread(ctx, channelID, prURL, pullRequest.CreatedAt)
crossInstanceTS, crossInstanceText := c.searchForPRThread(ctx, params.ChannelID, prURL, params.PullRequest.CreatedAt)
if crossInstanceTS != "" {
slog.Info("found thread created by another instance (cross-instance race avoided)",
"pr", cacheKey,
"thread_ts", crossInstanceTS,
logFieldChannel, channelID,
logFieldChannel, params.ChannelID,
"current_message_preview", crossInstanceText[:min(100, len(crossInstanceText))],
"note", "this prevented duplicate thread creation during rolling deployment")

info := cache.ThreadInfo{
ThreadTS: crossInstanceTS,
ChannelID: channelID,
LastState: prState,
ChannelID: params.ChannelID,
LastState: params.PRState,
MessageText: crossInstanceText,
}
c.saveThread(ctx, owner, repo, prNumber, channelID, info)
c.saveThread(ctx, params.Owner, params.Repo, params.PRNumber, params.ChannelID, info)
return info, false, nil
}

// Create new thread
slog.Info("creating new PR thread",
"pr", cacheKey,
logFieldChannel, channelID,
"pr_state", prState,
"pr_created_at", pullRequest.CreatedAt.Format(time.RFC3339))
logFieldChannel, params.ChannelID,
"pr_state", params.PRState,
"pr_created_at", params.PullRequest.CreatedAt.Format(time.RFC3339))

newThreadTS, newMessageText, err := c.createPRThread(ctx, channelID, owner, repo, prNumber, prState, pullRequest, checkResult)
newThreadTS, newMessageText, err := c.createPRThread(ctx, params)
if err != nil {
return cache.ThreadInfo{}, false, fmt.Errorf("failed to create PR thread: %w", err)
}

// Save the new thread
info := cache.ThreadInfo{
ThreadTS: newThreadTS,
ChannelID: channelID,
LastState: prState,
ChannelID: params.ChannelID,
LastState: params.PRState,
MessageText: newMessageText,
}
c.saveThread(ctx, owner, repo, prNumber, channelID, info)
c.saveThread(ctx, params.Owner, params.Repo, params.PRNumber, params.ChannelID, info)

slog.Info("created and cached new PR thread",
"pr", cacheKey,
"thread_ts", newThreadTS,
logFieldChannel, channelID,
"initial_state", prState,
logFieldChannel, params.ChannelID,
"initial_state", params.PRState,
"message_preview", newMessageText[:min(100, len(newMessageText))],
"creation_successful", true,
"note", "if you see duplicate threads, check if another instance created one during the same time window")
Expand Down Expand Up @@ -1082,9 +1098,16 @@ func (c *Coordinator) processPRForChannel(
Number: event.PullRequest.Number,
CreatedAt: event.PullRequest.CreatedAt,
}
threadTS, wasNewlyCreated, currentText, err := c.findOrCreatePRThread(
ctx, channelID, owner, repo, prNumber, prState, pullRequestStruct, checkResult,
)
threadTS, wasNewlyCreated, currentText, err := c.findOrCreatePRThread(ctx, threadCreationParams{
ChannelID: channelID,
ChannelName: channelName,
Owner: owner,
Repo: repo,
PRNumber: prNumber,
PRState: prState,
PullRequest: pullRequestStruct,
CheckResult: checkResult,
})
if err != nil {
slog.Error("failed to find or create PR thread",
"workspace", c.workspaceName,
Expand Down Expand Up @@ -1511,67 +1534,40 @@ func (c *Coordinator) handlePullRequestReviewFromSprinkler(
// createPRThread creates a new thread in Slack for a PR.
// Critical performance optimization: Posts thread immediately WITHOUT user mentions,
// then updates asynchronously once email lookups complete (which take 13-20 seconds each).
func (c *Coordinator) createPRThread(ctx context.Context, channel, owner, repo string, number int, _ string, pr struct {
CreatedAt time.Time `json:"created_at"`
User struct {
Login string `json:"login"`
} `json:"user"`
HTMLURL string `json:"html_url"`
Title string `json:"title"`
Number int `json:"number"`
}, checkResult *turn.CheckResponse,
) (threadTS string, messageText string, err error) {
func (c *Coordinator) createPRThread(ctx context.Context, params threadCreationParams) (threadTS string, messageText string, err error) {
// Format initial message WITHOUT user mentions (fast path)
// Format: :emoji: Title repo#123 · author
domain := c.configManager.Domain(owner)
params := notify.MessageParams{
CheckResult: checkResult,
Owner: owner,
Repo: repo,
PRNumber: number,
Title: pr.Title,
Author: pr.User.Login,
HTMLURL: pr.HTMLURL,
// Format: :emoji: Title #123 or repo#123 · author
domain := c.configManager.Domain(params.Owner)
msgParams := notify.MessageParams{
CheckResult: params.CheckResult,
Owner: params.Owner,
Repo: params.Repo,
PRNumber: params.PRNumber,
Title: params.PullRequest.Title,
Author: params.PullRequest.User.Login,
HTMLURL: params.PullRequest.HTMLURL,
Domain: domain,
ChannelName: params.ChannelName,
UserMapper: c.userMapper,
}
initialText := notify.FormatChannelMessageBase(ctx, params)

// Resolve channel name to ID for consistent API calls
resolvedChannel := c.slack.ResolveChannelID(ctx, channel)

// Check if channel resolution failed (returns original name if not found)
if resolvedChannel == channel || (channel != "" && channel[0] == '#' && resolvedChannel == channel[1:]) {
// Only warn if it's not already a channel ID
if channel == "" || channel[0] != 'C' {
slog.Warn("could not resolve channel for thread creation",
"workspace", c.workspaceName,
logFieldPR, fmt.Sprintf(prFormatString, owner, repo, number),
"channel", channel,
"action_required", "verify channel exists and bot has access")
return "", "", fmt.Errorf("could not resolve channel: %s", channel)
}
slog.Debug("channel is already a channel ID", "channel_id", channel)
} else {
slog.Debug("resolved channel for thread creation", "original", channel, "resolved", resolvedChannel)
}
initialText := notify.FormatChannelMessageBase(ctx, msgParams)

// Create thread with resolved channel ID - post immediately without waiting for user lookups
threadTS, err = c.slack.PostThread(ctx, resolvedChannel, initialText, nil)
threadTS, err = c.slack.PostThread(ctx, params.ChannelID, initialText, nil)
if err != nil {
return "", "", fmt.Errorf("failed to post thread: %w", err)
}

slog.Info("thread created immediately (async user mention enrichment pending)",
logFieldPR, fmt.Sprintf(prFormatString, owner, repo, number),
logFieldChannel, resolvedChannel,
logFieldPR, fmt.Sprintf(prFormatString, params.Owner, params.Repo, params.PRNumber),
logFieldChannel, params.ChannelID,
"thread_ts", threadTS,
"message_preview", initialText[:min(100, len(initialText))],
"will_update_with_mentions", true)

// Asynchronously add user mentions once email lookups complete
// This avoids blocking thread creation on slow email lookups (13-20 seconds each)
if checkResult != nil && len(checkResult.Analysis.NextAction) > 0 {
if params.CheckResult != nil && len(params.CheckResult.Analysis.NextAction) > 0 {
// SECURITY NOTE: Use detached context to complete message enrichment even during shutdown.
// Operations bounded by 60-second timeout, allowing time for:
// - GitHub email lookup via gh-mailto (~5-10s with retries/timeouts)
Expand All @@ -1581,17 +1577,17 @@ func (c *Coordinator) createPRThread(ctx context.Context, channel, owner, repo s
enrichCtx, enrichCancel := context.WithTimeout(context.WithoutCancel(ctx), 60*time.Second)
// Capture variables to avoid data race
capturedThreadTS := threadTS
capturedOwner := owner
capturedRepo := repo
capturedNumber := number
capturedChannel := resolvedChannel
capturedOwner := params.Owner
capturedRepo := params.Repo
capturedNumber := params.PRNumber
capturedChannel := params.ChannelID
capturedInitialText := initialText
capturedParams := params
capturedMsgParams := msgParams
go func() {
defer enrichCancel()

// Perform email lookups in background
nextActionsSuffix := notify.FormatNextActionsSuffix(enrichCtx, capturedParams)
nextActionsSuffix := notify.FormatNextActionsSuffix(enrichCtx, capturedMsgParams)
if nextActionsSuffix != "" {
// Update message with user mentions
enrichedText := capturedInitialText + nextActionsSuffix
Expand Down
2 changes: 1 addition & 1 deletion pkg/bot/bot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestSaveThread(t *testing.T) {
}

// Check persistent storage
persistedKey := "thread:testorg/testrepo#42:C123456"
persistedKey := "testorg/testrepo#42:C123456"
persistedInfo, ok := mockState.threads[persistedKey]
if !ok {
t.Error("expected thread to be in persistent storage")
Expand Down
2 changes: 1 addition & 1 deletion pkg/bot/coordinator_test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (m *mockStateStore) SaveThread(ctx context.Context, owner, repo string, num
if m.saveThreadErr != nil {
return m.saveThreadErr
}
key := fmt.Sprintf("thread:%s/%s#%d:%s", owner, repo, number, channelID)
key := fmt.Sprintf("%s/%s#%d:%s", owner, repo, number, channelID)
if m.threads == nil {
m.threads = make(map[string]cache.ThreadInfo)
}
Expand Down
Loading
Loading