Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 78 additions & 98 deletions cmd/reviewGOOSE/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,109 +24,111 @@ type cacheEntry struct {
}

// checkCache checks the cache for a PR and returns the cached data if valid.
// Returns (cachedData, cacheHit, hasRunningTests).
func (app *App) checkCache(cacheFile, url string, updatedAt time.Time) (cachedData *turn.CheckResponse, cacheHit bool, hasRunningTests bool) {
fileData, err := os.ReadFile(cacheFile)
// Returns (data, hit, running) where running indicates incomplete tests.
func (app *App) checkCache(path, url string, updatedAt time.Time) (data *turn.CheckResponse, hit, running bool) {
b, err := os.ReadFile(path)
if err != nil {
if !os.IsNotExist(err) {
slog.Debug("[CACHE] Cache file read error", "url", url, "error", err)
}
return nil, false, false
}

var entry cacheEntry
if err := json.Unmarshal(fileData, &entry); err != nil {
var e cacheEntry
if err := json.Unmarshal(b, &e); err != nil {
slog.Warn("Failed to unmarshal cache data", "url", url, "error", err)
// Remove corrupted cache file
if err := os.Remove(cacheFile); err != nil {
slog.Error("Failed to remove corrupted cache file", "error", err)
if err := os.Remove(path); err != nil {
slog.Debug("Failed to remove corrupted cache file", "error", err)
}
return nil, false, false
}
if e.Data == nil {
slog.Warn("Cache entry missing data", "url", url)
if err := os.Remove(path); err != nil {
slog.Debug("Failed to remove corrupted cache file", "error", err)
}
return nil, false, false
}

// Determine TTL based on test state - use shorter TTL for incomplete tests
testState := entry.Data.PullRequest.TestState
isTestIncomplete := testState == "running" || testState == "queued" || testState == "pending"
state := e.Data.PullRequest.TestState
incomplete := state == "running" || state == "queued" || state == "pending"
ttl := cacheTTL
if isTestIncomplete {
if incomplete {
ttl = runningTestsCacheTTL
}

// Check if cache is expired or PR updated
if time.Since(entry.CachedAt) >= ttl || !entry.UpdatedAt.Equal(updatedAt) {
// Log why cache was invalid
if !entry.UpdatedAt.Equal(updatedAt) {
if time.Since(e.CachedAt) >= ttl || !e.UpdatedAt.Equal(updatedAt) {
if !e.UpdatedAt.Equal(updatedAt) {
slog.Debug("[CACHE] Cache miss - PR updated",
"url", url,
"cached_pr_time", entry.UpdatedAt.Format(time.RFC3339),
"cached_pr_time", e.UpdatedAt.Format(time.RFC3339),
"current_pr_time", updatedAt.Format(time.RFC3339))
} else {
slog.Debug("[CACHE] Cache miss - TTL expired",
"url", url,
"cached_at", entry.CachedAt.Format(time.RFC3339),
"cache_age", time.Since(entry.CachedAt).Round(time.Second),
"cached_at", e.CachedAt.Format(time.RFC3339),
"cache_age", time.Since(e.CachedAt).Round(time.Second),
"ttl", ttl,
"test_state", testState)
"test_state", state)
}
return nil, false, isTestIncomplete
return nil, false, incomplete
}

// Check for incomplete tests that should invalidate cache and trigger Turn API cache bypass
cacheAge := time.Since(entry.CachedAt)
if entry.Data != nil && isTestIncomplete && cacheAge < runningTestsCacheBypass {
// Invalidate cache for incomplete tests on recently-updated PRs to catch completion
// Skip this for PRs not updated in over an hour - their pending tests are likely stale
age := time.Since(e.CachedAt)
if incomplete && age < runningTestsCacheBypass && time.Since(updatedAt) < time.Hour {
slog.Debug("[CACHE] Cache invalidated - tests incomplete and cache entry is fresh",
"url", url,
"test_state", testState,
"cache_age", cacheAge.Round(time.Minute),
"cached_at", entry.CachedAt.Format(time.RFC3339))
"test_state", state,
"cache_age", age.Round(time.Minute),
"cached_at", e.CachedAt.Format(time.RFC3339))
return nil, false, true
}

// Cache hit
slog.Debug("[CACHE] Cache hit",
"url", url,
"cached_at", entry.CachedAt.Format(time.RFC3339),
"cache_age", time.Since(entry.CachedAt).Round(time.Second),
"pr_updated_at", entry.UpdatedAt.Format(time.RFC3339))
"cached_at", e.CachedAt.Format(time.RFC3339),
"cache_age", time.Since(e.CachedAt).Round(time.Second),
"pr_updated_at", e.UpdatedAt.Format(time.RFC3339))
if app.healthMonitor != nil {
app.healthMonitor.recordCacheAccess(true)
}
return entry.Data, true, false
return e.Data, true, false
}

// turnData fetches Turn API data with caching.
func (app *App) turnData(ctx context.Context, url string, updatedAt time.Time) (*turn.CheckResponse, bool, error) {
// If Turn API is disabled, return nil without error
if app.turnClient == nil {
slog.Debug("[TURN] Turn API disabled, skipping", "url", url)
return nil, false, nil
}

hasRunningTests := false
// Validate URL before processing
if err := safebrowse.ValidateURL(url); err != nil {
return nil, false, fmt.Errorf("invalid URL: %w", err)
}

// Create cache key from URL and updated timestamp
key := fmt.Sprintf("%s-%s", url, updatedAt.Format(time.RFC3339))
hash := sha256.Sum256([]byte(key))
cacheFile := filepath.Join(app.cacheDir, hex.EncodeToString(hash[:])[:16]+".json")
h := sha256.Sum256([]byte(key))
path := filepath.Join(app.cacheDir, hex.EncodeToString(h[:])[:16]+".json")

// Log the cache key details
slog.Debug("[CACHE] Checking cache",
"url", url,
"updated_at", updatedAt.Format(time.RFC3339),
"cache_key", key,
"cache_file", filepath.Base(cacheFile))
"cache_file", filepath.Base(path))

// Skip cache if --no-cache flag is set
// Check cache unless --no-cache flag is set
var running bool
if !app.noCache {
if cachedData, cacheHit, runningTests := app.checkCache(cacheFile, url, updatedAt); cacheHit {
return cachedData, true, nil
} else if runningTests {
hasRunningTests = true
data, hit, r := app.checkCache(path, url, updatedAt)
if hit {
return data, true, nil
}
running = r
}

// Cache miss, fetch from API
Expand All @@ -144,26 +146,25 @@ func (app *App) turnData(ctx context.Context, url string, updatedAt time.Time) (
// Use exponential backoff with jitter for Turn API calls
var data *turn.CheckResponse
err := retry.Do(func() error {
// Create timeout context for Turn API call
turnCtx, cancel := context.WithTimeout(ctx, turnAPITimeout)
tctx, cancel := context.WithTimeout(ctx, turnAPITimeout)
defer cancel()

// For PRs with running tests, send current time to bypass Turn server cache
timestampToSend := updatedAt
if hasRunningTests {
timestampToSend = time.Now()
ts := updatedAt
if running {
ts = time.Now()
slog.Debug("[TURN] Using current timestamp for PR with running tests to bypass Turn server cache",
"url", url,
"pr_updated_at", updatedAt.Format(time.RFC3339),
"timestamp_sent", timestampToSend.Format(time.RFC3339))
"timestamp_sent", ts.Format(time.RFC3339))
}

var err error
slog.Debug("[TURN] Making API call",
"url", url,
"user", app.currentUser.GetLogin(),
"pr_updated_at", timestampToSend.Format(time.RFC3339))
data, err = app.turnClient.Check(turnCtx, url, app.currentUser.GetLogin(), timestampToSend)
"pr_updated_at", ts.Format(time.RFC3339))
var err error
data, err = app.turnClient.Check(tctx, url, app.currentUser.GetLogin(), ts)
if err != nil {
slog.Warn("Turn API error (will retry)", "error", err)
return err
Expand All @@ -172,7 +173,7 @@ func (app *App) turnData(ctx context.Context, url string, updatedAt time.Time) (
return nil
},
retry.Attempts(maxRetries),
retry.DelayType(retry.CombineDelay(retry.BackOffDelay, retry.RandomDelay)), // Add jitter for better backoff distribution
retry.DelayType(retry.CombineDelay(retry.BackOffDelay, retry.RandomDelay)),
retry.MaxDelay(maxRetryDelay),
retry.OnRetry(func(n uint, err error) {
slog.Warn("[TURN] API retry", "attempt", n+1, "maxRetries", maxRetries, "url", url, "error", err)
Expand All @@ -191,7 +192,6 @@ func (app *App) turnData(ctx context.Context, url string, updatedAt time.Time) (
app.healthMonitor.recordAPICall(true)
}

// Log Turn API response for debugging
if data != nil {
slog.Info("[TURN] API response details",
"url", url,
Expand All @@ -201,38 +201,21 @@ func (app *App) turnData(ctx context.Context, url string, updatedAt time.Time) (
"pending_checks", len(data.PullRequest.CheckSummary.Pending))
}

// Save to cache (don't fail if caching fails) - skip if --no-cache is set
// Cache PRs with incomplete tests using short TTL to catch completion quickly
// Save to cache (don't fail if caching fails)
if !app.noCache && data != nil {
testState := data.PullRequest.TestState
isTestIncomplete := testState == "running" || testState == "queued" || testState == "pending"

entry := cacheEntry{
Data: data,
CachedAt: time.Now(),
UpdatedAt: updatedAt,
}
if cacheData, err := json.Marshal(entry); err != nil {
e := cacheEntry{Data: data, CachedAt: time.Now(), UpdatedAt: updatedAt}
b, err := json.Marshal(e)
if err != nil {
slog.Error("Failed to marshal cache data", "url", url, "error", err)
} else if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
slog.Error("Failed to create cache directory", "error", err)
} else if err := os.WriteFile(path, b, 0o600); err != nil {
slog.Error("Failed to write cache file", "error", err)
} else {
// Ensure cache directory exists with secure permissions
if err := os.MkdirAll(filepath.Dir(cacheFile), 0o700); err != nil {
slog.Error("Failed to create cache directory", "error", err)
} else if err := os.WriteFile(cacheFile, cacheData, 0o600); err != nil {
slog.Error("Failed to write cache file", "error", err)
} else {
ttl := cacheTTL
if isTestIncomplete {
ttl = runningTestsCacheTTL
}
slog.Debug("[CACHE] Saved to cache",
"url", url,
"cached_at", entry.CachedAt.Format(time.RFC3339),
"pr_updated_at", entry.UpdatedAt.Format(time.RFC3339),
"ttl", ttl,
"test_state", testState,
"cache_file", filepath.Base(cacheFile))
}
slog.Debug("[CACHE] Saved to cache",
"url", url,
"cache_file", filepath.Base(path),
"test_state", data.PullRequest.TestState)
}
}

Expand All @@ -247,32 +230,29 @@ func (app *App) cleanupOldCache() {
return
}

var cleanupCount, errorCount int
for _, entry := range entries {
if !strings.HasSuffix(entry.Name(), ".json") {
var cleaned, errs int
for _, e := range entries {
if !strings.HasSuffix(e.Name(), ".json") {
continue
}

info, err := entry.Info()
info, err := e.Info()
if err != nil {
slog.Error("Failed to get file info for cache entry", "entry", entry.Name(), "error", err)
errorCount++
slog.Error("Failed to get file info for cache entry", "entry", e.Name(), "error", err)
errs++
continue
}

// Remove cache files older than cleanup interval (15 days)
if time.Since(info.ModTime()) > cacheCleanupInterval {
filePath := filepath.Join(app.cacheDir, entry.Name())
if err := os.Remove(filePath); err != nil {
slog.Error("Failed to remove old cache file", "file", filePath, "error", err)
errorCount++
p := filepath.Join(app.cacheDir, e.Name())
if err := os.Remove(p); err != nil {
slog.Error("Failed to remove old cache file", "file", p, "error", err)
errs++
} else {
cleanupCount++
cleaned++
}
}
}

if cleanupCount > 0 || errorCount > 0 {
slog.Info("Cache cleanup completed", "removed", cleanupCount, "errors", errorCount)
if cleaned > 0 || errs > 0 {
slog.Info("Cache cleanup completed", "removed", cleaned, "errors", errs)
}
}
15 changes: 11 additions & 4 deletions cmd/reviewGOOSE/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ const (
ancientPRThreshold = 24 * time.Hour // Refuse to notify for PRs with no activity in this long (safety check)
)

// simplifySource is a slog.HandlerOptions.ReplaceAttr hook that rewrites the
// source attribute so it shows only "file.go:line" instead of the full
// function/file/line record.
//
// Attributes whose key is not slog.SourceKey, or whose value is not a
// *slog.Source, are returned unchanged. A nil *slog.Source is likewise
// returned unchanged rather than dereferenced. The groups argument is unused.
func simplifySource(_ []string, a slog.Attr) slog.Attr {
	if a.Key != slog.SourceKey {
		return a
	}
	// A typed nil pointer still satisfies the type assertion, so it must be
	// checked explicitly before its fields are read.
	s, ok := a.Value.Any().(*slog.Source)
	if !ok || s == nil {
		return a
	}
	a.Value = slog.StringValue(fmt.Sprintf("%s:%d", filepath.Base(s.File), s.Line))
	return a
}

// PR represents a pull request with metadata.
type PR struct {
UpdatedAt time.Time
Expand Down Expand Up @@ -195,10 +205,7 @@ func main() {
if debugMode {
logLevel = slog.LevelDebug
}
opts := &slog.HandlerOptions{
AddSource: true,
Level: logLevel,
}
opts := &slog.HandlerOptions{AddSource: true, Level: logLevel, ReplaceAttr: simplifySource}
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, opts)))
slog.Info("Starting Goose", "version", getVersion(), "commit", commit, "date", date)
slog.Info("Configuration", "update_interval", updateInterval, "max_retries", maxRetries, "max_delay", maxRetryDelay)
Expand Down
2 changes: 1 addition & 1 deletion cmd/reviewGOOSE/sprinkler.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (sm *sprinklerMonitor) start(ctx context.Context) error {
ServerURL: "wss://" + serverAddr + "/ws",
Token: sm.token,
Organization: "*", // Monitor all orgs
EventTypes: []string{"pull_request"},
EventTypes: []string{"*"},
UserEventsOnly: false,
Verbose: false,
NoReconnect: false,
Expand Down
Loading