Commit 001c57c

Merge pull request #14 from tstromberg/main
add Workflow State Timeline
2 parents: cdb2aaf + a97bca4

3 files changed: +319, -51 lines

internal/server/server.go

Lines changed: 133 additions & 51 deletions
@@ -132,9 +132,10 @@ type CalculateRequest struct {
 //
 //nolint:govet // fieldalignment: API struct field order optimized for readability
 type CalculateResponse struct {
-    Breakdown cost.Breakdown `json:"breakdown"`
-    Timestamp time.Time      `json:"timestamp"`
-    Commit    string         `json:"commit"`
+    Breakdown      cost.Breakdown `json:"breakdown"`
+    Timestamp      time.Time      `json:"timestamp"`
+    Commit         string         `json:"commit"`
+    SecondsInState map[string]int `json:"seconds_in_state,omitempty"` // Only populated when using turnserver
 }

 // RepoSampleRequest represents a request to sample and calculate costs for a repository.
@@ -162,24 +163,26 @@ type OrgSampleRequest struct {
 //
 //nolint:govet // fieldalignment: API struct field order optimized for readability
 type SampleResponse struct {
-    Extrapolated cost.ExtrapolatedBreakdown `json:"extrapolated"`
-    Timestamp    time.Time                  `json:"timestamp"`
-    Commit       string                     `json:"commit"`
+    Extrapolated   cost.ExtrapolatedBreakdown `json:"extrapolated"`
+    Timestamp      time.Time                  `json:"timestamp"`
+    Commit         string                     `json:"commit"`
+    SecondsInState map[string]int             `json:"seconds_in_state,omitempty"` // Aggregated across all sampled PRs
 }

 // ProgressUpdate represents a progress update for streaming responses.
 //
 //nolint:govet // fieldalignment: API struct field order optimized for readability
 type ProgressUpdate struct {
-    Type       string                      `json:"type"` // "fetching", "processing", "complete", "error", "done"
-    PR         int                         `json:"pr,omitempty"`
-    Owner      string                      `json:"owner,omitempty"`
-    Repo       string                      `json:"repo,omitempty"`
-    Progress   string                      `json:"progress,omitempty"` // e.g., "5/15"
-    Error      string                      `json:"error,omitempty"`
-    Result     *cost.ExtrapolatedBreakdown `json:"result,omitempty"`
-    Commit     string                      `json:"commit,omitempty"`
-    R2RCallout bool                        `json:"r2r_callout,omitempty"`
+    Type           string                      `json:"type"` // "fetching", "processing", "complete", "error", "done"
+    PR             int                         `json:"pr,omitempty"`
+    Owner          string                      `json:"owner,omitempty"`
+    Repo           string                      `json:"repo,omitempty"`
+    Progress       string                      `json:"progress,omitempty"` // e.g., "5/15"
+    Error          string                      `json:"error,omitempty"`
+    Result         *cost.ExtrapolatedBreakdown `json:"result,omitempty"`
+    Commit         string                      `json:"commit,omitempty"`
+    R2RCallout     bool                        `json:"r2r_callout,omitempty"`
+    SecondsInState map[string]int              `json:"seconds_in_state,omitempty"` // Only in "done" messages
 }

 // New creates a new Server instance.
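
Note: the struct changes above only add a serialized field. Below is a minimal sketch of how a client might read it; the JSON body and the state names in it are hypothetical (the real keys come from the turnserver analysis and are not defined in this diff), and only the field name and json tag mirror the code above.

package main

import (
    "encoding/json"
    "fmt"
)

// calculateResponse mirrors just the fields of CalculateResponse that this sketch needs.
type calculateResponse struct {
    Commit         string         `json:"commit"`
    SecondsInState map[string]int `json:"seconds_in_state,omitempty"` // only present when the turnserver data source was used
}

func main() {
    // Hypothetical response body; the state keys are invented for illustration.
    body := []byte(`{"commit":"001c57c","seconds_in_state":{"waiting_on_review":5400,"waiting_on_author":1200}}`)

    var resp calculateResponse
    if err := json.Unmarshal(body, &resp); err != nil {
        panic(err)
    }
    for state, secs := range resp.SecondsInState {
        fmt.Printf("%s: %dh%02dm\n", state, secs/3600, secs%3600/60)
    }
}
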
@@ -1046,26 +1049,38 @@ func (s *Server) processRequest(ctx context.Context, req *CalculateRequest, toke
     // Cache miss - need to fetch PR data and calculate
     cacheKey := fmt.Sprintf("pr:%s", req.URL)
     prData, prCached := s.cachedPRData(ctx, cacheKey)
+    var secondsInState map[string]int
     if !prCached {
         // Fetch PR data using configured data source
         var err error
         // For single PR requests, use 1 hour ago as reference time to enable reasonable caching
         referenceTime := time.Now().Add(-1 * time.Hour)
         if s.dataSource == "turnserver" {
-            // Use turnserver for PR data
-            prData, err = github.FetchPRDataViaTurnserver(ctx, req.URL, token, referenceTime)
+            // Use turnserver for PR data with analysis
+            prDataWithAnalysis, err := github.FetchPRDataWithAnalysisViaTurnserver(ctx, req.URL, token, referenceTime)
+            if err != nil {
+                s.logger.ErrorContext(ctx, "[processRequest] Failed to fetch PR data", "url", req.URL, "source", s.dataSource, errorKey, err)
+                // Check if it's an access error (404, 403) - return error to client.
+                if IsAccessError(err) {
+                    s.logger.WarnContext(ctx, "[processRequest] Access denied", "url", req.URL)
+                    return nil, NewAccessError(http.StatusForbidden, "access denied to PR")
+                }
+                return nil, fmt.Errorf("failed to fetch PR data: %w", err)
+            }
+            prData = prDataWithAnalysis.PRData
+            secondsInState = prDataWithAnalysis.Analysis.SecondsInState
         } else {
             // Use prx for PR data
             prData, err = github.FetchPRData(ctx, req.URL, token, referenceTime)
-        }
-        if err != nil {
-            s.logger.ErrorContext(ctx, "[processRequest] Failed to fetch PR data", "url", req.URL, "source", s.dataSource, errorKey, err)
-            // Check if it's an access error (404, 403) - return error to client.
-            if IsAccessError(err) {
-                s.logger.WarnContext(ctx, "[processRequest] Access denied", "url", req.URL)
-                return nil, NewAccessError(http.StatusForbidden, "access denied to PR")
+            if err != nil {
+                s.logger.ErrorContext(ctx, "[processRequest] Failed to fetch PR data", "url", req.URL, "source", s.dataSource, errorKey, err)
+                // Check if it's an access error (404, 403) - return error to client.
+                if IsAccessError(err) {
+                    s.logger.WarnContext(ctx, "[processRequest] Access denied", "url", req.URL)
+                    return nil, NewAccessError(http.StatusForbidden, "access denied to PR")
+                }
+                return nil, fmt.Errorf("failed to fetch PR data: %w", err)
             }
-            return nil, fmt.Errorf("failed to fetch PR data: %w", err)
         }

         s.logger.InfoContext(ctx, "[processRequest] PR data cache miss - fetched from GitHub", "url", req.URL)
@@ -1080,9 +1095,10 @@ func (s *Server) processRequest(ctx context.Context, req *CalculateRequest, toke
     s.cacheCalcResult(ctx, req.URL, cfg, &breakdown, 1*time.Hour)

     return &CalculateResponse{
-        Breakdown: breakdown,
-        Timestamp: time.Now(),
-        Commit:    s.serverCommit,
+        Breakdown:      breakdown,
+        Timestamp:      time.Now(),
+        Commit:         s.serverCommit,
+        SecondsInState: secondsInState,
     }, nil
 }

@@ -1584,8 +1600,9 @@ func (s *Server) processRepoSample(ctx context.Context, req *RepoSampleRequest,
     samples := github.SamplePRs(prs, req.SampleSize)
     s.logger.InfoContext(ctx, "Sampled PRs", "sample_size", len(samples))

-    // Collect breakdowns from each sample
+    // Collect breakdowns from each sample and aggregate seconds_in_state
     var breakdowns []cost.Breakdown
+    aggregatedSeconds := make(map[string]int)
     for i, pr := range samples {
         prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", req.Owner, req.Repo, pr.Number)
         s.logger.InfoContext(ctx, "Processing sample PR",
@@ -1596,11 +1613,17 @@ func (s *Server) processRepoSample(ctx context.Context, req *RepoSampleRequest,
         // Try cache first
         prCacheKey := fmt.Sprintf("pr:%s", prURL)
         prData, prCached := s.cachedPRData(ctx, prCacheKey)
+        var secondsInState map[string]int
         if !prCached {
             var err error
             // Use configured data source with updatedAt for effective caching
             if s.dataSource == "turnserver" {
-                prData, err = github.FetchPRDataViaTurnserver(ctx, prURL, token, pr.UpdatedAt)
+                var prDataWithAnalysis github.PRDataWithAnalysis
+                prDataWithAnalysis, err = github.FetchPRDataWithAnalysisViaTurnserver(ctx, prURL, token, pr.UpdatedAt)
+                if err == nil {
+                    prData = prDataWithAnalysis.PRData
+                    secondsInState = prDataWithAnalysis.Analysis.SecondsInState
+                }
             } else {
                 prData, err = github.FetchPRData(ctx, prURL, token, pr.UpdatedAt)
             }
@@ -1613,6 +1636,11 @@ func (s *Server) processRepoSample(ctx context.Context, req *RepoSampleRequest,
             s.cachePRData(ctx, prCacheKey, prData)
         }

+        // Aggregate seconds_in_state
+        for state, seconds := range secondsInState {
+            aggregatedSeconds[state] += seconds
+        }
+
         breakdown := cost.Calculate(prData, cfg)
         breakdowns = append(breakdowns, breakdown)
     }
@@ -1634,10 +1662,17 @@ func (s *Server) processRepoSample(ctx context.Context, req *RepoSampleRequest,
     // Extrapolate costs from samples
     extrapolated := cost.ExtrapolateFromSamples(breakdowns, len(prs), totalAuthors, openPRCount, actualDays, cfg)

+    // Only include seconds_in_state if we have data (turnserver only)
+    var secondsInState map[string]int
+    if len(aggregatedSeconds) > 0 {
+        secondsInState = aggregatedSeconds
+    }
+
     return &SampleResponse{
-        Extrapolated: extrapolated,
-        Timestamp:    time.Now(),
-        Commit:       s.serverCommit,
+        Extrapolated:   extrapolated,
+        Timestamp:      time.Now(),
+        Commit:         s.serverCommit,
+        SecondsInState: secondsInState,
     }, nil
 }

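The aggregation added to processRepoSample above is an element-wise sum of each sample's SecondsInState map; when no sample produced timeline data (for example with the prx data source), the map stays empty and SecondsInState is left nil so omitempty drops it from the response. A standalone sketch of that merge, using made-up state names:

package main

import "fmt"

// mergeSeconds sums per-PR seconds_in_state maps the same way the sampling
// handlers above fold each sample into aggregatedSeconds.
func mergeSeconds(perPR []map[string]int) map[string]int {
    aggregated := make(map[string]int)
    for _, m := range perPR {
        for state, seconds := range m {
            aggregated[state] += seconds
        }
    }
    if len(aggregated) == 0 {
        return nil // mirrors the "only include seconds_in_state if we have data" guard
    }
    return aggregated
}

func main() {
    // Hypothetical per-PR timelines; real state names come from the turnserver analysis.
    samples := []map[string]int{
        {"waiting_on_review": 3600, "waiting_on_author": 600},
        {"waiting_on_review": 1800},
    }
    fmt.Println(mergeSeconds(samples)) // map[waiting_on_author:600 waiting_on_review:5400]
}
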
@@ -1684,8 +1719,9 @@ func (s *Server) processOrgSample(ctx context.Context, req *OrgSampleRequest, to
     samples := github.SamplePRs(prs, req.SampleSize)
     s.logger.InfoContext(ctx, "Sampled PRs", "sample_size", len(samples))

-    // Collect breakdowns from each sample
+    // Collect breakdowns from each sample and aggregate seconds_in_state
     var breakdowns []cost.Breakdown
+    aggregatedSeconds := make(map[string]int)
     for i, pr := range samples {
         prURL := fmt.Sprintf("https://github.com/%s/%s/pull/%d", pr.Owner, pr.Repo, pr.Number)
         s.logger.InfoContext(ctx, "Processing sample PR",
@@ -1696,11 +1732,17 @@ func (s *Server) processOrgSample(ctx context.Context, req *OrgSampleRequest, to
         // Try cache first
         prCacheKey := fmt.Sprintf("pr:%s", prURL)
         prData, prCached := s.cachedPRData(ctx, prCacheKey)
+        var secondsInState map[string]int
         if !prCached {
             var err error
             // Use configured data source with updatedAt for effective caching
             if s.dataSource == "turnserver" {
-                prData, err = github.FetchPRDataViaTurnserver(ctx, prURL, token, pr.UpdatedAt)
+                var prDataWithAnalysis github.PRDataWithAnalysis
+                prDataWithAnalysis, err = github.FetchPRDataWithAnalysisViaTurnserver(ctx, prURL, token, pr.UpdatedAt)
+                if err == nil {
+                    prData = prDataWithAnalysis.PRData
+                    secondsInState = prDataWithAnalysis.Analysis.SecondsInState
+                }
             } else {
                 prData, err = github.FetchPRData(ctx, prURL, token, pr.UpdatedAt)
             }
@@ -1713,6 +1755,11 @@ func (s *Server) processOrgSample(ctx context.Context, req *OrgSampleRequest, to
             s.cachePRData(ctx, prCacheKey, prData)
         }

+        // Aggregate seconds_in_state
+        for state, seconds := range secondsInState {
+            aggregatedSeconds[state] += seconds
+        }
+
         breakdown := cost.Calculate(prData, cfg)
         breakdowns = append(breakdowns, breakdown)
     }
@@ -1735,10 +1782,17 @@ func (s *Server) processOrgSample(ctx context.Context, req *OrgSampleRequest, to
     // Extrapolate costs from samples
     extrapolated := cost.ExtrapolateFromSamples(breakdowns, len(prs), totalAuthors, totalOpenPRs, actualDays, cfg)

+    // Only include seconds_in_state if we have data (turnserver only)
+    var secondsInState map[string]int
+    if len(aggregatedSeconds) > 0 {
+        secondsInState = aggregatedSeconds
+    }
+
     return &SampleResponse{
-        Extrapolated: extrapolated,
-        Timestamp:    time.Now(),
-        Commit:       s.serverCommit,
+        Extrapolated:   extrapolated,
+        Timestamp:      time.Now(),
+        Commit:         s.serverCommit,
+        SecondsInState: secondsInState,
     }, nil
 }

@@ -2101,7 +2155,7 @@ func (s *Server) processRepoSampleWithProgress(ctx context.Context, req *RepoSam
     }))

     // Process samples in parallel with progress updates
-    breakdowns := s.processPRsInParallel(workCtx, ctx, samples, req.Owner, req.Repo, token, cfg, writer)
+    breakdowns, aggregatedSeconds := s.processPRsInParallel(workCtx, ctx, samples, req.Owner, req.Repo, token, cfg, writer)

     if len(breakdowns) == 0 {
         logSSEError(ctx, s.logger, sendSSE(writer, ProgressUpdate{
@@ -2125,12 +2179,19 @@
     // Extrapolate costs from samples
     extrapolated := cost.ExtrapolateFromSamples(breakdowns, len(prs), totalAuthors, openPRCount, actualDays, cfg)

+    // Only include seconds_in_state if we have data (turnserver only)
+    var secondsInState map[string]int
+    if len(aggregatedSeconds) > 0 {
+        secondsInState = aggregatedSeconds
+    }
+
     // Send final result
     logSSEError(ctx, s.logger, sendSSE(writer, ProgressUpdate{
-        Type:       "done",
-        Result:     &extrapolated,
-        Commit:     s.serverCommit,
-        R2RCallout: s.r2rCallout,
+        Type:           "done",
+        Result:         &extrapolated,
+        Commit:         s.serverCommit,
+        R2RCallout:     s.r2rCallout,
+        SecondsInState: secondsInState,
     }))
 }

@@ -2238,7 +2299,7 @@ func (s *Server) processOrgSampleWithProgress(ctx context.Context, req *OrgSampl
     }))

     // Process samples in parallel with progress updates (org mode uses empty owner/repo since it's mixed)
-    breakdowns := s.processPRsInParallel(workCtx, ctx, samples, "", "", token, cfg, writer)
+    breakdowns, aggregatedSeconds := s.processPRsInParallel(workCtx, ctx, samples, "", "", token, cfg, writer)

     s.logger.InfoContext(ctx, "[processOrgSampleWithProgress] Finished processing samples",
         "org", req.Org,
@@ -2268,20 +2329,28 @@ func (s *Server) processOrgSampleWithProgress(ctx context.Context, req *OrgSampl
     // Extrapolate costs from samples
     extrapolated := cost.ExtrapolateFromSamples(breakdowns, len(prs), totalAuthors, totalOpenPRs, actualDays, cfg)

+    // Only include seconds_in_state if we have data (turnserver only)
+    var secondsInState map[string]int
+    if len(aggregatedSeconds) > 0 {
+        secondsInState = aggregatedSeconds
+    }
+
     // Send final result
     logSSEError(ctx, s.logger, sendSSE(writer, ProgressUpdate{
-        Type:       "done",
-        Result:     &extrapolated,
-        Commit:     s.serverCommit,
-        R2RCallout: s.r2rCallout,
+        Type:           "done",
+        Result:         &extrapolated,
+        Commit:         s.serverCommit,
+        R2RCallout:     s.r2rCallout,
+        SecondsInState: secondsInState,
     }))
 }

 // processPRsInParallel processes PRs in parallel and sends progress updates via SSE.
 //
 //nolint:revive // line-length/use-waitgroup-go: long function signature acceptable, standard wg pattern
-func (s *Server) processPRsInParallel(workCtx, reqCtx context.Context, samples []github.PRSummary, defaultOwner, defaultRepo, token string, cfg cost.Config, writer http.ResponseWriter) []cost.Breakdown {
+func (s *Server) processPRsInParallel(workCtx, reqCtx context.Context, samples []github.PRSummary, defaultOwner, defaultRepo, token string, cfg cost.Config, writer http.ResponseWriter) ([]cost.Breakdown, map[string]int) {
     var breakdowns []cost.Breakdown
+    aggregatedSeconds := make(map[string]int)
     var mu sync.Mutex
     var sseMu sync.Mutex // Protects SSE writes to prevent corrupted chunked encoding

@@ -2350,12 +2419,18 @@ func (s *Server) processPRsInParallel(workCtx, reqCtx context.Context, samples [
             // Cache miss - need to fetch PR data and calculate
             prCacheKey := fmt.Sprintf("pr:%s", prURL)
             prData, prCached := s.cachedPRData(workCtx, prCacheKey)
+            var secondsInState map[string]int
             if !prCached {
                 var err error
                 // Use work context for actual API calls (not tied to client connection)
                 // Use configured data source with updatedAt for effective caching
                 if s.dataSource == "turnserver" {
-                    prData, err = github.FetchPRDataViaTurnserver(workCtx, prURL, token, prSummary.UpdatedAt)
+                    var prDataWithAnalysis github.PRDataWithAnalysis
+                    prDataWithAnalysis, err = github.FetchPRDataWithAnalysisViaTurnserver(workCtx, prURL, token, prSummary.UpdatedAt)
+                    if err == nil {
+                        prData = prDataWithAnalysis.PRData
+                        secondsInState = prDataWithAnalysis.Analysis.SecondsInState
+                    }
                 } else {
                     prData, err = github.FetchPRData(workCtx, prURL, token, prSummary.UpdatedAt)
                 }
@@ -2380,6 +2455,13 @@ func (s *Server) processPRsInParallel(workCtx, reqCtx context.Context, samples [
                 s.cachePRData(workCtx, prCacheKey, prData)
             }

+            // Aggregate seconds_in_state
+            mu.Lock()
+            for state, seconds := range secondsInState {
+                aggregatedSeconds[state] += seconds
+            }
+            mu.Unlock()
+
             // Send "processing" update using request context for SSE
             sseMu.Lock()
             logSSEError(reqCtx, s.logger, sendSSE(writer, ProgressUpdate{
@@ -2415,5 +2497,5 @@ func (s *Server) processPRsInParallel(workCtx, reqCtx context.Context, samples [
     }

     wg.Wait()
-    return breakdowns
+    return breakdowns, aggregatedSeconds
 }
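
In processPRsInParallel the same merge happens inside worker goroutines, so it is done while holding mu, just like the append to breakdowns. A self-contained sketch of that locking pattern follows; it is an illustration only, not the server code, and the per-PR maps and state names are hypothetical.

package main

import (
    "fmt"
    "sync"
)

func main() {
    // Hypothetical per-PR timelines that would normally come from the fetch above.
    perPR := []map[string]int{
        {"waiting_on_review": 3600},
        {"waiting_on_author": 900},
        {"waiting_on_review": 1200},
    }

    aggregatedSeconds := make(map[string]int)
    var mu sync.Mutex
    var wg sync.WaitGroup

    for _, secondsInState := range perPR {
        wg.Add(1)
        go func(secondsInState map[string]int) {
            defer wg.Done()
            // Maps are not safe for concurrent writes, so the merge happens under the mutex.
            mu.Lock()
            for state, seconds := range secondsInState {
                aggregatedSeconds[state] += seconds
            }
            mu.Unlock()
        }(secondsInState)
    }

    wg.Wait()
    fmt.Println(aggregatedSeconds)
}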
