Skip to content

Commit b9cf0ea

Browse files
authored
Clear previous gateway status on new requests (#3421)
Clear any previous gateway status and allow the stream status API to return the gateway status if no stream status from the runner has been received. This is to allow the front end to properly detect things like capacity errors
1 parent 4f8e1c9 commit b9cf0ea

File tree

2 files changed

+13
-6
lines changed

2 files changed

+13
-6
lines changed

server/ai_live_video.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,10 @@ func (s *SlowOrchChecker) GetCount() int {
550550

551551
func LiveErrorEventSender(ctx context.Context, streamID string, event map[string]string) func(err error) {
552552
return func(err error) {
553-
GatewayStatus.Store(streamID, map[string]interface{}{"last_error": err.Error()})
553+
GatewayStatus.Store(streamID, map[string]interface{}{
554+
"last_error": err.Error(),
555+
"last_error_time": time.Now().UnixMilli(),
556+
})
554557

555558
ev := maps.Clone(event)
556559
ev["capability"] = clog.GetVal(ctx, "capability")

server/ai_mediaserver.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,9 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
508508
ctx = clog.AddVal(ctx, "stream_id", streamID)
509509
clog.Infof(ctx, "Received live video AI request for %s. pipelineParams=%v", streamName, pipelineParams)
510510

511+
// Clear any previous gateway status
512+
GatewayStatus.Clear(streamID)
513+
511514
monitor.SendQueueEventAsync("stream_trace", map[string]interface{}{
512515
"type": "gateway_receive_stream_request",
513516
"timestamp": streamRequestTime,
@@ -663,15 +666,16 @@ func (ls *LivepeerServer) GetLiveVideoToVideoStatus() http.Handler {
663666

664667
// Get status for specific stream
665668
status, exists := StreamStatusStore.Get(streamId)
666-
if !exists {
669+
gatewayStatus, gatewayExists := GatewayStatus.Get(streamId)
670+
if !exists && !gatewayExists {
667671
http.Error(w, "Stream not found", http.StatusNotFound)
668672
return
669673
}
670-
gatewayStatus, exists := GatewayStatus.Get(streamId)
671-
if exists {
672-
for k, v := range gatewayStatus {
673-
status["gateway_"+k] = v
674+
if gatewayExists {
675+
if status == nil {
676+
status = make(map[string]any)
674677
}
678+
status["gateway_status"] = gatewayStatus
675679
}
676680

677681
w.Header().Set("Content-Type", "application/json")

0 commit comments

Comments
 (0)