Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/livekit/livekit-server v1.8.4
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/media-sdk v0.0.0-20251006100658-7bffd4440294
github.com/livekit/protocol v1.42.2-0.20251016024155-8cf58ff15ac6
github.com/livekit/protocol v1.43.2
github.com/livekit/psrpc v0.7.1
github.com/livekit/server-sdk-go/v2 v2.11.4-0.20251021124258-003550a83b17
github.com/livekit/storage v0.0.0-20251113154014-aa1f4d0ce057
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ github.com/livekit/media-sdk v0.0.0-20251006100658-7bffd4440294 h1:xwcL5qC1ICAqT
github.com/livekit/media-sdk v0.0.0-20251006100658-7bffd4440294/go.mod h1:7ssWiG+U4xnbvLih9WiZbhQP6zIKMjgXdUtIE1bm/E8=
github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397 h1:Z7j2mY+bvG05UC80MpnJkitlJju8sSDWsr0Bb4dPceo=
github.com/livekit/mediatransportutil v0.0.0-20250922175932-f537f0880397/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
github.com/livekit/protocol v1.42.2-0.20251016024155-8cf58ff15ac6 h1:Tby1v0yn0XCXl9nBVnZI9M1cQW/0o4E/ejzRgcaMETI=
github.com/livekit/protocol v1.42.2-0.20251016024155-8cf58ff15ac6/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A=
github.com/livekit/protocol v1.43.2 h1:yx4Xtd7yUJC2CESsBcotUcimrH7y/lZYiDYpO2YfW8g=
github.com/livekit/protocol v1.43.2/go.mod h1:yjkL2/HcaCRyHykP9rLgKST2099AGd8laaU8EuHMnfw=
github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw=
github.com/livekit/psrpc v0.7.1/go.mod h1:bZ4iHFQptTkbPnB0LasvRNu/OBYXEu1NA6O5BMFo9kk=
github.com/livekit/server-sdk-go/v2 v2.11.4-0.20251021124258-003550a83b17 h1:87m5nUD7Bd4ZKZJ7HRH27wUsrjwJfDW2m1aN+s+138g=
Expand Down
13 changes: 13 additions & 0 deletions pkg/config/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package config

import (
"sync"
"sync/atomic"
"time"

"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
Expand All @@ -37,6 +39,8 @@ type Stream struct {
RedactedUrl string // url with stream key removed
StreamID string // stream ID used by rtmpconnection
StreamInfo *livekit.StreamInfo

lastRetryUpdate atomic.Int64
}

func (p *PipelineConfig) GetStreamConfig() *StreamConfig {
Expand Down Expand Up @@ -94,3 +98,12 @@ func (s *Stream) UpdateEndTime(endedAt int64) {
s.StreamInfo.Duration = endedAt - s.StreamInfo.StartedAt
}
}

func (s *Stream) ShouldSendRetryUpdate(now time.Time, minInterval time.Duration) bool {
last := s.lastRetryUpdate.Load()
if last == 0 || now.UnixNano()-last >= int64(minInterval) {
s.lastRetryUpdate.Store(now.UnixNano())
return true
}
return false
}
17 changes: 17 additions & 0 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
const (
pipelineName = "pipeline"
eosTimeout = time.Second * 30

streamRetryUpdateInterval = time.Minute
)

type Controller struct {
Expand Down Expand Up @@ -363,6 +365,21 @@ func (c *Controller) streamFailed(ctx context.Context, stream *config.Stream, st
return c.getStreamSink().RemoveStream(stream)
}

func (c *Controller) trackStreamRetry(ctx context.Context, stream *config.Stream) {
now := time.Now()
stream.StreamInfo.LastRetryAt = now.UnixNano()
stream.StreamInfo.Retries++
if !stream.ShouldSendRetryUpdate(now, streamRetryUpdateInterval) {
return
}
logger.Infow("retrying stream update",
"url", stream.RedactedUrl,
"retries", stream.StreamInfo.Retries,
)

c.streamUpdated(ctx)
}

func (c *Controller) onEOSSent() {
// for video-only track/track composite, EOS might have already
// made it through the pipeline by the time endRecording is closed
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (c *Controller) handleMessageError(gErr *gst.GError) error {
if err != nil {
logger.Errorw("failed to reset stream", err)
} else if ok {
c.trackStreamRetry(context.Background(), stream)
return nil
}
}
Expand Down
Loading