Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/reviewGOOSE/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func (app *App) checkCache(cacheFile, url string, updatedAt time.Time) (cachedDa

// turnData fetches Turn API data with caching.
func (app *App) turnData(ctx context.Context, url string, updatedAt time.Time) (*turn.CheckResponse, bool, error) {
// If Turn API is disabled, return nil without error
if app.turnClient == nil {
slog.Debug("[TURN] Turn API disabled, skipping", "url", url)
return nil, false, nil
}

hasRunningTests := false
// Validate URL before processing
if err := safebrowse.ValidateURL(url); err != nil {
Expand Down
139 changes: 82 additions & 57 deletions cmd/reviewGOOSE/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,51 @@ func (app *App) initClients(ctx context.Context) error {
tc := oauth2.NewClient(ctx, ts)
app.client = github.NewClient(tc)

// Initialize Turn client using default backend
turnClient, err := turn.NewDefaultClient()
if err != nil {
return fmt.Errorf("create turn client: %w", err)
// Check for custom turn server hostname (for self-hosting)
// Set TURNSERVER=disabled to run without Turn API
turnServer := os.Getenv("TURNSERVER")
if turnServer == "disabled" {
slog.Info("Turn API disabled via TURNSERVER=disabled")
} else {
var turnClient *turn.Client
if turnServer != "" {
slog.Info("Using custom turn server", "hostname", turnServer)
turnClient, err = turn.NewClient("https://" + turnServer)
} else {
turnClient, err = turn.NewDefaultClient()
}
if err != nil {
return fmt.Errorf("create turn client: %w", err)
}
turnClient.SetAuthToken(token)
app.turnClient = turnClient
}
turnClient.SetAuthToken(token)
app.turnClient = turnClient

// Initialize sprinkler monitor for real-time events
app.sprinklerMonitor = newSprinklerMonitor(app, token)
// Check for custom sprinkler server hostname (for self-hosting)
// Set SPRINKLER=disabled to run without real-time events
sprinklerServer := os.Getenv("SPRINKLER")
if sprinklerServer == "disabled" {
slog.Info("Sprinkler disabled via SPRINKLER=disabled")
} else {
if sprinklerServer != "" {
slog.Info("Using custom sprinkler server", "hostname", sprinklerServer)
}
app.sprinklerMonitor = newSprinklerMonitor(app, token, sprinklerServer)
}

return nil
}

// initSprinklerOrgs fetches the user's organizations and starts sprinkler monitoring.
func (app *App) initSprinklerOrgs(ctx context.Context) error {
if app.client == nil || app.sprinklerMonitor == nil {
return errors.New("client or sprinkler not initialized")
if app.client == nil {
return errors.New("github client not initialized")
}
// If sprinkler is disabled, skip silently
if app.sprinklerMonitor == nil {
slog.Debug("[SPRINKLER] Sprinkler disabled, skipping org initialization")
return nil
}

// Get current user
Expand All @@ -80,22 +107,21 @@ func (app *App) initSprinklerOrgs(ctx context.Context) error {

// Fetch all orgs the user is a member of with retry
opts := &github.ListOptions{PerPage: 100}
var allOrgs []string
var orgs []string

for {
var orgs []*github.Organization
var page []*github.Organization
var resp *github.Response

err := retry.Do(func() error {
// Create timeout context for API call
apiCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

var retryErr error
orgs, resp, retryErr = app.client.Organizations.List(apiCtx, user, opts)
if retryErr != nil {
slog.Debug("[SPRINKLER] Organizations.List failed (will retry)", "error", retryErr, "page", opts.Page)
return retryErr
var err error
page, resp, err = app.client.Organizations.List(apiCtx, user, opts)
if err != nil {
slog.Debug("[SPRINKLER] Organizations.List failed (will retry)", "error", err, "page", opts.Page)
return err
}
return nil
},
Expand All @@ -115,9 +141,9 @@ func (app *App) initSprinklerOrgs(ctx context.Context) error {
return nil // Return nil to avoid blocking startup
}

for _, org := range orgs {
if org.Login != nil {
allOrgs = append(allOrgs, *org.Login)
for _, o := range page {
if o.Login != nil {
orgs = append(orgs, *o.Login)
}
}

Expand All @@ -129,12 +155,12 @@ func (app *App) initSprinklerOrgs(ctx context.Context) error {

slog.Info("[SPRINKLER] Discovered user organizations",
"user", user,
"orgs", allOrgs,
"count", len(allOrgs))
"orgs", orgs,
"count", len(orgs))

// Update sprinkler with all orgs at once
if len(allOrgs) > 0 {
app.sprinklerMonitor.updateOrgs(allOrgs)
if len(orgs) > 0 {
app.sprinklerMonitor.updateOrgs(orgs)
if err := app.sprinklerMonitor.start(ctx); err != nil {
return fmt.Errorf("start sprinkler: %w", err)
}
Expand Down Expand Up @@ -387,79 +413,78 @@ func (app *App) fetchPRsInternal(ctx context.Context) (incoming []PR, outgoing [
searchStart := time.Now()

// Run both queries in parallel
type queryResult struct {
type qResult struct {
err error
query string
issues []*github.Issue
}

queryResults := make(chan queryResult, 2)
results := make(chan qResult, 2)

// Query 1: PRs involving the user
go func() {
query := fmt.Sprintf("is:open is:pr involves:%s archived:false", user)
slog.Debug("[GITHUB] Searching for PRs", "query", query)
q := fmt.Sprintf("is:open is:pr involves:%s archived:false", user)
slog.Debug("[GITHUB] Searching for PRs", "query", q)

result, err := app.executeGitHubQuery(ctx, query, opts)
res, err := app.executeGitHubQuery(ctx, q, opts)
if err != nil {
queryResults <- queryResult{err: err, query: query}
results <- qResult{err: err, query: q}
} else {
queryResults <- queryResult{issues: result.Issues, query: query}
results <- qResult{issues: res.Issues, query: q}
}
}()

// Query 2: PRs in user-owned repos with no reviewers
go func() {
query := fmt.Sprintf("is:open is:pr user:%s review:none archived:false", user)
slog.Debug("[GITHUB] Searching for PRs", "query", query)
q := fmt.Sprintf("is:open is:pr user:%s review:none archived:false", user)
slog.Debug("[GITHUB] Searching for PRs", "query", q)

result, err := app.executeGitHubQuery(ctx, query, opts)
res, err := app.executeGitHubQuery(ctx, q, opts)
if err != nil {
queryResults <- queryResult{err: err, query: query}
results <- qResult{err: err, query: q}
} else {
queryResults <- queryResult{issues: result.Issues, query: query}
results <- qResult{issues: res.Issues, query: q}
}
}()

// Collect results from both queries
var allIssues []*github.Issue
seenURLs := make(map[string]bool)
var queryErrors []error
var issues []*github.Issue
seen := make(map[string]bool)
var errs []error

for range 2 {
result := <-queryResults
if result.err != nil {
slog.Error("[GITHUB] Query failed", "query", result.query, "error", result.err)
queryErrors = append(queryErrors, result.err)
// Continue processing other query results even if one fails
r := <-results
if r.err != nil {
slog.Error("[GITHUB] Query failed", "query", r.query, "error", r.err)
errs = append(errs, r.err)
continue
}
slog.Debug("[GITHUB] Query completed", "query", result.query, "prCount", len(result.issues))
slog.Debug("[GITHUB] Query completed", "query", r.query, "prCount", len(r.issues))

// Deduplicate PRs based on URL
for _, issue := range result.issues {
for _, issue := range r.issues {
url := issue.GetHTMLURL()
if !seenURLs[url] {
seenURLs[url] = true
allIssues = append(allIssues, issue)
if !seen[url] {
seen[url] = true
issues = append(issues, issue)
}
}
}
slog.Info("[GITHUB] Both searches completed", "duration", time.Since(searchStart), "uniquePRs", len(allIssues))
slog.Info("[GITHUB] Both searches completed", "duration", time.Since(searchStart), "uniquePRs", len(issues))

// If both queries failed, return an error
if len(queryErrors) == 2 {
return nil, nil, fmt.Errorf("all GitHub queries failed: %v", queryErrors)
if len(errs) == 2 {
return nil, nil, fmt.Errorf("all GitHub queries failed: %v", errs)
}

// Limit PRs for performance
if len(allIssues) > maxPRsToProcess {
slog.Info("Limiting PRs for performance", "limit", maxPRsToProcess, "total", len(allIssues))
allIssues = allIssues[:maxPRsToProcess]
if len(issues) > maxPRsToProcess {
slog.Info("Limiting PRs for performance", "limit", maxPRsToProcess, "total", len(issues))
issues = issues[:maxPRsToProcess]
}

// Process GitHub results immediately
for _, issue := range allIssues {
for _, issue := range issues {
if !issue.IsPullRequest() {
continue
}
Expand Down Expand Up @@ -503,7 +528,7 @@ func (app *App) fetchPRsInternal(ctx context.Context) (incoming []PR, outgoing [

// Fetch Turn API data
// Always synchronous now for simplicity - Turn API calls are fast with caching
app.fetchTurnDataSync(ctx, allIssues, user, &incoming, &outgoing)
app.fetchTurnDataSync(ctx, issues, user, &incoming, &outgoing)

return incoming, outgoing, nil
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/reviewGOOSE/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const (
runningTestsCacheBypass = 90 * time.Minute // Don't cache PRs with running tests if fresher than this
maxPRsToProcess = 200
minUpdateInterval = 10 * time.Second
defaultUpdateInterval = 1 * time.Minute
defaultUpdateInterval = 2 * time.Minute
blockedPRIconDuration = 5 * time.Minute
maxRetryDelay = 2 * time.Minute
maxRetries = 10
Expand Down
Loading