Skip to content

Commit 8768da8

Browse files
committed
Eliminate utils/restore Metrics
This PR elmintes the utils package by moving the functionality to where it is used. This is more in-line with idiomatic go (no non-descriptive package names). As part of the elimination we moved to a custom slice for logs that elminates the majority of the Custom Marshal/Unmarshal code. Identified in this change, the metrics emission was no longer wired up and has been restored to functionality. Key improvements: - Introduce LogsSlice type with proper JSON marshaling/unmarshaling - Implements Stringer interface for consistent log formatting - Handles newline-delimited string conversion automatically - Eliminates need for custom PredictionResponse marshal/unmarshal - Move metrics functionality from dead util/metrics.go to runner.go - Restore sendRunnerMetric() function that was never called - Inline HTTPClientWithRetry() using httpclient.ApplyRetryPolicy - Fix missing imports and undefined references - Remove util.JoinLogs() dependency by using LogsSlice.String() - Consolidate logging utilities into appropriate packages Technical details: - LogsSlice.String() preserves util.JoinLogs() behavior (joins with \n, ensures trailing \n) - LogsSlice.MarshalJSON/UnmarshalJSON handles string ↔ []string conversion - All util package imports removed across codebase - Metrics sending now properly integrated into runner lifecycle This eliminates the "terrible util package" while restoring previously broken metrics functionality and improving type safety for log handling.
1 parent 8752ce8 commit 8768da8

File tree

20 files changed

+169
-285
lines changed

20 files changed

+169
-285
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
/.idea
66
/build
77
/dist
8-
/internal/util/version.txt
8+
/internal/version/version.txt
99
/python/cog/cog-*
1010
/python/coglet/_version.py
1111
/uv.lock

cmd/cog/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
"github.com/replicate/cog-runtime/internal/config"
1717
"github.com/replicate/cog-runtime/internal/runner"
1818
"github.com/replicate/cog-runtime/internal/service"
19-
"github.com/replicate/cog-runtime/internal/util"
19+
"github.com/replicate/cog-runtime/internal/version"
2020
)
2121

2222
type ServerCmd struct {
@@ -125,7 +125,7 @@ func (s *ServerCmd) Run() error {
125125
}
126126

127127
addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
128-
log.Infow("starting Cog HTTP server", "addr", addr, "version", util.Version(), "pid", os.Getpid())
128+
log.Infow("starting Cog HTTP server", "addr", addr, "version", version.Version(), "pid", os.Getpid())
129129

130130
// Create service with base logger
131131
svc := service.New(cfg, baseLogger)

internal/runner/path.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ import (
1515

1616
"github.com/gabriel-vasile/mimetype"
1717
"github.com/getkin/kin-openapi/openapi3"
18-
19-
"github.com/replicate/cog-runtime/internal/util"
18+
"github.com/replicate/go/httpclient"
2019
)
2120

2221
var Base64Regex = regexp.MustCompile(`^data:.*;base64,(?P<base64>.*)$`)
@@ -210,7 +209,7 @@ type uploader struct {
210209
// newUploader creates a new uploader instance
211210
func newUploader(uploadURL string) *uploader {
212211
return &uploader{
213-
client: util.HTTPClientWithRetry(),
212+
client: httpclient.ApplyRetryPolicy(http.DefaultClient),
214213
uploadURL: uploadURL,
215214
}
216215
}

internal/runner/runner.go

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package runner
22

33
import (
44
"bufio"
5+
"bytes"
56
"context"
67
"encoding/json"
78
"errors"
89
"fmt"
10+
"net/http"
911
"os"
1012
"os/exec"
1113
"path"
@@ -20,8 +22,10 @@ import (
2022
"github.com/getkin/kin-openapi/openapi3"
2123
"go.uber.org/zap"
2224

25+
"github.com/replicate/go/httpclient"
26+
2327
"github.com/replicate/cog-runtime/internal/config"
24-
"github.com/replicate/cog-runtime/internal/util"
28+
"github.com/replicate/cog-runtime/internal/version"
2529
"github.com/replicate/cog-runtime/internal/webhook"
2630
)
2731

@@ -293,7 +297,7 @@ type Runner struct {
293297
schema string
294298
doc *openapi3.T
295299
setupResult SetupResult
296-
logs []string
300+
logs LogsSlice
297301
asyncPredict bool
298302
maxConcurrency int
299303
pending map[string]*PendingPrediction
@@ -521,7 +525,7 @@ func (r *Runner) captureLogLine(line string) {
521525
} else {
522526
// Add to runner logs for crash reporting
523527
r.logs = append(r.logs, line)
524-
r.setupResult.Logs = util.JoinLogs(r.logs)
528+
r.setupResult.Logs = r.logs.String()
525529
}
526530
r.mu.Unlock()
527531
default:
@@ -566,6 +570,9 @@ func (r *Runner) Config(ctx context.Context) error {
566570
// Default to 1 if not set in cog.yaml, regardless whether async predict or not
567571
maxConcurrency := max(1, cogYaml.Concurrency.Max)
568572

573+
// Send metrics
574+
go r.sendRunnerMetric(*cogYaml)
575+
569576
// Create config.json for the coglet process
570577
configJSON := map[string]any{
571578
"module_name": moduleName,
@@ -593,6 +600,36 @@ func (r *Runner) Config(ctx context.Context) error {
593600
return nil
594601
}
595602

603+
func (r *Runner) sendRunnerMetric(cogYaml CogYaml) {
604+
log := r.logger.Sugar()
605+
// FIXME: wire this up through more than os.getenv
606+
endpoint := os.Getenv("COG_METRICS_ENDPOINT")
607+
if endpoint == "" {
608+
return
609+
}
610+
data := map[string]any{
611+
"gpu": cogYaml.Build.GPU,
612+
"fast": cogYaml.Build.Fast,
613+
"cog_runtime": cogYaml.Build.CogRuntime,
614+
"version": version.Version(),
615+
}
616+
payload := MetricsPayload{
617+
Source: "cog-runtime",
618+
Type: "runner",
619+
Data: data,
620+
}
621+
body, err := json.Marshal(payload)
622+
if err != nil {
623+
log.Errorw("failed to marshal payload", "error", err)
624+
return
625+
}
626+
resp, err := httpclient.ApplyRetryPolicy(http.DefaultClient).Post(endpoint, "application/json", bytes.NewBuffer(body))
627+
if err != nil || resp.StatusCode != http.StatusOK {
628+
log.Errorw("failed to send runner metrics", "error", err)
629+
}
630+
defer resp.Body.Close()
631+
}
632+
596633
func (r *Runner) Stop() error {
597634
log := r.logger.Sugar()
598635
r.mu.Lock()
@@ -913,7 +950,10 @@ func (r *Runner) updateSetupResult() {
913950
}
914951

915952
// Set logs first (original pattern)
916-
r.setupResult.Logs = util.JoinLogs(logLines)
953+
r.setupResult.Logs = strings.Join(logLines, "\n")
954+
if r.setupResult.Logs != "" {
955+
r.setupResult.Logs += "\n"
956+
}
917957

918958
setupResultPath := filepath.Join(r.runnerCtx.workingdir, "setup_result.json")
919959
log.Debug("reading setup_result.json", "path", setupResultPath)
@@ -954,7 +994,7 @@ func (r *Runner) rotateLogs() string {
954994
r.mu.Lock()
955995
defer r.mu.Unlock()
956996

957-
allLogs := util.JoinLogs(r.logs)
997+
allLogs := r.logs.String()
958998
r.logs = r.logs[:0]
959999
return allLogs
9601000
}

internal/runner/runner_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1208,7 +1208,7 @@ func TestPerPredictionWatcher(t *testing.T) {
12081208
assert.Equal(t, "partial output", pending.response.Output)
12091209
assert.Equal(t, predictionID, pending.response.ID)
12101210
assert.Equal(t, map[string]any{"test": "input"}, pending.response.Input)
1211-
assert.Equal(t, []string{"existing log"}, pending.response.Logs) // Logs preserved
1211+
assert.Equal(t, LogsSlice{"existing log"}, pending.response.Logs) // Logs preserved
12121212
pending.mu.Unlock()
12131213
})
12141214

internal/runner/types.go

Lines changed: 48 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,50 @@ import (
1616
"syscall"
1717
"time"
1818

19-
"github.com/replicate/cog-runtime/internal/util"
2019
"github.com/replicate/cog-runtime/internal/webhook"
2120
)
2221

22+
// LogsSlice is a []string that marshals to/from a newline-joined string in JSON
23+
type LogsSlice []string
24+
25+
func (l LogsSlice) String() string {
26+
r := strings.Join(l, "\n")
27+
if r != "" {
28+
r += "\n"
29+
}
30+
return r
31+
}
32+
33+
// MarshalJSON implements custom JSON marshaling to convert logs from []string to string
34+
func (l LogsSlice) MarshalJSON() ([]byte, error) {
35+
result := strings.Join(l, "\n")
36+
if result != "" {
37+
result += "\n"
38+
}
39+
return json.Marshal(result)
40+
}
41+
42+
// UnmarshalJSON implements custom JSON unmarshaling to convert logs from string to []string
43+
func (l *LogsSlice) UnmarshalJSON(data []byte) error {
44+
var str string
45+
if err := json.Unmarshal(data, &str); err != nil {
46+
return err
47+
}
48+
49+
if str == "" {
50+
*l = nil
51+
return nil
52+
}
53+
54+
// Split on newline and remove the trailing empty element if it exists
55+
parts := strings.Split(str, "\n")
56+
if len(parts) > 0 && parts[len(parts)-1] == "" {
57+
parts = parts[:len(parts)-1]
58+
}
59+
*l = LogsSlice(parts)
60+
return nil
61+
}
62+
2363
type Status int
2464

2565
const (
@@ -116,72 +156,11 @@ type PredictionResponse struct {
116156
Input any `json:"input,omitempty"`
117157
Output any `json:"output,omitempty"`
118158
Error string `json:"error,omitempty"`
119-
Logs []string `json:"logs,omitempty"`
159+
Logs LogsSlice `json:"logs,omitempty"`
120160
Metrics any `json:"metrics,omitempty"`
121161
WebhookURL string `json:"webhook,omitempty"`
122162
}
123163

124-
// MarshalJSON implements custom JSON marshaling to convert logs from []string to string
125-
func (pr PredictionResponse) MarshalJSON() ([]byte, error) {
126-
return json.Marshal(&struct {
127-
ID string `json:"id"`
128-
Status PredictionStatus `json:"status"`
129-
Input any `json:"input,omitempty"`
130-
Output any `json:"output,omitempty"`
131-
Error string `json:"error,omitempty"`
132-
Logs string `json:"logs,omitempty"`
133-
Metrics any `json:"metrics,omitempty"`
134-
WebhookURL string `json:"webhook,omitempty"`
135-
}{
136-
ID: pr.ID,
137-
Status: pr.Status,
138-
Input: pr.Input,
139-
Output: pr.Output,
140-
Error: pr.Error,
141-
Logs: util.JoinLogs(pr.Logs),
142-
Metrics: pr.Metrics,
143-
WebhookURL: pr.WebhookURL,
144-
})
145-
}
146-
147-
// UnmarshalJSON implements custom JSON unmarshalling to convert logs from string to []string
148-
func (pr *PredictionResponse) UnmarshalJSON(data []byte) error {
149-
aux := &struct {
150-
ID string `json:"id"`
151-
Status PredictionStatus `json:"status"`
152-
Input any `json:"input,omitempty"`
153-
Output any `json:"output,omitempty"`
154-
Error string `json:"error,omitempty"`
155-
Logs string `json:"logs,omitempty"`
156-
Metrics any `json:"metrics,omitempty"`
157-
WebhookURL string `json:"webhook,omitempty"`
158-
}{}
159-
if err := json.Unmarshal(data, aux); err != nil {
160-
return err
161-
}
162-
163-
pr.ID = aux.ID
164-
pr.Status = aux.Status
165-
pr.Input = aux.Input
166-
pr.Output = aux.Output
167-
pr.Error = aux.Error
168-
pr.Metrics = aux.Metrics
169-
pr.WebhookURL = aux.WebhookURL
170-
171-
// Convert string logs back to []string
172-
if aux.Logs != "" {
173-
// Split on newline and remove the trailing empty element if it exists
174-
parts := strings.Split(aux.Logs, "\n")
175-
if len(parts) > 0 && parts[len(parts)-1] == "" {
176-
parts = parts[:len(parts)-1]
177-
}
178-
pr.Logs = parts
179-
} else {
180-
pr.Logs = nil
181-
}
182-
return nil
183-
}
184-
185164
// RunnerID is a unique identifier for a runner instance.
186165
// Format: 8-character base32 string (no leading zeros)
187166
// Example: "k7m3n8p2", "b9q4x2w1"
@@ -391,3 +370,9 @@ func (p *PendingPrediction) sendWebhookSync(event webhook.Event) error {
391370
_ = p.webhookSender.SendConditional(p.request.Webhook, bytes.NewReader(body), event, p.request.WebhookEventsFilter, &p.lastUpdated)
392371
return nil
393372
}
373+
374+
type MetricsPayload struct {
375+
Source string `json:"source,omitempty"`
376+
Type string `json:"type,omitempty"`
377+
Data map[string]any `json:"data,omitempty"`
378+
}

internal/runner/types_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func TestPredictionResponse(t *testing.T) {
139139
assert.Equal(t, PredictionSucceeded, resp.Status)
140140
assert.Equal(t, map[string]any{"result": "success"}, resp.Output)
141141
assert.Empty(t, resp.Error)
142-
assert.Equal(t, []string{"log1", "log2"}, resp.Logs)
142+
assert.Equal(t, LogsSlice{"log1", "log2"}, resp.Logs)
143143
assert.Equal(t, map[string]any{"duration": 1.5}, resp.Metrics)
144144
assert.Equal(t, "http://example.com/webhook", resp.WebhookURL)
145145
})
@@ -330,7 +330,7 @@ func TestPredictionResponseUnmarshalFromExternalJSON(t *testing.T) {
330330
err := json.Unmarshal([]byte(jsonStr), &response)
331331
require.NoError(t, err)
332332

333-
expected := []string{
333+
expected := LogsSlice{
334334
"starting prediction",
335335
"prediction in progress 1/2",
336336
"prediction in progress 2/2",

0 commit comments

Comments
 (0)