Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch pion",
"type": "go",
"request": "launch",
"preLaunchTask": "go: build package",
"env": {
"INGRESS_CONFIG_BODY": "openvidu:\n rtc:\n engine: pion\nredis:\n address: 127.0.0.1:6379\n username: \"\"\n password: redispassword\n db: 0\n use_tls: false\napi_key: devkey\napi_secret: secret\nws_url: ws://127.0.0.1:7880\nrtmp_port: 1935\nwhip_port: 8085\nhttp_relay_port: 9090\nhealth_port: 9091\nlogging:\n json: false\n level: debug\ndevelopment: false\nrtc_config:\n udp_port: 7895\ndebug_handler_port: 40000\ncpu_cost:\n min_idle_ratio: 0.01\n rtmp_cpu_cost: -10\n whip_cpu_cost: -10\n whip_bypass_transcoding_cpu_cost: -10\n url_cpu_cost: -10\n"
},
"program": "${workspaceFolder}/cmd/server"
},
{
"name": "Launch mediasoup",
"type": "go",
"request": "launch",
"preLaunchTask": "go: build package",
"env": {
"INGRESS_CONFIG_BODY": "openvidu:\n rtc:\n engine: mediasoup\nredis:\n address: 127.0.0.1:6379\n username: \"\"\n password: redispassword\n db: 0\n use_tls: false\napi_key: devkey\napi_secret: secret\nws_url: ws://127.0.0.1:7880\nrtmp_port: 1935\nwhip_port: 8085\nhttp_relay_port: 9090\nhealth_port: 9091\nlogging:\n json: false\n level: debug\ndevelopment: false\nrtc_config:\n udp_port: 7895\ndebug_handler_port: 40000\ncpu_cost:\n min_idle_ratio: 0.01\n rtmp_cpu_cost: -10\n whip_cpu_cost: -10\n whip_bypass_transcoding_cpu_cost: -10\n url_cpu_cost: -10\n"
},
"program": "${workspaceFolder}/cmd/server"
}
]
}
19 changes: 19 additions & 0 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"version": "2.0.0",
"tasks": [
{
"type": "go",
"label": "go: build package",
"command": "build",
"args": [
"-o",
"ingress-debug",
"./cmd/server"
],
"problemMatcher": [
"$go"
],
"group": "build",
}
]
}
8 changes: 8 additions & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ func runService(_ context.Context, c *cli.Command) error {
return err
}

// BEGIN OPENVIDU BLOCK
if conf.OpenVidu.Rtc.Engine != "" {
fmt.Println("Rtc Engine: " + string(conf.OpenVidu.Rtc.Engine))
} else {
fmt.Println("Rtc Engine not set. Using pion")
}
// END OPENVIDU BLOCK

rc, err := redis.GetRedisClient(conf.Redis)
if err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import (
"github.com/livekit/protocol/utils"
"github.com/livekit/psrpc"
lksdk "github.com/livekit/server-sdk-go/v2"

// BEGIN OPENVIDU BLOCK
"github.com/livekit/ingress/pkg/openvidupro/openviduproconfig"
// END OPENVIDU BLOCK
)

const (
Expand All @@ -43,6 +47,10 @@ var (
type Config struct {
*ServiceConfig `yaml:",inline"`
*InternalConfig `yaml:",inline"`

// BEGIN OPENVIDU BLOCK
OpenVidu openviduproconfig.OpenViduProConfig `yaml:"openvidu"`
// END OPENVIDU BLOCK
}

type ServiceConfig struct {
Expand Down
25 changes: 24 additions & 1 deletion pkg/media/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ func NewInput(ctx context.Context, p *params.Params, g *stats.LocalMediaStatsGat
latencyCaps: gst.NewCapsFromString(packetLatencyCaps),
}

// BEGIN OPENVIDU BLOCK
// We must add the rtspsrc element to the parent bin so it can later link its
// dynamic pads to the ghost sink pads of the video bin and audio bin
if strings.HasPrefix(p.Url, "rtsp://") || strings.HasPrefix(p.Url, "rtsps://") {
urlsource, ok := src.(*urlpull.URLSource)
if !ok {
return nil, errors.New("URL is rtsp but source is not of type URLSource")
}
bin.Add(urlsource.Rtspsrc)
}
// END OPENVIDU BLOCK

if p.InputType == livekit.IngressInput_URL_INPUT {
// Gather input stats from the pipeline
i.trackStatsGatherer[types.Audio] = g.RegisterTrackStats(stats.InputAudio)
Expand Down Expand Up @@ -197,7 +209,18 @@ func (i *Input) onPadAdded(_ *gst.Element, pad *gst.Pad) {
err = i.source.ValidateCaps(caps.(*gst.Caps))
if err != nil {
logger.Infow("input caps validation failed", "error", err)
return

// BEGIN OPENVIDU BLOCK
// In some occasions the caps might be empty when using rtspsrc
// Simply ignore this error if this is the case
rtspsrcElement, rtspsrcError := i.bin.GetElementByName("rtspsrc")
if rtspsrcError == nil && rtspsrcElement != nil {
logger.Infow("Ignore validation caps as we are using rtspsrc")
err = nil
} else {
return
}
// END OPENVIDU BLOCK
}
}
}
Expand Down
172 changes: 172 additions & 0 deletions pkg/media/urlpull/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package urlpull
import (
"context"
"strings"
"sync"
"time"

"github.com/frostbyte73/core"
Expand All @@ -29,6 +30,11 @@ import (

var (
supportedMimeTypes = []string{

// BEGIN OPENVIDU BLOCK
"application/x-rtp", // RTSP streams have this mime type
// END OPENVIDU BLOCK

"audio/x-m4a",
"application/x-hls",
"video/quicktime",
Expand All @@ -42,6 +48,11 @@ var (
)

type URLSource struct {
// BEGIN OPENVIDU BLOCK
srcAudio *gst.Element
Rtspsrc *gst.Element
// END OPENVIDU BLOCK

params *params.Params
src *gst.Element
pad *gst.Pad
Expand Down Expand Up @@ -85,10 +96,163 @@ func NewURLSource(ctx context.Context, p *params.Params) (*URLSource, error) {
}
}
}

// BEGIN OPENVIDU BLOCK
} else if strings.HasPrefix(p.Url, "rtsp://") || strings.HasPrefix(p.Url, "rtsps://") {

elem, err = gst.NewElementWithName("rtspsrc", "rtspsrc")
if err != nil {
return nil, err
}
err = elem.SetProperty("location", p.Url)
if err != nil {
return nil, err
}
err = elem.SetProperty("latency", uint(2000))
if err != nil {
return nil, err
}
err = elem.SetProperty("drop-on-latency", true)
if err != nil {
return nil, err
}
err = elem.SetProperty("port-range", "0-0")
if err != nil {
return nil, err
}
// END OPENVIDU BLOCK

} else {
return nil, errors.ErrUnsupportedURLFormat
}

// BEGIN OPENVIDU BLOCK
if strings.HasPrefix(p.Url, "rtsp://") || strings.HasPrefix(p.Url, "rtsps://") {
// Video
videoqueue, _ := gst.NewElementWithName("queue2", "videoqueue")
bin.Add(videoqueue)
// Create video queue ghost sink
videoqueuesink := videoqueue.GetStaticPad("sink")
if videoqueuesink == nil {
return nil, errors.ErrUnableToAddPad
}
videoqueueghostsink := gst.NewGhostPad("sink", videoqueuesink)
bin.AddPad(videoqueueghostsink.Pad)
// Create video queue ghost source
videoqueuesrc := videoqueue.GetStaticPad("src")
if videoqueuesrc == nil {
return nil, errors.ErrUnableToAddPad
}
videoqueueghostsrc := gst.NewGhostPad("src", videoqueuesrc)
if !bin.AddPad(videoqueueghostsrc.Pad) {
return nil, errors.ErrUnableToAddPad
}

// Audio
audiobin := gst.NewBin("audioinput")
audioqueue, _ := gst.NewElementWithName("queue2", "audioqueue")
audiobin.Add(audioqueue)
// Create audio queue ghost sink
audioqueuesink := audioqueue.GetStaticPad("sink")
if audioqueuesink == nil {
return nil, errors.ErrUnableToAddPad
}
audioqueueghostsink := gst.NewGhostPad("sink", audioqueuesink)
audiobin.AddPad(audioqueueghostsink.Pad)
// Create audio queue ghost source
audioqueuesrc := audioqueue.GetStaticPad("src")
if audioqueuesrc == nil {
return nil, errors.ErrUnableToAddPad
}
audioqueueghostsrc := gst.NewGhostPad("src", audioqueuesrc)
if !audiobin.AddPad(audioqueueghostsrc.Pad) {
return nil, errors.ErrUnableToAddPad
}

var mu sync.Mutex

elem.Connect("pad-added", func(src *gst.Element, pad *gst.Pad) {

padName := pad.GetName()

logger.Infow("rtspsrc pad-added", "padName", padName)

var queue *gst.Element
var sinkPad *gst.Pad
var getQueueErr error
isAudio := false
isVideo := false

pad.GetCurrentCaps().ForEach(func(features *gst.CapsFeatures, structure *gst.Structure) bool {
value, getMediaErr := structure.GetValue("media")
if getMediaErr != nil {
logger.Errorw("failed to get media value from caps", getMediaErr)
return false
}
if value == "audio" {
isAudio = true
return true
} else if value == "video" {
isVideo = true
return true
} else {
logger.Errorw("pad unrecognized media type", nil, "media", value)
return false
}
})

mu.Lock()
defer mu.Unlock()

if isAudio {
sinkPad = audiobin.GetStaticPad("sink")
queue, getQueueErr = audiobin.GetElementByName("audioqueue")
} else if isVideo {
sinkPad = bin.GetStaticPad("sink")
queue, getQueueErr = bin.GetElementByName("videoqueue")
} else {
logger.Errorw("pad media type not audio nor video", nil, "padName", padName)
return
}

if sinkPad == nil {
logger.Errorw("failed to get sink pad", nil)
return
}
if getQueueErr != nil {
logger.Errorw("failed to get queue element", err)
return
}

if sinkPad.IsLinked() {
var kind string
if isAudio {
kind = "audio"
} else {
kind = "video"
}
logger.Warnw("sink pad of kind "+kind+" is already linked", nil)
logger.Warnw("this usually means that the source is sending multiple streams of the same kind", nil)
return
}

padLinkReturnValue := pad.Link(sinkPad)
if padLinkReturnValue != gst.PadLinkOK && padLinkReturnValue != gst.PadLinkWasLinked {
logger.Errorw("failed to link pad", nil, "padLinkReturnValue", padLinkReturnValue)
return
}
queue.SyncStateWithParent()
})

return &URLSource{
params: p,
src: bin.Element,
srcAudio: audiobin.Element,
Rtspsrc: elem,
}, nil
}
// END OPENVIDU BLOCK

queue, err := gst.NewElement("queue2")
if err != nil {
return nil, err
Expand Down Expand Up @@ -135,6 +299,14 @@ func NewURLSource(ctx context.Context, p *params.Params) (*URLSource, error) {
}

func (u *URLSource) GetSources() []*gst.Element {
// BEGIN OPENVIDU BLOCK
if u.srcAudio != nil {
return []*gst.Element{
u.src,
u.srcAudio,
}
}
// END OPENVIDU BLOCK
return []*gst.Element{
u.src,
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/openvidupro/openviduproconfig/openviduproconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// BEGIN OPENVIDU BLOCK
package openviduproconfig

type RtcEngine string

const (
RtcEnginePion RtcEngine = "pion"
RtcEngineMediasoup RtcEngine = "mediasoup"
)

type RtcConfig struct {
Engine RtcEngine `yaml:"engine,omitempty"`
}

type OpenViduProConfig struct {
// WebRTC engine config.
Rtc RtcConfig `yaml:"rtc"`
}

// END OPENVIDU BLOCK
Loading
Loading