|
5 | 5 | "errors" |
6 | 6 | "fmt" |
7 | 7 | "log/slog" |
| 8 | + "strings" |
8 | 9 | "time" |
9 | 10 |
|
10 | 11 | "github.com/codeGROOVE-dev/slacker/internal/github" |
@@ -49,7 +50,7 @@ func (c *Coordinator) PollAndReconcile(ctx context.Context) { |
49 | 50 | "pr_count", len(prs), |
50 | 51 | "will_check_each", true) |
51 | 52 |
|
52 | | - // Process each PR |
| 53 | + // Process each open PR |
53 | 54 | successCount := 0 |
54 | 55 | errorCount := 0 |
55 | 56 |
|
@@ -93,9 +94,73 @@ func (c *Coordinator) PollAndReconcile(ctx context.Context) { |
93 | 94 | } |
94 | 95 | } |
95 | 96 |
|
| 97 | + // Query closed/merged PRs in last hour to update existing threads |
| 98 | + closedPRs, err := gqlClient.ListClosedPRs(ctx, org, 1) |
| 99 | + if err != nil { |
| 100 | + slog.Warn("failed to poll closed PRs", |
| 101 | + "org", org, |
| 102 | + "error", err, |
| 103 | + "impact", "will retry next poll") |
| 104 | + } else { |
| 105 | + slog.Info("poll retrieved closed/merged PRs", |
| 106 | + "org", org, |
| 107 | + "pr_count", len(closedPRs), |
| 108 | + "will_update_threads", true) |
| 109 | + |
| 110 | + closedSuccessCount := 0 |
| 111 | + closedErrorCount := 0 |
| 112 | + |
| 113 | + for i := range closedPRs { |
| 114 | + pr := &closedPRs[i] |
| 115 | + |
| 116 | + // Create event key for this PR state change |
| 117 | + eventKey := fmt.Sprintf("poll_closed:%s:%s:%s", pr.URL, pr.State, pr.UpdatedAt.Format(time.RFC3339)) |
| 118 | + |
| 119 | + // Skip if already processed |
| 120 | + if c.stateStore.WasProcessed(eventKey) { |
| 121 | + slog.Debug("skipping closed PR - already processed", |
| 122 | + "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number), |
| 123 | + "state", pr.State) |
| 124 | + closedSuccessCount++ |
| 125 | + continue |
| 126 | + } |
| 127 | + |
| 128 | + // Update thread for this closed/merged PR |
| 129 | + if err := c.updateClosedPRThread(ctx, pr); err != nil { |
| 130 | + slog.Warn("failed to update closed PR thread", |
| 131 | + "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number), |
| 132 | + "state", pr.State, |
| 133 | + "error", err) |
| 134 | + closedErrorCount++ |
| 135 | + } else { |
| 136 | + // Mark as processed |
| 137 | + if err := c.stateStore.MarkProcessed(eventKey, 24*time.Hour); err != nil { |
| 138 | + slog.Warn("failed to mark closed PR event as processed", |
| 139 | + "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number), |
| 140 | + "error", err) |
| 141 | + } |
| 142 | + closedSuccessCount++ |
| 143 | + } |
| 144 | + |
| 145 | + // Rate limit |
| 146 | + select { |
| 147 | + case <-ctx.Done(): |
| 148 | + slog.Info("polling canceled during closed PR processing", "org", org) |
| 149 | + return |
| 150 | + case <-time.After(100 * time.Millisecond): |
| 151 | + } |
| 152 | + } |
| 153 | + |
| 154 | + slog.Info("closed PR processing complete", |
| 155 | + "org", org, |
| 156 | + "total_closed_prs", len(closedPRs), |
| 157 | + "updated", closedSuccessCount, |
| 158 | + "errors", closedErrorCount) |
| 159 | + } |
| 160 | + |
96 | 161 | slog.Info("poll cycle complete", |
97 | 162 | "org", org, |
98 | | - "total_prs", len(prs), |
| 163 | + "total_open_prs", len(prs), |
99 | 164 | "processed", successCount, |
100 | 165 | "errors", errorCount, |
101 | 166 | "next_poll", "5m") |
@@ -180,6 +245,100 @@ func (c *Coordinator) reconcilePR(ctx context.Context, pr *github.PRSnapshot) er |
180 | 245 | return nil |
181 | 246 | } |
182 | 247 |
|
| 248 | +// updateClosedPRThread updates Slack threads for a closed or merged PR. |
| 249 | +func (c *Coordinator) updateClosedPRThread(ctx context.Context, pr *github.PRSnapshot) error { |
| 250 | + prKey := fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number) |
| 251 | + slog.Debug("updating thread for closed/merged PR", |
| 252 | + "pr", prKey, |
| 253 | + "state", pr.State) |
| 254 | + |
| 255 | + channels := c.configManager.ChannelsForRepo(pr.Owner, pr.Repo) |
| 256 | + if len(channels) == 0 { |
| 257 | + slog.Debug("no channels configured for closed PR", |
| 258 | + "pr", prKey, |
| 259 | + "owner", pr.Owner, |
| 260 | + "repo", pr.Repo) |
| 261 | + return nil |
| 262 | + } |
| 263 | + |
| 264 | + n := 0 |
| 265 | + for _, ch := range channels { |
| 266 | + id := c.slack.ResolveChannelID(ctx, ch) |
| 267 | + if id == "" { |
| 268 | + slog.Debug("could not resolve channel ID for closed PR thread update", |
| 269 | + "channel_name", ch, |
| 270 | + "pr", prKey) |
| 271 | + continue |
| 272 | + } |
| 273 | + |
| 274 | + info, ok := c.stateStore.GetThread(pr.Owner, pr.Repo, pr.Number, id) |
| 275 | + if !ok { |
| 276 | + slog.Debug("no thread found for closed PR in channel", |
| 277 | + "pr", prKey, |
| 278 | + "channel", ch, |
| 279 | + "channel_id", id) |
| 280 | + continue |
| 281 | + } |
| 282 | + |
| 283 | + if err := c.updateThreadForClosedPR(ctx, pr, id, info); err != nil { |
| 284 | + slog.Warn("failed to update thread for closed PR", |
| 285 | + "pr", prKey, |
| 286 | + "channel", ch, |
| 287 | + "error", err) |
| 288 | + continue |
| 289 | + } |
| 290 | + |
| 291 | + n++ |
| 292 | + slog.Info("updated thread for closed/merged PR", |
| 293 | + "pr", prKey, |
| 294 | + "state", pr.State, |
| 295 | + "channel", ch, |
| 296 | + "thread_ts", info.ThreadTS) |
| 297 | + } |
| 298 | + |
| 299 | + if n == 0 { |
| 300 | + return errors.New("no threads found or updated for closed PR") |
| 301 | + } |
| 302 | + |
| 303 | + return nil |
| 304 | +} |
| 305 | + |
| 306 | +// updateThreadForClosedPR updates a single thread's message to reflect closed/merged state. |
| 307 | +func (c *Coordinator) updateThreadForClosedPR(ctx context.Context, pr *github.PRSnapshot, channelID string, info ThreadInfo) error { |
| 308 | + var emoji, msg string |
| 309 | + switch pr.State { |
| 310 | + case "MERGED": |
| 311 | + emoji = ":rocket:" |
| 312 | + msg = "This PR was merged" |
| 313 | + case "CLOSED": |
| 314 | + emoji = ":x:" |
| 315 | + msg = "This PR was closed without merging" |
| 316 | + default: |
| 317 | + return fmt.Errorf("unexpected PR state: %s", pr.State) |
| 318 | + } |
| 319 | + |
| 320 | + // Replace emoji prefix in message (format: ":emoji: Title • repo#123 by @user") |
| 321 | + text := info.MessageText |
| 322 | + if i := strings.Index(text, " "); i == -1 { |
| 323 | + text = emoji + " " + text |
| 324 | + } else { |
| 325 | + text = emoji + text[i:] |
| 326 | + } |
| 327 | + |
| 328 | + if err := c.slack.UpdateMessage(ctx, channelID, info.ThreadTS, text); err != nil { |
| 329 | + return fmt.Errorf("failed to update message: %w", err) |
| 330 | + } |
| 331 | + |
| 332 | + // Post follow-up comment (don't fail if this errors - main update succeeded) |
| 333 | + if err := c.slack.PostThreadReply(ctx, channelID, info.ThreadTS, msg); err != nil { |
| 334 | + slog.Debug("failed to post follow-up comment for closed PR", |
| 335 | + "pr", fmt.Sprintf("%s/%s#%d", pr.Owner, pr.Repo, pr.Number), |
| 336 | + "error", err) |
| 337 | + } |
| 338 | + |
| 339 | + return nil |
| 340 | +} |
| 341 | + |
183 | 342 | // StartupReconciliation runs once at startup to catch up on any missed notifications. |
184 | 343 | // This ensures that if the service was down, we still notify about PRs that need attention. |
185 | 344 | func (c *Coordinator) StartupReconciliation(ctx context.Context) { |
|
0 commit comments