Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,10 @@ func main() {
AddSource: true,
Level: slog.LevelInfo,
ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
// Shorten source paths to relative paths for cleaner logs
// Use basename for source file for cleaner logs
if a.Key == slog.SourceKey {
if source, ok := a.Value.Any().(*slog.Source); ok {
// Find project root by looking for /slacker/ in path
if idx := strings.LastIndex(source.File, "/slacker/"); idx >= 0 {
source.File = source.File[idx+9:] // Skip "/slacker/"
}
source.File = filepath.Base(source.File)
}
}
return a
Expand Down
111 changes: 70 additions & 41 deletions pkg/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,21 @@ type messageUpdateParams struct {
//
//nolint:govet // Field order optimized for logical grouping over memory alignment
type Coordinator struct {
processingEvents sync.WaitGroup // Tracks in-flight event processing for graceful shutdown
stateStore StateStore // Persistent state across restarts
sprinklerURL string
workspaceName string // Track workspace name for better logging
slack SlackClient
github GitHubClient
configManager ConfigManager
notifier *notify.Manager
userMapper UserMapper
threadCache *cache.ThreadCache // In-memory cache for fast lookups
commitPRCache *cache.CommitPRCache // Maps commit SHAs to PR numbers for check events
eventSemaphore chan struct{} // Limits concurrent event processing (prevents overwhelming APIs)
dmLocks sync.Map // Per-user-PR locks to prevent duplicate DMs (key: "userID:prURL")
processingEvents sync.WaitGroup // Tracks in-flight event processing for graceful shutdown
stateStore StateStore // Persistent state across restarts
sprinklerURL string
workspaceName string // Track workspace name for better logging
slack SlackClient
github GitHubClient
configManager ConfigManager
notifier *notify.Manager
userMapper UserMapper
threadCache *cache.ThreadCache // In-memory cache for fast lookups
commitPRCache *cache.CommitPRCache // Maps commit SHAs to PR numbers for check events
eventSemaphore chan struct{} // Limits concurrent event processing (prevents overwhelming APIs)
dmLocks sync.Map // Per-user-PR locks to prevent duplicate DMs (key: "userID:prURL")
recentUpdateLogsMu sync.Mutex // Protects recentUpdateLogs map
recentUpdateLogs map[string]time.Time // Tracks recent "updating message" logs to prevent spam (key: "channelID:threadTS:state")
}

// StateStore interface for persistent state - allows dependency injection for testing.
Expand Down Expand Up @@ -111,16 +113,17 @@ func New(
stateStore StateStore,
) *Coordinator {
c := &Coordinator{
slack: slackClient,
github: githubClient,
configManager: configManager,
notifier: notifier,
userMapper: usermapping.New(slackClient.API(), githubClient.InstallationToken(ctx)),
sprinklerURL: sprinklerURL,
stateStore: stateStore,
threadCache: cache.New(),
commitPRCache: cache.NewCommitPRCache(),
eventSemaphore: make(chan struct{}, 10), // Allow 10 concurrent events per org
slack: slackClient,
github: githubClient,
configManager: configManager,
notifier: notifier,
userMapper: usermapping.New(slackClient.API(), githubClient.InstallationToken(ctx)),
sprinklerURL: sprinklerURL,
stateStore: stateStore,
threadCache: cache.New(),
commitPRCache: cache.NewCommitPRCache(),
eventSemaphore: make(chan struct{}, 10), // Allow 10 concurrent events per org
recentUpdateLogs: make(map[string]time.Time),
}

// Set GitHub client in config manager for this org.
Expand Down Expand Up @@ -569,7 +572,7 @@ func (c *Coordinator) processEvent(ctx context.Context, msg SprinklerMessage) er
case "check_run", "check_suite":
// Check events update PR test status - handle like pull_request events
if msg.PRNumber > 0 {
slog.Info("received check event for PR, refreshing state",
slog.Debug("received check event for PR, refreshing state",
"owner", owner,
"repo", repo,
"pr", msg.PRNumber,
Expand Down Expand Up @@ -616,7 +619,7 @@ func (c *Coordinator) handlePullRequestEventWithData(ctx context.Context, owner,
) {
prNumber := event.Number

slog.Info("PR event with pre-fetched data",
slog.Debug("PR event with pre-fetched data",
logFieldOwner, owner,
logFieldRepo, repo,
"number", prNumber,
Expand Down Expand Up @@ -997,12 +1000,10 @@ func (c *Coordinator) processChannelsInParallel(
if !exists {
// First time seeing this user
taggedUsers[userID] = info
} else {
} else if info.IsInAnyChannel {
// User already tagged in another channel - update IsInAnyChannel if this channel has them
if info.IsInAnyChannel {
existing.IsInAnyChannel = true
taggedUsers[userID] = existing
}
existing.IsInAnyChannel = true
taggedUsers[userID] = existing
}
}
taggedUsersMu.Unlock()
Expand Down Expand Up @@ -1270,16 +1271,44 @@ func (c *Coordinator) updateMessageIfNeeded(ctx context.Context, params messageU
return
}

slog.Info("updating message - content changed",
"workspace", c.workspaceName,
logFieldPR, fmt.Sprintf(prFormatString, params.Owner, params.Repo, params.PRNumber),
"channel", params.ChannelDisplay,
"channel_id", params.ChannelID,
"thread_ts", params.ThreadTS,
"pr_state", params.PRState,
"old_state", params.OldState,
"current_message_preview", params.CurrentText[:min(100, len(params.CurrentText))],
"expected_message_preview", expectedText[:min(100, len(expectedText))])
// Deduplicate "updating message" logs within 1 second to reduce noise from concurrent events
logKey := fmt.Sprintf("%s:%s:%s", params.ChannelID, params.ThreadTS, params.PRState)
shouldLog := true // Default to logging if map not initialized
if c.recentUpdateLogs != nil {
c.recentUpdateLogsMu.Lock()
if lastLogged, exists := c.recentUpdateLogs[logKey]; !exists || time.Since(lastLogged) > time.Second {
c.recentUpdateLogs[logKey] = time.Now()
shouldLog = true
// Clean up old entries (older than 5 seconds) to prevent memory leak
for key, timestamp := range c.recentUpdateLogs {
if time.Since(timestamp) > 5*time.Second {
delete(c.recentUpdateLogs, key)
}
}
} else {
shouldLog = false
}
c.recentUpdateLogsMu.Unlock()
}

if shouldLog {
slog.Info("updating message - content changed",
"workspace", c.workspaceName,
logFieldPR, fmt.Sprintf(prFormatString, params.Owner, params.Repo, params.PRNumber),
"channel", params.ChannelDisplay,
"channel_id", params.ChannelID,
"thread_ts", params.ThreadTS,
"pr_state", params.PRState,
"old_state", params.OldState,
"current_message_preview", params.CurrentText[:min(100, len(params.CurrentText))],
"expected_message_preview", expectedText[:min(100, len(expectedText))])
} else {
slog.Debug("suppressing duplicate 'updating message' log (within 1s)",
"workspace", c.workspaceName,
logFieldPR, fmt.Sprintf(prFormatString, params.Owner, params.Repo, params.PRNumber),
"channel", params.ChannelDisplay,
"pr_state", params.PRState)
}

if err := c.slack.UpdateMessage(ctx, params.ChannelID, params.ThreadTS, expectedText); err != nil {
slog.Error("failed to update PR message",
Expand Down Expand Up @@ -1326,7 +1355,7 @@ func (c *Coordinator) updateMessageIfNeeded(ctx context.Context, params messageU
func (c *Coordinator) handlePullRequestFromSprinkler(
ctx context.Context, owner, repo string, prNumber int, sprinklerURL string, eventTimestamp time.Time,
) {
slog.Info("handling PR event from sprinkler using turnclient",
slog.Debug("handling PR event from sprinkler using turnclient",
logFieldOwner, owner,
logFieldRepo, repo,
"pr_number", prNumber,
Expand Down
2 changes: 1 addition & 1 deletion pkg/bot/bot_sprinkler.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
return
}

slog.Info("accepted event for async processing",
slog.Debug("accepted event for async processing",
"organization", organization,
"type", event.Type,
"url", event.URL,
Expand Down
4 changes: 2 additions & 2 deletions pkg/bot/dm.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (c *Coordinator) findDMInHistory(ctx context.Context, userID, prURL string)
// Returns (shouldQueue bool, sendAfter time.Time).
// Channel membership is determined by caller - if channelID is non-empty, user was in at least one channel.
func (c *Coordinator) shouldDelayNewDM(
ctx context.Context,
_ context.Context,
userID, channelID, channelName string,
owner, _ string,
) (bool, time.Time) {
Expand Down Expand Up @@ -526,7 +526,7 @@ func (c *Coordinator) sendDMNotificationsToTaggedUsers(
err := c.sendPRNotification(ctx, dmNotificationRequest{
UserID: userInfo.UserID,
ChannelID: channelID, // "delay" or empty based on channel membership
ChannelName: "", // not used
ChannelName: "", // not used
Owner: owner,
Repo: repo,
PRNumber: prNumber,
Expand Down
Loading