diff --git a/internal/server/server.go b/internal/server/server.go index bf8d99d..a06c2b5 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -132,9 +132,10 @@ type CalculateRequest struct { // //nolint:govet // fieldalignment: API struct field order optimized for readability type CalculateResponse struct { - Breakdown cost.Breakdown `json:"breakdown"` - Timestamp time.Time `json:"timestamp"` - Commit string `json:"commit"` + Breakdown cost.Breakdown `json:"breakdown"` + Timestamp time.Time `json:"timestamp"` + Commit string `json:"commit"` + SecondsInState map[string]int `json:"seconds_in_state,omitempty"` // Only populated when using turnserver } // RepoSampleRequest represents a request to sample and calculate costs for a repository. @@ -162,24 +163,26 @@ type OrgSampleRequest struct { // //nolint:govet // fieldalignment: API struct field order optimized for readability type SampleResponse struct { - Extrapolated cost.ExtrapolatedBreakdown `json:"extrapolated"` - Timestamp time.Time `json:"timestamp"` - Commit string `json:"commit"` + Extrapolated cost.ExtrapolatedBreakdown `json:"extrapolated"` + Timestamp time.Time `json:"timestamp"` + Commit string `json:"commit"` + SecondsInState map[string]int `json:"seconds_in_state,omitempty"` // Aggregated across all sampled PRs } // ProgressUpdate represents a progress update for streaming responses. 
// //nolint:govet // fieldalignment: API struct field order optimized for readability type ProgressUpdate struct { - Type string `json:"type"` // "fetching", "processing", "complete", "error", "done" - PR int `json:"pr,omitempty"` - Owner string `json:"owner,omitempty"` - Repo string `json:"repo,omitempty"` - Progress string `json:"progress,omitempty"` // e.g., "5/15" - Error string `json:"error,omitempty"` - Result *cost.ExtrapolatedBreakdown `json:"result,omitempty"` - Commit string `json:"commit,omitempty"` - R2RCallout bool `json:"r2r_callout,omitempty"` + Type string `json:"type"` // "fetching", "processing", "complete", "error", "done" + PR int `json:"pr,omitempty"` + Owner string `json:"owner,omitempty"` + Repo string `json:"repo,omitempty"` + Progress string `json:"progress,omitempty"` // e.g., "5/15" + Error string `json:"error,omitempty"` + Result *cost.ExtrapolatedBreakdown `json:"result,omitempty"` + Commit string `json:"commit,omitempty"` + R2RCallout bool `json:"r2r_callout,omitempty"` + SecondsInState map[string]int `json:"seconds_in_state,omitempty"` // Only in "done" messages } // New creates a new Server instance. 
@@ -1046,26 +1049,38 @@ func (s *Server) processRequest(ctx context.Context, req *CalculateRequest, toke // Cache miss - need to fetch PR data and calculate cacheKey := fmt.Sprintf("pr:%s", req.URL) prData, prCached := s.cachedPRData(ctx, cacheKey) + var secondsInState map[string]int if !prCached { // Fetch PR data using configured data source var err error // For single PR requests, use 1 hour ago as reference time to enable reasonable caching referenceTime := time.Now().Add(-1 * time.Hour) if s.dataSource == "turnserver" { - // Use turnserver for PR data - prData, err = github.FetchPRDataViaTurnserver(ctx, req.URL, token, referenceTime) + // Use turnserver for PR data with analysis + prDataWithAnalysis, err := github.FetchPRDataWithAnalysisViaTurnserver(ctx, req.URL, token, referenceTime) + if err != nil { + s.logger.ErrorContext(ctx, "[processRequest] Failed to fetch PR data", "url", req.URL, "source", s.dataSource, errorKey, err) + // Check if it's an access error (404, 403) - return error to client. + if IsAccessError(err) { + s.logger.WarnContext(ctx, "[processRequest] Access denied", "url", req.URL) + return nil, NewAccessError(http.StatusForbidden, "access denied to PR") + } + return nil, fmt.Errorf("failed to fetch PR data: %w", err) + } + prData = prDataWithAnalysis.PRData + secondsInState = prDataWithAnalysis.Analysis.SecondsInState } else { // Use prx for PR data prData, err = github.FetchPRData(ctx, req.URL, token, referenceTime) - } - if err != nil { - s.logger.ErrorContext(ctx, "[processRequest] Failed to fetch PR data", "url", req.URL, "source", s.dataSource, errorKey, err) - // Check if it's an access error (404, 403) - return error to client. 
- if IsAccessError(err) { - s.logger.WarnContext(ctx, "[processRequest] Access denied", "url", req.URL) - return nil, NewAccessError(http.StatusForbidden, "access denied to PR") + if err != nil { + s.logger.ErrorContext(ctx, "[processRequest] Failed to fetch PR data", "url", req.URL, "source", s.dataSource, errorKey, err) + // Check if it's an access error (404, 403) - return error to client. + if IsAccessError(err) { + s.logger.WarnContext(ctx, "[processRequest] Access denied", "url", req.URL) + return nil, NewAccessError(http.StatusForbidden, "access denied to PR") + } + return nil, fmt.Errorf("failed to fetch PR data: %w", err) } - return nil, fmt.Errorf("failed to fetch PR data: %w", err) } s.logger.InfoContext(ctx, "[processRequest] PR data cache miss - fetched from GitHub", "url", req.URL) @@ -1080,9 +1095,10 @@ func (s *Server) processRequest(ctx context.Context, req *CalculateRequest, toke s.cacheCalcResult(ctx, req.URL, cfg, &breakdown, 1*time.Hour) return &CalculateResponse{ - Breakdown: breakdown, - Timestamp: time.Now(), - Commit: s.serverCommit, + Breakdown: breakdown, + Timestamp: time.Now(), + Commit: s.serverCommit, + SecondsInState: secondsInState, }, nil } @@ -1584,8 +1600,9 @@ func (s *Server) processRepoSample(ctx context.Context, req *RepoSampleRequest, samples := github.SamplePRs(prs, req.SampleSize) s.logger.InfoContext(ctx, "Sampled PRs", "sample_size", len(samples)) - // Collect breakdowns from each sample + // Collect breakdowns from each sample and aggregate seconds_in_state var breakdowns []cost.Breakdown + aggregatedSeconds := make(map[string]int) for i, pr := range samples { prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", req.Owner, req.Repo, pr.Number) s.logger.InfoContext(ctx, "Processing sample PR", @@ -1596,11 +1613,17 @@ func (s *Server) processRepoSample(ctx context.Context, req *RepoSampleRequest, // Try cache first prCacheKey := fmt.Sprintf("pr:%s", prURL) prData, prCached := s.cachedPRData(ctx, prCacheKey) + var 
secondsInState map[string]int if !prCached { var err error // Use configured data source with updatedAt for effective caching if s.dataSource == "turnserver" { - prData, err = github.FetchPRDataViaTurnserver(ctx, prURL, token, pr.UpdatedAt) + var prDataWithAnalysis github.PRDataWithAnalysis + prDataWithAnalysis, err = github.FetchPRDataWithAnalysisViaTurnserver(ctx, prURL, token, pr.UpdatedAt) + if err == nil { + prData = prDataWithAnalysis.PRData + secondsInState = prDataWithAnalysis.Analysis.SecondsInState + } } else { prData, err = github.FetchPRData(ctx, prURL, token, pr.UpdatedAt) } @@ -1613,6 +1636,11 @@ func (s *Server) processRepoSample(ctx context.Context, req *RepoSampleRequest, s.cachePRData(ctx, prCacheKey, prData) } + // Aggregate seconds_in_state + for state, seconds := range secondsInState { + aggregatedSeconds[state] += seconds + } + breakdown := cost.Calculate(prData, cfg) breakdowns = append(breakdowns, breakdown) } @@ -1634,10 +1662,17 @@ func (s *Server) processRepoSample(ctx context.Context, req *RepoSampleRequest, // Extrapolate costs from samples extrapolated := cost.ExtrapolateFromSamples(breakdowns, len(prs), totalAuthors, openPRCount, actualDays, cfg) + // Only include seconds_in_state if we have data (turnserver only) + var secondsInState map[string]int + if len(aggregatedSeconds) > 0 { + secondsInState = aggregatedSeconds + } + return &SampleResponse{ - Extrapolated: extrapolated, - Timestamp: time.Now(), - Commit: s.serverCommit, + Extrapolated: extrapolated, + Timestamp: time.Now(), + Commit: s.serverCommit, + SecondsInState: secondsInState, }, nil } @@ -1684,8 +1719,9 @@ func (s *Server) processOrgSample(ctx context.Context, req *OrgSampleRequest, to samples := github.SamplePRs(prs, req.SampleSize) s.logger.InfoContext(ctx, "Sampled PRs", "sample_size", len(samples)) - // Collect breakdowns from each sample + // Collect breakdowns from each sample and aggregate seconds_in_state var breakdowns []cost.Breakdown + aggregatedSeconds := 
make(map[string]int) for i, pr := range samples { prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", pr.Owner, pr.Repo, pr.Number) s.logger.InfoContext(ctx, "Processing sample PR", @@ -1696,11 +1732,17 @@ func (s *Server) processOrgSample(ctx context.Context, req *OrgSampleRequest, to // Try cache first prCacheKey := fmt.Sprintf("pr:%s", prURL) prData, prCached := s.cachedPRData(ctx, prCacheKey) + var secondsInState map[string]int if !prCached { var err error // Use configured data source with updatedAt for effective caching if s.dataSource == "turnserver" { - prData, err = github.FetchPRDataViaTurnserver(ctx, prURL, token, pr.UpdatedAt) + var prDataWithAnalysis github.PRDataWithAnalysis + prDataWithAnalysis, err = github.FetchPRDataWithAnalysisViaTurnserver(ctx, prURL, token, pr.UpdatedAt) + if err == nil { + prData = prDataWithAnalysis.PRData + secondsInState = prDataWithAnalysis.Analysis.SecondsInState + } } else { prData, err = github.FetchPRData(ctx, prURL, token, pr.UpdatedAt) } @@ -1713,6 +1755,11 @@ func (s *Server) processOrgSample(ctx context.Context, req *OrgSampleRequest, to s.cachePRData(ctx, prCacheKey, prData) } + // Aggregate seconds_in_state + for state, seconds := range secondsInState { + aggregatedSeconds[state] += seconds + } + breakdown := cost.Calculate(prData, cfg) breakdowns = append(breakdowns, breakdown) } @@ -1735,10 +1782,17 @@ func (s *Server) processOrgSample(ctx context.Context, req *OrgSampleRequest, to // Extrapolate costs from samples extrapolated := cost.ExtrapolateFromSamples(breakdowns, len(prs), totalAuthors, totalOpenPRs, actualDays, cfg) + // Only include seconds_in_state if we have data (turnserver only) + var secondsInState map[string]int + if len(aggregatedSeconds) > 0 { + secondsInState = aggregatedSeconds + } + return &SampleResponse{ - Extrapolated: extrapolated, - Timestamp: time.Now(), - Commit: s.serverCommit, + Extrapolated: extrapolated, + Timestamp: time.Now(), + Commit: s.serverCommit, + SecondsInState: 
secondsInState, }, nil } @@ -2101,7 +2155,7 @@ func (s *Server) processRepoSampleWithProgress(ctx context.Context, req *RepoSam })) // Process samples in parallel with progress updates - breakdowns := s.processPRsInParallel(workCtx, ctx, samples, req.Owner, req.Repo, token, cfg, writer) + breakdowns, aggregatedSeconds := s.processPRsInParallel(workCtx, ctx, samples, req.Owner, req.Repo, token, cfg, writer) if len(breakdowns) == 0 { logSSEError(ctx, s.logger, sendSSE(writer, ProgressUpdate{ @@ -2125,12 +2179,19 @@ func (s *Server) processRepoSampleWithProgress(ctx context.Context, req *RepoSam // Extrapolate costs from samples extrapolated := cost.ExtrapolateFromSamples(breakdowns, len(prs), totalAuthors, openPRCount, actualDays, cfg) + // Only include seconds_in_state if we have data (turnserver only) + var secondsInState map[string]int + if len(aggregatedSeconds) > 0 { + secondsInState = aggregatedSeconds + } + // Send final result logSSEError(ctx, s.logger, sendSSE(writer, ProgressUpdate{ - Type: "done", - Result: &extrapolated, - Commit: s.serverCommit, - R2RCallout: s.r2rCallout, + Type: "done", + Result: &extrapolated, + Commit: s.serverCommit, + R2RCallout: s.r2rCallout, + SecondsInState: secondsInState, })) } @@ -2238,7 +2299,7 @@ func (s *Server) processOrgSampleWithProgress(ctx context.Context, req *OrgSampl })) // Process samples in parallel with progress updates (org mode uses empty owner/repo since it's mixed) - breakdowns := s.processPRsInParallel(workCtx, ctx, samples, "", "", token, cfg, writer) + breakdowns, aggregatedSeconds := s.processPRsInParallel(workCtx, ctx, samples, "", "", token, cfg, writer) s.logger.InfoContext(ctx, "[processOrgSampleWithProgress] Finished processing samples", "org", req.Org, @@ -2268,20 +2329,28 @@ func (s *Server) processOrgSampleWithProgress(ctx context.Context, req *OrgSampl // Extrapolate costs from samples extrapolated := cost.ExtrapolateFromSamples(breakdowns, len(prs), totalAuthors, totalOpenPRs, actualDays, cfg) 
+ // Only include seconds_in_state if we have data (turnserver only) + var secondsInState map[string]int + if len(aggregatedSeconds) > 0 { + secondsInState = aggregatedSeconds + } + // Send final result logSSEError(ctx, s.logger, sendSSE(writer, ProgressUpdate{ - Type: "done", - Result: &extrapolated, - Commit: s.serverCommit, - R2RCallout: s.r2rCallout, + Type: "done", + Result: &extrapolated, + Commit: s.serverCommit, + R2RCallout: s.r2rCallout, + SecondsInState: secondsInState, })) } // processPRsInParallel processes PRs in parallel and sends progress updates via SSE. // //nolint:revive // line-length/use-waitgroup-go: long function signature acceptable, standard wg pattern -func (s *Server) processPRsInParallel(workCtx, reqCtx context.Context, samples []github.PRSummary, defaultOwner, defaultRepo, token string, cfg cost.Config, writer http.ResponseWriter) []cost.Breakdown { +func (s *Server) processPRsInParallel(workCtx, reqCtx context.Context, samples []github.PRSummary, defaultOwner, defaultRepo, token string, cfg cost.Config, writer http.ResponseWriter) ([]cost.Breakdown, map[string]int) { var breakdowns []cost.Breakdown + aggregatedSeconds := make(map[string]int) var mu sync.Mutex var sseMu sync.Mutex // Protects SSE writes to prevent corrupted chunked encoding @@ -2350,12 +2419,18 @@ func (s *Server) processPRsInParallel(workCtx, reqCtx context.Context, samples [ // Cache miss - need to fetch PR data and calculate prCacheKey := fmt.Sprintf("pr:%s", prURL) prData, prCached := s.cachedPRData(workCtx, prCacheKey) + var secondsInState map[string]int if !prCached { var err error // Use work context for actual API calls (not tied to client connection) // Use configured data source with updatedAt for effective caching if s.dataSource == "turnserver" { - prData, err = github.FetchPRDataViaTurnserver(workCtx, prURL, token, prSummary.UpdatedAt) + var prDataWithAnalysis github.PRDataWithAnalysis + prDataWithAnalysis, err = 
github.FetchPRDataWithAnalysisViaTurnserver(workCtx, prURL, token, prSummary.UpdatedAt) + if err == nil { + prData = prDataWithAnalysis.PRData + secondsInState = prDataWithAnalysis.Analysis.SecondsInState + } } else { prData, err = github.FetchPRData(workCtx, prURL, token, prSummary.UpdatedAt) } @@ -2380,6 +2455,13 @@ func (s *Server) processPRsInParallel(workCtx, reqCtx context.Context, samples [ s.cachePRData(workCtx, prCacheKey, prData) } + // Aggregate seconds_in_state + mu.Lock() + for state, seconds := range secondsInState { + aggregatedSeconds[state] += seconds + } + mu.Unlock() + // Send "processing" update using request context for SSE sseMu.Lock() logSSEError(reqCtx, s.logger, sendSSE(writer, ProgressUpdate{ @@ -2415,5 +2497,5 @@ func (s *Server) processPRsInParallel(workCtx, reqCtx context.Context, samples [ } wg.Wait() - return breakdowns + return breakdowns, aggregatedSeconds } diff --git a/internal/server/static/index.html b/internal/server/static/index.html index 34b85bb..63fbdae 100644 --- a/internal/server/static/index.html +++ b/internal/server/static/index.html @@ -1610,6 +1610,109 @@
' + formatBreakdown(data) + '';
html += '';
+ // Add workflow timeline if available (only when using turnserver)
+ if (data.seconds_in_state) {
+ html += formatWorkflowTimeline(data.seconds_in_state);
+ }
+
resultDiv.innerHTML = html;
}
@@ -2134,6 +2242,17 @@ ' + formatAveragePR(e) + '';
html += '';
+ // Add workflow timeline if available (only when using turnserver)
+ if (data.seconds_in_state) {
+ // Calculate average seconds per PR. NOTE(review): cache-hit PRs contribute no seconds_in_state while still counting toward successful_samples, so this average may be understated — confirm intended.
+ const avgSecondsInState = {};
+ const sampleCount = e.successful_samples || 1;
+ for (const [state, totalSeconds] of Object.entries(data.seconds_in_state)) {
+ avgSecondsInState[state] = Math.round(totalSeconds / sampleCount);
+ }
+ html += formatWorkflowTimeline(avgSecondsInState);
+ }
+
resultDiv.innerHTML = html;
resolve();
return;
diff --git a/pkg/github/turnclient.go b/pkg/github/turnclient.go
index 1d4cae3..6d0a735 100644
--- a/pkg/github/turnclient.go
+++ b/pkg/github/turnclient.go
@@ -11,6 +11,12 @@ import (
"github.com/codeGROOVE-dev/turnclient/pkg/turn"
)
+// PRDataWithAnalysis combines the cost-calculation PR data with the turnserver workflow analysis (workflow state, seconds_in_state, etc.).
+type PRDataWithAnalysis struct {
+ PRData cost.PRData
+ Analysis turn.Analysis
+}
+
// FetchPRDataViaTurnserver retrieves pull request information from the turnserver
// and converts it to the format needed for cost calculation.
//
@@ -74,3 +80,64 @@ func FetchPRDataViaTurnserver(ctx context.Context, prURL string, token string, u
slog.Debug("Converted PR data", "human_events", len(result.Events))
return result, nil
}
+
+// FetchPRDataWithAnalysisViaTurnserver retrieves pull request information and analysis
+// from the turnserver. This includes both the PR data needed for cost calculation and
+// the workflow analysis (seconds_in_state, workflow_state, etc.).
+//
+// Parameters:
+// - ctx: Context for the API call
+// - prURL: Full GitHub PR URL (e.g., "https://github.com/owner/repo/pull/123")
+// - token: GitHub authentication token
+// - updatedAt: PR's last update timestamp (for caching) or time.Now() to bypass cache
+//
+// Returns:
+// - PRDataWithAnalysis containing both cost.PRData and turn.Analysis
+func FetchPRDataWithAnalysisViaTurnserver(ctx context.Context, prURL string, token string, updatedAt time.Time) (PRDataWithAnalysis, error) {
+ slog.Debug("Creating turnserver client", "url", prURL, "updated_at", updatedAt.Format(time.RFC3339))
+
+ // Create turnserver client using default endpoint
+ client, err := turn.NewDefaultClient()
+ if err != nil {
+ slog.Error("Failed to create turnserver client", "error", err)
+ return PRDataWithAnalysis{}, fmt.Errorf("create turnserver client: %w", err)
+ }
+
+ // Set authentication token
+ client.SetAuthToken(token)
+
+ // Enable event data in response - critical for cost calculation
+ client.IncludeEvents()
+
+ slog.Debug("Calling turnserver API", "url", prURL, "updated_at", updatedAt.Format(time.RFC3339))
+
+ // Fetch PR data from turnserver
+ response, err := client.Check(ctx, prURL, "codeGROOVE-prcost", updatedAt)
+ if err != nil {
+ slog.Error("Turnserver API call failed", "url", prURL, "error", err)
+ return PRDataWithAnalysis{}, fmt.Errorf("turnserver API call failed: %w", err)
+ }
+
+ slog.Debug("Turnserver API call successful",
+ "additions", response.PullRequest.Additions,
+ "deletions", response.PullRequest.Deletions,
+ "author", response.PullRequest.Author,
+ "total_events", len(response.Events),
+ "workflow_state", response.Analysis.WorkflowState,
+ "seconds_in_state", len(response.Analysis.SecondsInState))
+
+ // Convert turnserver response to prx.PullRequestData format
+ prData := &prx.PullRequestData{
+ PullRequest: response.PullRequest,
+ Events: response.Events,
+ }
+
+ // Convert to cost.PRData using existing conversion function
+ result := PRDataFromPRX(prData)
+ slog.Debug("Converted PR data", "human_events", len(result.Events))
+
+ return PRDataWithAnalysis{
+ PRData: result,
+ Analysis: response.Analysis,
+ }, nil
+}