Commit bcdc683

Merge pull request #52 from codeGROOVE-dev/reliable: lint
2 parents: 0e658fe + f816b8b

File tree: 21 files changed, +333 −140 lines changed

cmd/registrar/main.go

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ func main() {
 	logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
 		AddSource: true,
 		Level:     slog.LevelInfo,
-		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
+		ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
 			// Shorten source paths to relative paths for cleaner logs
 			if a.Key == slog.SourceKey {
 				if source, ok := a.Value.Any().(*slog.Source); ok {
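The underscore rename above only silences the unused-parameter lint; the hook itself is unchanged. For reference, a minimal self-contained sketch of the same ReplaceAttr pattern. The filepath.Base trimming is an assumption, since the hunk ends before the repository's actual shortening logic:

package main

import (
	"log/slog"
	"os"
	"path/filepath"
)

func main() {
	handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		AddSource: true,
		Level:     slog.LevelInfo,
		// The groups argument is unused here, hence the underscore that satisfies the linter.
		ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
			if a.Key == slog.SourceKey {
				if source, ok := a.Value.Any().(*slog.Source); ok {
					// Assumption: trim to the base name; the real code may keep a relative path instead.
					source.File = filepath.Base(source.File)
				}
			}
			return a
		},
	})
	slog.SetDefault(slog.New(handler))
	slog.Info("source path is shortened in this record")
}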

cmd/server/main.go

Lines changed: 8 additions & 3 deletions
@@ -33,15 +33,16 @@ import (
 // Returns empty string if not running on GCP or detection fails.
 func detectGCPProjectID(ctx context.Context) string {
 	// Try metadata service (works on Cloud Run, GCE, GKE, Cloud Functions)
-	client := &http.Client{Timeout: 2 * time.Second}
+	httpClient := &http.Client{Timeout: 2 * time.Second}
+	//nolint:revive // GCP metadata service is internal and always accessed via HTTP
 	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
 		"http://metadata.google.internal/computeMetadata/v1/project/project-id", http.NoBody)
 	if err != nil {
 		return ""
 	}
 	req.Header.Set("Metadata-Flavor", "Google")

-	resp, err := client.Do(req)
+	resp, err := httpClient.Do(req)
 	if err != nil {
 		slog.Debug("metadata service not available (not running on GCP?)", "error", err)
 		return ""
@@ -87,7 +88,7 @@ func main() {
 	logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
 		AddSource: true,
 		Level:     slog.LevelInfo,
-		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
+		ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
 			// Shorten source paths to relative paths for cleaner logs
 			if a.Key == slog.SourceKey {
 				if source, ok := a.Value.Any().(*slog.Source); ok {
@@ -121,6 +122,7 @@ func main() {
 	os.Exit(exitCode)
 }

+//nolint:revive,maintidx // Complex initialization requires length for clarity and maintainability
 func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfig) int {
 	// Handle graceful shutdown.
 	sigChan := make(chan os.Signal, 1)
@@ -160,6 +162,7 @@ func run(ctx context.Context, cancel context.CancelFunc, cfg *config.ServerConfi
 	slackManager := slack.NewManager(cfg.SlackSigningSecret)

 	// Initialize state store (in-memory + Datastore or JSON for persistence).
+	//nolint:interfacebloat // Interface mirrors state.Store for local type safety
 	var stateStore interface {
 		Thread(owner, repo string, number int, channelID string) (state.ThreadInfo, bool)
 		SaveThread(owner, repo string, number int, channelID string, info state.ThreadInfo) error
@@ -668,6 +671,8 @@ func (cm *coordinatorManager) handleRefreshInstallations(ctx context.Context) {
 // runBotCoordinators manages bot coordinators for all GitHub installations.
 // It spawns one coordinator per org and refreshes the list every 5 minutes.
 // Failed coordinators are automatically restarted every minute.
+//
+//nolint:interfacebloat // Interface mirrors state.Store for local type safety
 func runBotCoordinators(
 	ctx context.Context,
 	slackManager *slack.Manager,
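The detectGCPProjectID hunk above stops at the error branch. For context, here is a plausible completion of the function under the same pattern; the status check and body handling below are assumptions, not the repository's exact code:

package gcpmeta // illustrative package name

import (
	"context"
	"io"
	"log/slog"
	"net/http"
	"strings"
	"time"
)

// Sketch only: how the metadata lookup above likely finishes.
func detectGCPProjectID(ctx context.Context) string {
	httpClient := &http.Client{Timeout: 2 * time.Second}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		"http://metadata.google.internal/computeMetadata/v1/project/project-id", http.NoBody)
	if err != nil {
		return ""
	}
	req.Header.Set("Metadata-Flavor", "Google")

	resp, err := httpClient.Do(req)
	if err != nil {
		slog.Debug("metadata service not available (not running on GCP?)", "error", err)
		return ""
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return ""
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return ""
	}
	// The metadata server returns the bare project ID as plain text.
	return strings.TrimSpace(string(body))
}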

internal/bot/bot.go

Lines changed: 79 additions & 39 deletions
@@ -41,11 +41,13 @@ type prContext struct {
 }

 // ThreadCache manages PR thread IDs for a workspace.
+//
+//nolint:govet // Field order optimized for logical grouping over memory alignment
 type ThreadCache struct {
-	prThreads    map[string]ThreadInfo // "owner/repo#123" -> thread info
 	mu           sync.RWMutex
-	creationLock sync.Mutex      // Prevents concurrent creation of the same PR thread
-	creating     map[string]bool // Track PRs currently being created
+	creationLock sync.Mutex            // Prevents concurrent creation of the same PR thread
+	prThreads    map[string]ThreadInfo // "owner/repo#123" -> thread info
+	creating     map[string]bool       // Track PRs currently being created
 }

 // ThreadInfo is an alias to state.ThreadInfo to avoid duplication.
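Only the ThreadCache fields appear in this commit; the Set method and the mutex usage are referenced by later hunks. A sketch of what the accessors plausibly look like given these fields (illustrative only, not the repository's code):

// Illustrative accessors for the fields above; the real implementation is not part of this diff.
func (tc *ThreadCache) Set(key string, info ThreadInfo) {
	tc.mu.Lock()
	defer tc.mu.Unlock()
	if tc.prThreads == nil {
		tc.prThreads = make(map[string]ThreadInfo)
	}
	tc.prThreads[key] = info
}

// Get is a hypothetical read-side counterpart guarded by the same RWMutex.
func (tc *ThreadCache) Get(key string) (ThreadInfo, bool) {
	tc.mu.RLock()
	defer tc.mu.RUnlock()
	info, ok := tc.prThreads[key]
	return info, ok
}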
@@ -82,18 +84,20 @@ func (tc *ThreadCache) Cleanup(maxAge time.Duration) {
 }

 // Coordinator coordinates between GitHub, Slack, and notifications for a single org.
+//
+//nolint:govet // Field order optimized for logical grouping over memory alignment
 type Coordinator struct {
+	processingEvents sync.WaitGroup // Tracks in-flight event processing for graceful shutdown
+	stateStore       StateStore     // Persistent state across restarts
+	sprinklerURL     string
+	workspaceName    string // Track workspace name for better logging
 	slack            *slackpkg.Client
 	github           *github.Client
 	configManager    *config.Manager
 	notifier         *notify.Manager
 	userMapper       *usermapping.Service
-	sprinklerURL     string
-	threadCache      *ThreadCache   // In-memory cache for fast lookups
-	stateStore       StateStore     // Persistent state across restarts
-	workspaceName    string         // Track workspace name for better logging
-	eventSemaphore   chan struct{}  // Limits concurrent event processing (prevents overwhelming APIs)
-	processingEvents sync.WaitGroup // Tracks in-flight event processing for graceful shutdown
+	threadCache    *ThreadCache  // In-memory cache for fast lookups
+	eventSemaphore chan struct{} // Limits concurrent event processing (prevents overwhelming APIs)
 }

 // StateStore interface for persistent state - allows dependency injection for testing.
@@ -157,16 +161,40 @@ func New(
 	return c
 }

+// saveThread persists thread info to both cache and persistent storage.
+// This ensures threads survive restarts and are available for closed PR updates.
+func (c *Coordinator) saveThread(owner, repo string, number int, channelID string, info ThreadInfo) {
+	// Save to in-memory cache for fast lookups
+	key := fmt.Sprintf("%s/%s#%d:%s", owner, repo, number, channelID)
+	c.threadCache.Set(key, info)
+
+	// Persist to state store for cross-instance sharing and restart recovery
+	if err := c.stateStore.SaveThread(owner, repo, number, channelID, info); err != nil {
+		slog.Warn("failed to persist thread to state store",
+			"pr", fmt.Sprintf("%s/%s#%d", owner, repo, number),
+			"channel_id", channelID,
+			"error", err,
+			"impact", "thread updates may fail after restart")
+	} else {
+		slog.Debug("persisted thread to state store",
+			"pr", fmt.Sprintf("%s/%s#%d", owner, repo, number),
+			"channel_id", channelID,
+			"thread_ts", info.ThreadTS)
+	}
+}
+
 // findOrCreatePRThread finds an existing thread or creates a new one for a PR.
-// Returns (threadTS, wasNewlyCreated, error).
+// Returns (threadTS, wasNewlyCreated, currentMessageText, error).
+//
+//nolint:revive // Four return values needed to track thread state and creation status
 func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner, repo string, prNumber int, prState string, pullRequest struct {
-	User struct {
+	CreatedAt time.Time `json:"created_at"`
+	User      struct {
 		Login string `json:"login"`
 	} `json:"user"`
-	HTMLURL   string    `json:"html_url"`
-	Title     string    `json:"title"`
-	Number    int       `json:"number"`
-	CreatedAt time.Time `json:"created_at"`
+	HTMLURL string `json:"html_url"`
+	Title   string `json:"title"`
+	Number  int    `json:"number"`
 }, checkResult *turn.CheckResponse,
 ) (threadTS string, wasNewlyCreated bool, currentMessageText string, err error) {
 	// Use cache key that includes channel ID to support multiple channels per PR
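saveThread is a write-through: the in-memory cache answers hot lookups and the state store survives restarts and rolling deployments. The complementary read path would check the cache first and fall back to the store on a miss. A sketch of that lookup, assuming a hypothetical loadThread helper and the Get accessor sketched earlier (neither is part of this commit):

// Hypothetical helper (not in this commit): read-side counterpart to saveThread.
func (c *Coordinator) loadThread(owner, repo string, number int, channelID string) (ThreadInfo, bool) {
	key := fmt.Sprintf("%s/%s#%d:%s", owner, repo, number, channelID)

	// Fast path: in-memory cache.
	if info, ok := c.threadCache.Get(key); ok {
		return info, true
	}

	// Slow path: persistent store, shared across instances and restarts.
	info, ok := c.stateStore.Thread(owner, repo, number, channelID)
	if ok {
		c.threadCache.Set(key, info) // repopulate the cache for later lookups
	}
	return info, ok
}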
@@ -212,8 +240,8 @@ func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner
 		logFieldChannel, channelID,
 		"current_message_preview", initialSearchText[:min(100, len(initialSearchText))])

-	// Cache the found thread with its current message text
-	c.threadCache.Set(cacheKey, ThreadInfo{
+	// Save the found thread (cache + persist)
+	c.saveThread(owner, repo, prNumber, channelID, ThreadInfo{
 		ThreadTS:  initialSearchTS,
 		ChannelID: channelID,
 		LastState: prState,
@@ -289,8 +317,8 @@ func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner
 			"current_message_preview", crossInstanceText[:min(100, len(crossInstanceText))],
 			"note", "this prevented duplicate thread creation during rolling deployment")

-		// Cache it and return
-		c.threadCache.Set(cacheKey, ThreadInfo{
+		// Save it and return (cache + persist)
+		c.saveThread(owner, repo, prNumber, channelID, ThreadInfo{
 			ThreadTS:  crossInstanceCheckTS,
 			ChannelID: channelID,
 			LastState: prState,
@@ -312,8 +340,8 @@ func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner
 		return "", false, "", fmt.Errorf("failed to create PR thread: %w", err)
 	}

-	// Cache the new thread with its message text
-	c.threadCache.Set(cacheKey, ThreadInfo{
+	// Save the new thread (cache + persist)
+	c.saveThread(owner, repo, prNumber, channelID, ThreadInfo{
 		ThreadTS:  newThreadTS,
 		ChannelID: channelID,
 		LastState: prState,
@@ -337,7 +365,7 @@ func (c *Coordinator) findOrCreatePRThread(ctx context.Context, channelID, owner
 // Note: This is more expensive than search API but works reliably with basic bot permissions.
 // Results are cached by the calling code to minimize API calls.
 // Returns (threadTS, currentMessageText) - both empty if not found.
-func (c *Coordinator) searchForPRThread(ctx context.Context, channelID, prURL string, prCreatedAt time.Time) (string, string) {
+func (c *Coordinator) searchForPRThread(ctx context.Context, channelID, prURL string, prCreatedAt time.Time) (threadTS string, messageText string) {
 	slog.Info("searching for existing PR thread using channel history",
 		logFieldChannel, channelID,
 		"pr_url", prURL)
@@ -789,6 +817,10 @@ func (*Coordinator) extractStateFromTurnclient(checkResult *turn.CheckResponse)
 func (*Coordinator) extractBlockedUsersFromTurnclient(checkResult *turn.CheckResponse) []string {
 	var blockedUsers []string
 	for user := range checkResult.Analysis.NextAction {
+		// Skip _system sentinel value - it indicates processing state, not an actual user
+		if user == "_system" {
+			continue
+		}
 		blockedUsers = append(blockedUsers, user)
 	}
 	return blockedUsers
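The _system guard above keeps a sentinel map key out of the blocked-user list. The rule is easy to isolate against a plain map; the value type of Analysis.NextAction is not shown in this diff, so struct{} stands in for it in this sketch:

// Standalone sketch of the filtering rule: sentinel entries describe processing
// state, not people, so they never become Slack mentions.
func filterBlockedUsers(nextAction map[string]struct{}) []string {
	var blockedUsers []string
	for user := range nextAction {
		if user == "_system" {
			continue
		}
		blockedUsers = append(blockedUsers, user)
	}
	return blockedUsers
}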
@@ -899,10 +931,11 @@ func (c *Coordinator) updateDMMessagesForPR(ctx context.Context, pr prUpdateInfo

 	for _, slackUserID := range slackUserIDs {
 		if err := c.slack.UpdateDMMessage(ctx, slackUserID, prURL, message); err != nil {
-			slog.Debug("failed to update DM message",
+			slog.Warn("failed to update DM message",
 				"user", slackUserID,
 				"pr", fmt.Sprintf("%s/%s#%d", owner, repo, prNumber),
 				"error", err,
+				"impact", "user sees stale PR state in DM",
 				"reason", "DM may not exist or too old")
 			skippedCount++
 		} else {
@@ -1140,28 +1173,32 @@ func (c *Coordinator) processPRForChannel(
 	// Find or create thread for this PR in this channel
 	// Convert to the expected struct format
 	pullRequestStruct := struct {
-		User struct {
+		CreatedAt time.Time `json:"created_at"`
+		User      struct {
 			Login string `json:"login"`
 		} `json:"user"`
-		HTMLURL   string    `json:"html_url"`
-		Title     string    `json:"title"`
-		Number    int       `json:"number"`
-		CreatedAt time.Time `json:"created_at"`
+		HTMLURL string `json:"html_url"`
+		Title   string `json:"title"`
+		Number  int    `json:"number"`
 	}{
 		User:      event.PullRequest.User,
 		HTMLURL:   event.PullRequest.HTMLURL,
 		Title:     event.PullRequest.Title,
 		Number:    event.PullRequest.Number,
 		CreatedAt: event.PullRequest.CreatedAt,
 	}
-	threadTS, wasNewlyCreated, currentText, err := c.findOrCreatePRThread(ctx, channelID, owner, repo, prNumber, prState, pullRequestStruct, checkResult)
+	threadTS, wasNewlyCreated, currentText, err := c.findOrCreatePRThread(
+		ctx, channelID, owner, repo, prNumber, prState, pullRequestStruct, checkResult,
+	)
 	if err != nil {
 		slog.Error("failed to find or create PR thread",
 			"workspace", c.workspaceName,
 			logFieldPR, fmt.Sprintf(prFormatString, owner, repo, prNumber),
 			"channel", channelDisplay,
 			"channel_id", channelID,
 			"error", err,
+			"impact", "channel_update_skipped_will_retry_via_polling",
+			"next_poll_in", "5m",
 			"will_continue_with_next_channel", true)
 		return
 	}
@@ -1240,11 +1277,12 @@ func (c *Coordinator) processPRForChannel(
 			"channel", channelDisplay,
 			"channel_id", channelID,
 			"thread_ts", threadTS,
-			"error", err)
+			"error", err,
+			"impact", "message_update_skipped_will_retry_via_polling",
+			"next_poll_in", "5m")
 	} else {
-		// Update cache with new message text
-		cacheKey := fmt.Sprintf("%s/%s#%d:%s", owner, repo, prNumber, channelID)
-		c.threadCache.Set(cacheKey, ThreadInfo{
+		// Save updated thread info (cache + persist)
+		c.saveThread(owner, repo, prNumber, channelID, ThreadInfo{
 			ThreadTS:  threadTS,
 			ChannelID: channelID,
 			LastState: prState,
@@ -1411,7 +1449,9 @@ func (c *Coordinator) handlePullRequestFromSprinkler(
 // handlePullRequestReviewFromSprinkler handles PR review events from sprinkler.
 // Reviews update PR state (approved, changes requested, etc), so we treat them
 // like regular pull_request events and let turnclient analyze the current state.
-func (c *Coordinator) handlePullRequestReviewFromSprinkler(ctx context.Context, owner, repo string, prNumber int, sprinklerURL string, eventTimestamp time.Time) {
+func (c *Coordinator) handlePullRequestReviewFromSprinkler(
+	ctx context.Context, owner, repo string, prNumber int, sprinklerURL string, eventTimestamp time.Time,
+) {
 	slog.Info("handling PR review event from sprinkler",
 		logFieldOwner, owner,
 		logFieldRepo, repo,
@@ -1427,13 +1467,13 @@
 // Critical performance optimization: Posts thread immediately WITHOUT user mentions,
 // then updates asynchronously once email lookups complete (which take 13-20 seconds each).
 func (c *Coordinator) createPRThread(ctx context.Context, channel, owner, repo string, number int, prState string, pr struct {
-	User struct {
+	CreatedAt time.Time `json:"created_at"`
+	User      struct {
 		Login string `json:"login"`
 	} `json:"user"`
-	HTMLURL   string    `json:"html_url"`
-	Title     string    `json:"title"`
-	Number    int       `json:"number"`
-	CreatedAt time.Time `json:"created_at"`
+	HTMLURL string `json:"html_url"`
+	Title   string `json:"title"`
+	Number  int    `json:"number"`
 }, checkResult *turn.CheckResponse,
 ) (threadTS string, messageText string, err error) {
 	// Get state-based prefix

internal/bot/bot_sprinkler.go

Lines changed: 18 additions & 5 deletions
@@ -67,13 +67,14 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
 				"event_key", eventKey,
 				"reason", "deduplication_race")
 		} else {
-			slog.Warn("failed to mark event as processed - database error",
+			slog.Error("failed to mark event as processed - database error",
 				"organization", organization,
 				"type", event.Type,
 				"url", event.URL,
 				"event_key", eventKey,
 				"error", err,
-				"impact", "will_skip_event")
+				"impact", "event_dropped_will_retry_via_polling",
+				"next_poll_in", "5m")
 		}
 		return
 	}
@@ -161,16 +162,24 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve
 		Timestamp: event.Timestamp,
 	}

-	if err := c.processEvent(ctx, msg); err != nil {
+	// Add timeout to prevent hanging on external API failures
+	processCtx, processCancel := context.WithTimeout(ctx, 30*time.Second)
+	defer processCancel()
+
+	if err := c.processEvent(processCtx, msg); err != nil {
+		timedOut := errors.Is(err, context.DeadlineExceeded)
 		slog.Error("error processing event",
 			"organization", organization,
 			"error", err,
 			"type", event.Type,
 			"url", event.URL,
-			"repo", repo)
+			"repo", repo,
+			"timed_out", timedOut,
+			"impact", "event_dropped_will_retry_via_polling",
+			"next_poll_in", "5m")
 		// Event already marked as processed before goroutine started.
 		// Failed processing won't be retried automatically.
-		// This is intentional - we don't want infinite retries of broken events.
+		// Polling will catch this within 5 minutes for open PRs.
 		return
 	}

@@ -184,6 +193,8 @@ func (c *Coordinator) handleSprinklerEvent(ctx context.Context, event client.Eve

 // waitForEventProcessing waits for all in-flight events to complete during shutdown.
 // Returns immediately if no events are being processed.
+//
+//nolint:unparam // maxWait parameter provides flexibility for different shutdown scenarios
 func (c *Coordinator) waitForEventProcessing(organization string, maxWait time.Duration) {
 	// Check if any events are being processed
 	queueLen := len(c.eventSemaphore)
@@ -252,6 +263,8 @@ func (c *Coordinator) handleAuthError(
 }

 // RunWithSprinklerClient runs the bot using the official sprinkler client library.
+//
+//nolint:revive,maintidx // Complex retry/reconnection logic requires length for robustness
 func (c *Coordinator) RunWithSprinklerClient(ctx context.Context) error {
 	slog.Info("starting bot coordinator with sprinkler client")
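The 30-second bound added above is the standard context.WithTimeout pattern: the deadline propagates into every call made by processEvent and surfaces as context.DeadlineExceeded. A self-contained sketch of the same shape, with a stand-in processEvent and a short timeout so the example finishes quickly:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Stand-in for Coordinator.processEvent: blocks until the work "finishes" or the context expires.
func processEvent(ctx context.Context) error {
	select {
	case <-time.After(5 * time.Second): // pretend the external API call is slow
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx := context.Background()

	// Same shape as the hunk above; a short timeout keeps the example quick.
	processCtx, processCancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer processCancel()

	if err := processEvent(processCtx); err != nil {
		timedOut := errors.Is(err, context.DeadlineExceeded)
		fmt.Printf("error processing event: %v (timed_out=%v)\n", err, timedOut)
	}
}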

internal/bot/dedup_test.go

Lines changed: 2 additions & 2 deletions
@@ -264,7 +264,7 @@ func TestConcurrentEventDeduplicationStress(t *testing.T) {
 	var wg sync.WaitGroup
 	wg.Add(numConcurrentEvents)

-	handleEvent := func(id int) {
+	handleEvent := func(_ int) {
 		defer wg.Done()

 		// Check if currently being processed
@@ -299,7 +299,7 @@ func TestConcurrentEventDeduplicationStress(t *testing.T) {
 	}

 	// Launch all goroutines simultaneously
-	for i := 0; i < numConcurrentEvents; i++ {
+	for i := range numConcurrentEvents {
 		go handleEvent(i)
 	}
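The loop rewrite uses Go 1.22's range-over-int form, which visits the same values as the counted loop it replaces:

// Equivalent iterations over i = 0 .. numConcurrentEvents-1:
for i := 0; i < numConcurrentEvents; i++ {
	go handleEvent(i)
}

for i := range numConcurrentEvents { // requires Go 1.22 or newer
	go handleEvent(i)
}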
