Skip to content

Commit 5d32bf9

Browse files
Fix prediction response timestamp population and webhook finalization (#219)
* Fix prediction response timestamp population and webhook finalization This PR addresses critical issues with timestamp population in prediction responses and consolidates webhook finalization logic to ensure consistent behavior across sync and async predictions. Core Infrastructure Changes Time Format Standardization - Add TimeFormat constant to config package using microsecond precision with explicit timezone offset - Ensures consistent timestamp formatting across all prediction responses and webhook payloads PredictionResponse Structure Overhaul - Remove deprecated WebhookURL field from PredictionResponse struct - Add dedicated CreatedAt, StartedAt, CompletedAt timestamp fields with proper JSON tags - Change Metrics field from any to map[string]any for type safety - Add finalizeResponse() method to set CompletedAt timestamp and calculate predict_time metric - Add populateFromRequest() method to copy core fields (ID, Input, CreatedAt, StartedAt) from request Prediction Lifecycle Management Manager API Restructuring - Rename Manager.Predict() to Manager.PredictSync() for clarity - Update Manager.PredictAsync() to return initial populated response instead of void - Refactor internal predict() method to return both channel and initial response - Add async parameter to assignReqToRunner() to control setup wait behavior Runner Prediction Flow - Update runner.predict() signature to return initial response populated with request fields - Fix timestamp population in predict() method by setting CreatedAt/StartedAt on pending request - Add proper populateFromRequest() calls in handleResponseWebhooksAndCompletion after response overwrites - Ensure terminal responses call finalizeResponse() before sending through safeSend() Webhook and Response Handling - Consolidate terminal webhook logic into unified sendTerminalWebhook() method - Fix race conditions by ensuring response finalization before webhook sending - Update subprocess monitoring to use populateFromRequest() for failed prediction responses - Ensure all webhook payloads include complete timestamp information Test Infrastructure Improvements Response Validation Framework - Add comprehensive unit tests for finalizeResponse() covering edge cases and error conditions - Add unit tests for populateFromRequest() method with field overwrite scenarios - Update all integration tests to validate CreatedAt, StartedAt, CompletedAt fields - Add ValidateTerminalWebhookResponse helper function for consistent terminal response validation Test Harness Enhancements - Introduce testHarnessResponse type with string Logs field for easier test assertions - Update webhook receiver to use testHarnessResponse for simplified log comparisons - Retrofit existing tests to use new validation patterns without changing test logic Bug Fixes Timestamp Population Issues - Fix missing timestamps in sync prediction responses by ensuring populateFromRequest() calls - Resolve timestamp loss during response overwrites in handleResponseWebhooksAndCompletion - Ensure webhook payloads contain complete timestamp information for terminal events Metrics and Finalization - Fix missing predict_time metric in terminal responses by calling finalizeResponse() - Ensure Metrics map initialization before metric calculation - Handle time parsing errors gracefully in finalizeResponse() method Concurrency and Error Handling - Fix potential race conditions in webhook sending by proper response finalization order - Ensure cleanup of failed predictions includes populated response fields - Update error responses in runner stop scenarios to include request-derived fields Backward Compatibility - Maintain existing API signatures where possible - Preserve webhook payload structure while adding timestamp fields - Keep existing test behavior while enhancing validation capabilities * Refactor handleResponseWebhooksAndCompletion to eliminate duplicate response handling - Consolidate response update logic into single operation to avoid duplicate overwrites - Extract webhook sending logic into dedicated sendStatusWebhook function - Extract terminal completion handling into handleTerminalCompletion function - Ensure proper lock context around populateFromRequest calls accessing pending.request - Maintain race condition protection by using local response copy for safeSend - Remove TODO comment about function being "a mess" - now clean and maintainable This eliminates the scattered response object handling and reduces lock contention while preserving all existing functionality and thread safety guarantees. * nit fixes
1 parent 1384119 commit 5d32bf9

21 files changed

+478
-277
lines changed

internal/config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import (
55
"time"
66
)
77

8+
const (
9+
TimeFormat = "2006-01-02T15:04:05.999999-07:00"
10+
)
11+
812
// Config holds all configuration for the cog runtime service
913
type Config struct {
1014
// Server configuration

internal/runner/manager.go

Lines changed: 49 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package runner
22

33
import (
4-
"bytes"
54
"context"
65
_ "embed"
7-
"encoding/json"
86
"errors"
97
"fmt"
108
"io/fs"
@@ -152,9 +150,9 @@ func (m *Manager) releaseSlot() {
152150
}
153151
}
154152

155-
// Predict executes a sync prediction request - blocks until complete
156-
func (m *Manager) Predict(req PredictionRequest) (*PredictionResponse, error) {
157-
respChan, err := m.predict(m.ctx, req)
153+
// PredictSync executes a sync prediction request - blocks until complete
154+
func (m *Manager) PredictSync(req PredictionRequest) (*PredictionResponse, error) {
155+
respChan, _, err := m.predict(m.ctx, req, false)
158156
if err != nil {
159157
return nil, err
160158
}
@@ -165,41 +163,11 @@ func (m *Manager) Predict(req PredictionRequest) (*PredictionResponse, error) {
165163
}
166164

167165
// PredictAsync executes an async prediction request - returns immediately, sends webhook when complete
168-
func (m *Manager) PredictAsync(ctx context.Context, req PredictionRequest) error {
166+
func (m *Manager) PredictAsync(ctx context.Context, req PredictionRequest) (*PredictionResponse, error) {
169167
log := m.logger.Sugar()
170-
if err := m.claimSlot(); err != nil {
171-
return err
172-
}
173-
174-
deadlineCtx, deadlineCancel := context.WithTimeout(ctx, 10*time.Second)
175-
defer deadlineCancel()
176-
177-
runner, err := m.assignReqToRunner(deadlineCtx, req)
168+
respChan, initialResponse, err := m.predict(ctx, req, true)
178169
if err != nil {
179-
log.Tracew("failed to get runner for async request", "error", err)
180-
m.releaseSlot()
181-
return err
182-
}
183-
184-
switch runner.status {
185-
case StatusReady:
186-
// Status ready is always valid for new predictions
187-
break
188-
case StatusStarting:
189-
if !m.cfg.UseProcedureMode {
190-
m.releaseSlot()
191-
return fmt.Errorf("%w: %s", ErrInvalidRunnerStatus, runner.status)
192-
}
193-
default:
194-
m.releaseSlot()
195-
return fmt.Errorf("%w: %s", ErrInvalidRunnerStatus, runner.status)
196-
}
197-
198-
respChan, err := runner.predict(req)
199-
if err != nil {
200-
log.Tracew("failed to predict", "error", err)
201-
m.releaseSlot()
202-
return err
170+
return nil, err
203171
}
204172

205173
// Release slot when prediction completes in background
@@ -209,33 +177,33 @@ func (m *Manager) PredictAsync(ctx context.Context, req PredictionRequest) error
209177
log.Tracew("async prediction completed", "prediction_id", req.ID)
210178
}()
211179

212-
return nil
180+
return initialResponse, nil
213181
}
214182

215183
// predict is the internal implementation shared by both sync and async predictions
216-
func (m *Manager) predict(ctx context.Context, req PredictionRequest) (chan PredictionResponse, error) {
184+
func (m *Manager) predict(ctx context.Context, req PredictionRequest, async bool) (chan PredictionResponse, *PredictionResponse, error) {
217185
if err := m.claimSlot(); err != nil {
218-
return nil, err
186+
return nil, nil, err
219187
}
220188

221189
deadlineCtx, deadlineCancel := context.WithTimeout(ctx, 10*time.Second)
222190
defer deadlineCancel()
223191

224-
runner, err := m.assignReqToRunnerWait(deadlineCtx, req)
192+
runner, err := m.assignReqToRunner(deadlineCtx, req, async)
225193
if err != nil {
226194
m.releaseSlot()
227-
return nil, err
195+
return nil, nil, err
228196
}
229197

230198
if !m.cfg.UseProcedureMode && runner.status != StatusReady {
231199
m.releaseSlot()
232-
return nil, fmt.Errorf("runner not ready: %s", runner.status)
200+
return nil, nil, fmt.Errorf("runner not ready: %s", runner.status)
233201
}
234202

235-
respChan, err := runner.predict(req)
203+
respChan, initialResponse, err := runner.predict(req)
236204
if err != nil {
237205
m.releaseSlot()
238-
return nil, err
206+
return nil, nil, err
239207
}
240208

241209
// Wrap the channel to release slot when prediction completes
@@ -247,39 +215,7 @@ func (m *Manager) predict(ctx context.Context, req PredictionRequest) (chan Pred
247215
close(wrappedChan)
248216
}()
249217

250-
return wrappedChan, nil
251-
}
252-
253-
// sendTerminalWebhook sends a terminal webhook synchronously for a completed prediction
254-
func (m *Manager) sendTerminalWebhook(req PredictionRequest, resp PredictionResponse) error {
255-
log := m.logger.Sugar()
256-
257-
// Send synchronously using SendConditional to respect filters
258-
// Send the actual PredictionResponse object, not a custom map
259-
260-
body, err := json.Marshal(resp)
261-
if err != nil {
262-
log.Errorw("failed to marshal prediction response", "error", err)
263-
return fmt.Errorf("failed to marshal prediction response: %w", err)
264-
}
265-
266-
if err := m.webhookSender.SendConditional(req.Webhook, bytes.NewReader(body), webhook.EventCompleted, req.WebhookEventsFilter, nil); err != nil {
267-
log.Errorw("failed to send terminal webhook", "prediction_id", resp.ID, "webhook_url", req.Webhook, "error", err)
268-
return fmt.Errorf("failed to send terminal webhook: %w", err)
269-
}
270-
log.Infow("sent terminal webhook", "prediction_id", resp.ID, "webhook_url", req.Webhook, "status", resp.Status)
271-
return nil
272-
}
273-
274-
func (m *Manager) assignReqToRunnerWait(ctx context.Context, req PredictionRequest) (*Runner, error) {
275-
runner, err := m.assignReqToRunner(ctx, req)
276-
if err != nil {
277-
return nil, fmt.Errorf("failed to assign request to runner: %w", err)
278-
}
279-
if waitForRunnerSetup(ctx, runner) != nil {
280-
return nil, err
281-
}
282-
return runner, nil
218+
return wrappedChan, initialResponse, nil
283219
}
284220

285221
// createDefaultRunner creates the default runner for non-procedure mode
@@ -384,6 +320,7 @@ func (m *Manager) createDefaultRunner(ctx context.Context) (*Runner, error) {
384320

385321
// allocatePrediction reserves a slot in the runner for the prediction
386322
func (m *Manager) allocatePrediction(runner *Runner, req PredictionRequest) { //nolint:contextcheck // we do not use this context for the prediction see note below
323+
log := m.logger.Sugar()
387324
runner.mu.Lock()
388325
defer runner.mu.Unlock()
389326

@@ -409,11 +346,14 @@ func (m *Manager) allocatePrediction(runner *Runner, req PredictionRequest) { //
409346
defer func() {
410347
// When watcher exits, handle terminal webhook and cleanup
411348
pending.mu.Lock()
412-
finalResponse := pending.response
349+
pending.response.populateFromRequest(pending.request)
350+
if err := pending.response.finalizeResponse(); err != nil {
351+
log.Errorw("failed to finalize response", "error", err)
352+
}
413353

414354
// Send terminal webhook if prediction completed
415-
if finalResponse.Status.IsCompleted() && pending.terminalWebhookSent.CompareAndSwap(false, true) {
416-
_ = m.sendTerminalWebhook(pending.request, finalResponse)
355+
if err := m.sendTerminalWebhook(pending); err != nil {
356+
log.Errorw("failed to send terminal webhook", "error", err)
417357
}
418358
pending.mu.Unlock()
419359

@@ -423,7 +363,7 @@ func (m *Manager) allocatePrediction(runner *Runner, req PredictionRequest) { //
423363
runner.mu.Unlock()
424364

425365
// In one-shot mode, stop runner after prediction completes to trigger cleanup
426-
if m.cfg.OneShot && finalResponse.Status.IsCompleted() {
366+
if m.cfg.OneShot {
427367
go func() {
428368
logger := m.logger.Sugar()
429369
logger.Infow("one-shot mode: stopping runner after prediction completion", "prediction_id", req.ID, "runner_id", runner.runnerCtx.id)
@@ -461,7 +401,7 @@ func (m *Manager) allocatePrediction(runner *Runner, req PredictionRequest) { //
461401
}()
462402
}
463403

464-
func (m *Manager) assignReqToRunner(ctx context.Context, req PredictionRequest) (*Runner, error) {
404+
func (m *Manager) assignReqToRunner(ctx context.Context, req PredictionRequest, async bool) (*Runner, error) {
465405
log := m.logger.Sugar()
466406

467407
if !m.cfg.UseProcedureMode {
@@ -523,6 +463,11 @@ func (m *Manager) assignReqToRunner(ctx context.Context, req PredictionRequest)
523463
m.allocatePrediction(procRunner, req) //nolint:contextcheck // see above note
524464
m.mu.Unlock()
525465

466+
if !async {
467+
if err := waitForRunnerSetup(ctx, procRunner); err != nil {
468+
return nil, err
469+
}
470+
}
526471
return procRunner, nil
527472
}
528473

@@ -1088,26 +1033,25 @@ func (m *Manager) monitorRunnerSubprocess(ctx context.Context, runnerName string
10881033
pending.mu.Unlock()
10891034

10901035
failedResponse := PredictionResponse{
1091-
ID: id,
10921036
Status: PredictionFailed,
1093-
Input: pending.request.Input,
10941037
Error: "setup failed",
10951038
Logs: allLogs,
10961039
Metrics: pending.response.Metrics,
10971040
}
1041+
failedResponse.populateFromRequest(pending.request)
10981042

10991043
pending.safeSend(failedResponse)
11001044
pending.safeClose()
11011045

11021046
// Update pending response with failed response for webhook
11031047
pending.mu.Lock()
11041048
pending.response = failedResponse
1105-
pending.mu.Unlock()
11061049

11071050
// Send terminal webhook since we're canceling the watcher
1108-
if pending.terminalWebhookSent.CompareAndSwap(false, true) {
1109-
_ = pending.sendWebhookSync(webhook.EventCompleted)
1051+
if err := m.sendTerminalWebhook(pending); err != nil {
1052+
log.Errorw("failed to send terminal webhook", "error", err)
11101053
}
1054+
pending.mu.Unlock()
11111055

11121056
for _, inputPath := range pending.inputPaths {
11131057
if err := os.Remove(inputPath); err != nil {
@@ -1143,26 +1087,25 @@ func (m *Manager) monitorRunnerSubprocess(ctx context.Context, runnerName string
11431087
pending.mu.Unlock()
11441088

11451089
failedResponse := PredictionResponse{
1146-
ID: id,
11471090
Status: PredictionFailed,
1148-
Input: pending.request.Input,
11491091
Error: "prediction failed",
11501092
Logs: allLogs,
11511093
Metrics: pending.response.Metrics,
11521094
}
1095+
failedResponse.populateFromRequest(pending.request)
11531096

11541097
pending.safeSend(failedResponse)
11551098
pending.safeClose()
11561099

11571100
// Update pending response with failed response for webhook
11581101
pending.mu.Lock()
11591102
pending.response = failedResponse
1160-
pending.mu.Unlock()
11611103

11621104
// Send terminal webhook since we're canceling the watcher
1163-
if pending.terminalWebhookSent.CompareAndSwap(false, true) {
1164-
_ = pending.sendWebhookSync(webhook.EventCompleted)
1105+
if err := m.sendTerminalWebhook(pending); err != nil {
1106+
log.Errorw("failed to send terminal webhook", "error", err)
11651107
}
1108+
pending.mu.Unlock()
11661109

11671110
for _, inputPath := range pending.inputPaths {
11681111
if err := os.Remove(inputPath); err != nil {
@@ -1180,3 +1123,15 @@ func (m *Manager) monitorRunnerSubprocess(ctx context.Context, runnerName string
11801123
runner.status = StatusDefunct
11811124
}
11821125
}
1126+
1127+
func (m *Manager) sendTerminalWebhook(pending *PendingPrediction) error {
1128+
log := m.logger.Sugar()
1129+
// Send terminal webhook since we're canceling the watcher
1130+
if pending.response.Status.IsCompleted() && pending.terminalWebhookSent.CompareAndSwap(false, true) {
1131+
if err := pending.response.finalizeResponse(); err != nil {
1132+
log.Errorw("failed to finalize response", "error", err)
1133+
}
1134+
return pending.sendWebhookSync(webhook.EventCompleted)
1135+
}
1136+
return nil
1137+
}

internal/runner/manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,7 @@ func TestManagerPredictionHandling(t *testing.T) {
579579
require.NoError(t, err)
580580

581581
req := PredictionRequest{ID: "test-id"}
582-
_, err = m.Predict(req)
582+
_, err = m.PredictSync(req)
583583
require.Error(t, err)
584584
assert.Equal(t, ErrNoCapacity, err)
585585
})

0 commit comments

Comments
 (0)