Skip to content

Commit b515fce

Browse files
authored
Merge pull request #2 from OpenVidu/v1.4.3
V1.4.3
2 parents 438d993 + a485d6f commit b515fce

File tree

8 files changed

+309
-1
lines changed

8 files changed

+309
-1
lines changed

.vscode/launch.json

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
{
2+
// Use IntelliSense to learn about possible attributes.
3+
// Hover to view descriptions of existing attributes.
4+
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
5+
"version": "0.2.0",
6+
"configurations": [
7+
{
8+
"name": "Launch pion",
9+
"type": "go",
10+
"request": "launch",
11+
"preLaunchTask": "go: build package",
12+
"env": {
13+
"INGRESS_CONFIG_BODY": "openvidu:\n rtc:\n engine: pion\nredis:\n address: 127.0.0.1:6379\n username: \"\"\n password: redispassword\n db: 0\n use_tls: false\napi_key: devkey\napi_secret: secret\nws_url: ws://127.0.0.1:7880\nrtmp_port: 1935\nwhip_port: 8085\nhttp_relay_port: 9090\nhealth_port: 9091\nlogging:\n json: false\n level: debug\ndevelopment: false\nrtc_config:\n udp_port: 7895\ndebug_handler_port: 40000\ncpu_cost:\n min_idle_ratio: 0.01\n rtmp_cpu_cost: -10\n whip_cpu_cost: -10\n whip_bypass_transcoding_cpu_cost: -10\n url_cpu_cost: -10\n"
14+
},
15+
"program": "${workspaceFolder}/cmd/server"
16+
},
17+
{
18+
"name": "Launch mediasoup",
19+
"type": "go",
20+
"request": "launch",
21+
"preLaunchTask": "go: build package",
22+
"env": {
23+
"INGRESS_CONFIG_BODY": "openvidu:\n rtc:\n engine: mediasoup\nredis:\n address: 127.0.0.1:6379\n username: \"\"\n password: redispassword\n db: 0\n use_tls: false\napi_key: devkey\napi_secret: secret\nws_url: ws://127.0.0.1:7880\nrtmp_port: 1935\nwhip_port: 8085\nhttp_relay_port: 9090\nhealth_port: 9091\nlogging:\n json: false\n level: debug\ndevelopment: false\nrtc_config:\n udp_port: 7895\ndebug_handler_port: 40000\ncpu_cost:\n min_idle_ratio: 0.01\n rtmp_cpu_cost: -10\n whip_cpu_cost: -10\n whip_bypass_transcoding_cpu_cost: -10\n url_cpu_cost: -10\n"
24+
},
25+
"program": "${workspaceFolder}/cmd/server"
26+
}
27+
]
28+
}

.vscode/tasks.json

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"version": "2.0.0",
3+
"tasks": [
4+
{
5+
"type": "go",
6+
"label": "go: build package",
7+
"command": "build",
8+
"args": [
9+
"-o",
10+
"ingress-debug",
11+
"./cmd/server"
12+
],
13+
"problemMatcher": [
14+
"$go"
15+
],
16+
"group": "build",
17+
}
18+
]
19+
}

cmd/server/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,14 @@ func runService(_ context.Context, c *cli.Command) error {
110110
return err
111111
}
112112

113+
// BEGIN OPENVIDU BLOCK
114+
if conf.OpenVidu.Rtc.Engine != "" {
115+
fmt.Println("Rtc Engine: " + string(conf.OpenVidu.Rtc.Engine))
116+
} else {
117+
fmt.Println("Rtc Engine not set. Using pion")
118+
}
119+
// END OPENVIDU BLOCK
120+
113121
rc, err := redis.GetRedisClient(conf.Redis)
114122
if err != nil {
115123
return err

pkg/config/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ import (
2828
"github.com/livekit/protocol/utils"
2929
"github.com/livekit/psrpc"
3030
lksdk "github.com/livekit/server-sdk-go/v2"
31+
32+
// BEGIN OPENVIDU BLOCK
33+
"github.com/livekit/ingress/pkg/openvidupro/openviduproconfig"
34+
// END OPENVIDU BLOCK
3135
)
3236

3337
const (
@@ -43,6 +47,10 @@ var (
4347
type Config struct {
4448
*ServiceConfig `yaml:",inline"`
4549
*InternalConfig `yaml:",inline"`
50+
51+
// BEGIN OPENVIDU BLOCK
52+
OpenVidu openviduproconfig.OpenViduProConfig `yaml:"openvidu"`
53+
// END OPENVIDU BLOCK
4654
}
4755

4856
type ServiceConfig struct {

pkg/media/input.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,18 @@ func NewInput(ctx context.Context, p *params.Params, g *stats.LocalMediaStatsGat
9595
latencyCaps: gst.NewCapsFromString(packetLatencyCaps),
9696
}
9797

98+
// BEGIN OPENVIDU BLOCK
99+
// We must add the rtspsrc element to the parent bin so it can later link its
100+
// dynamic pads to the ghost sink pads of the video bin and audio bin
101+
if strings.HasPrefix(p.Url, "rtsp://") || strings.HasPrefix(p.Url, "rtsps://") {
102+
urlsource, ok := src.(*urlpull.URLSource)
103+
if !ok {
104+
return nil, errors.New("URL is rtsp but source is not of type URLSource")
105+
}
106+
bin.Add(urlsource.Rtspsrc)
107+
}
108+
// END OPENVIDU BLOCK
109+
98110
if p.InputType == livekit.IngressInput_URL_INPUT {
99111
// Gather input stats from the pipeline
100112
i.trackStatsGatherer[types.Audio] = g.RegisterTrackStats(stats.InputAudio)
@@ -197,7 +209,18 @@ func (i *Input) onPadAdded(_ *gst.Element, pad *gst.Pad) {
197209
err = i.source.ValidateCaps(caps.(*gst.Caps))
198210
if err != nil {
199211
logger.Infow("input caps validation failed", "error", err)
200-
return
212+
213+
// BEGIN OPENVIDU BLOCK
214+
// In some occasions the caps might be empty when using rtspsrc
215+
// Simply ignore this error if this is the case
216+
rtspsrcElement, rtspsrcError := i.bin.GetElementByName("rtspsrc")
217+
if rtspsrcError == nil && rtspsrcElement != nil {
218+
logger.Infow("Ignore validation caps as we are using rtspsrc")
219+
err = nil
220+
} else {
221+
return
222+
}
223+
// END OPENVIDU BLOCK
201224
}
202225
}
203226
}

pkg/media/urlpull/source.go

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package urlpull
1717
import (
1818
"context"
1919
"strings"
20+
"sync"
2021
"time"
2122

2223
"github.com/frostbyte73/core"
@@ -29,6 +30,11 @@ import (
2930

3031
var (
3132
supportedMimeTypes = []string{
33+
34+
// BEGIN OPENVIDU BLOCK
35+
"application/x-rtp", // RTSP streams have this mime type
36+
// END OPENVIDU BLOCK
37+
3238
"audio/x-m4a",
3339
"application/x-hls",
3440
"video/quicktime",
@@ -42,6 +48,11 @@ var (
4248
)
4349

4450
type URLSource struct {
51+
// BEGIN OPENVIDU BLOCK
52+
srcAudio *gst.Element
53+
Rtspsrc *gst.Element
54+
// END OPENVIDU BLOCK
55+
4556
params *params.Params
4657
src *gst.Element
4758
pad *gst.Pad
@@ -85,10 +96,163 @@ func NewURLSource(ctx context.Context, p *params.Params) (*URLSource, error) {
8596
}
8697
}
8798
}
99+
100+
// BEGIN OPENVIDU BLOCK
101+
} else if strings.HasPrefix(p.Url, "rtsp://") || strings.HasPrefix(p.Url, "rtsps://") {
102+
103+
elem, err = gst.NewElementWithName("rtspsrc", "rtspsrc")
104+
if err != nil {
105+
return nil, err
106+
}
107+
err = elem.SetProperty("location", p.Url)
108+
if err != nil {
109+
return nil, err
110+
}
111+
err = elem.SetProperty("latency", uint(2000))
112+
if err != nil {
113+
return nil, err
114+
}
115+
err = elem.SetProperty("drop-on-latency", true)
116+
if err != nil {
117+
return nil, err
118+
}
119+
err = elem.SetProperty("port-range", "0-0")
120+
if err != nil {
121+
return nil, err
122+
}
123+
// END OPENVIDU BLOCK
124+
88125
} else {
89126
return nil, errors.ErrUnsupportedURLFormat
90127
}
91128

129+
// BEGIN OPENVIDU BLOCK
130+
if strings.HasPrefix(p.Url, "rtsp://") || strings.HasPrefix(p.Url, "rtsps://") {
131+
// Video
132+
videoqueue, _ := gst.NewElementWithName("queue2", "videoqueue")
133+
bin.Add(videoqueue)
134+
// Create video queue ghost sink
135+
videoqueuesink := videoqueue.GetStaticPad("sink")
136+
if videoqueuesink == nil {
137+
return nil, errors.ErrUnableToAddPad
138+
}
139+
videoqueueghostsink := gst.NewGhostPad("sink", videoqueuesink)
140+
bin.AddPad(videoqueueghostsink.Pad)
141+
// Create video queue ghost source
142+
videoqueuesrc := videoqueue.GetStaticPad("src")
143+
if videoqueuesrc == nil {
144+
return nil, errors.ErrUnableToAddPad
145+
}
146+
videoqueueghostsrc := gst.NewGhostPad("src", videoqueuesrc)
147+
if !bin.AddPad(videoqueueghostsrc.Pad) {
148+
return nil, errors.ErrUnableToAddPad
149+
}
150+
151+
// Audio
152+
audiobin := gst.NewBin("audioinput")
153+
audioqueue, _ := gst.NewElementWithName("queue2", "audioqueue")
154+
audiobin.Add(audioqueue)
155+
// Create audio queue ghost sink
156+
audioqueuesink := audioqueue.GetStaticPad("sink")
157+
if audioqueuesink == nil {
158+
return nil, errors.ErrUnableToAddPad
159+
}
160+
audioqueueghostsink := gst.NewGhostPad("sink", audioqueuesink)
161+
audiobin.AddPad(audioqueueghostsink.Pad)
162+
// Create audio queue ghost source
163+
audioqueuesrc := audioqueue.GetStaticPad("src")
164+
if audioqueuesrc == nil {
165+
return nil, errors.ErrUnableToAddPad
166+
}
167+
audioqueueghostsrc := gst.NewGhostPad("src", audioqueuesrc)
168+
if !audiobin.AddPad(audioqueueghostsrc.Pad) {
169+
return nil, errors.ErrUnableToAddPad
170+
}
171+
172+
var mu sync.Mutex
173+
174+
elem.Connect("pad-added", func(src *gst.Element, pad *gst.Pad) {
175+
176+
padName := pad.GetName()
177+
178+
logger.Infow("rtspsrc pad-added", "padName", padName)
179+
180+
var queue *gst.Element
181+
var sinkPad *gst.Pad
182+
var getQueueErr error
183+
isAudio := false
184+
isVideo := false
185+
186+
pad.GetCurrentCaps().ForEach(func(features *gst.CapsFeatures, structure *gst.Structure) bool {
187+
value, getMediaErr := structure.GetValue("media")
188+
if getMediaErr != nil {
189+
logger.Errorw("failed to get media value from caps", getMediaErr)
190+
return false
191+
}
192+
if value == "audio" {
193+
isAudio = true
194+
return true
195+
} else if value == "video" {
196+
isVideo = true
197+
return true
198+
} else {
199+
logger.Errorw("pad unrecognized media type", nil, "media", value)
200+
return false
201+
}
202+
})
203+
204+
mu.Lock()
205+
defer mu.Unlock()
206+
207+
if isAudio {
208+
sinkPad = audiobin.GetStaticPad("sink")
209+
queue, getQueueErr = audiobin.GetElementByName("audioqueue")
210+
} else if isVideo {
211+
sinkPad = bin.GetStaticPad("sink")
212+
queue, getQueueErr = bin.GetElementByName("videoqueue")
213+
} else {
214+
logger.Errorw("pad media type not audio nor video", nil, "padName", padName)
215+
return
216+
}
217+
218+
if sinkPad == nil {
219+
logger.Errorw("failed to get sink pad", nil)
220+
return
221+
}
222+
if getQueueErr != nil {
223+
logger.Errorw("failed to get queue element", err)
224+
return
225+
}
226+
227+
if sinkPad.IsLinked() {
228+
var kind string
229+
if isAudio {
230+
kind = "audio"
231+
} else {
232+
kind = "video"
233+
}
234+
logger.Warnw("sink pad of kind "+kind+" is already linked", nil)
235+
logger.Warnw("this usually means that the source is sending multiple streams of the same kind", nil)
236+
return
237+
}
238+
239+
padLinkReturnValue := pad.Link(sinkPad)
240+
if padLinkReturnValue != gst.PadLinkOK && padLinkReturnValue != gst.PadLinkWasLinked {
241+
logger.Errorw("failed to link pad", nil, "padLinkReturnValue", padLinkReturnValue)
242+
return
243+
}
244+
queue.SyncStateWithParent()
245+
})
246+
247+
return &URLSource{
248+
params: p,
249+
src: bin.Element,
250+
srcAudio: audiobin.Element,
251+
Rtspsrc: elem,
252+
}, nil
253+
}
254+
// END OPENVIDU BLOCK
255+
92256
queue, err := gst.NewElement("queue2")
93257
if err != nil {
94258
return nil, err
@@ -135,6 +299,14 @@ func NewURLSource(ctx context.Context, p *params.Params) (*URLSource, error) {
135299
}
136300

137301
func (u *URLSource) GetSources() []*gst.Element {
302+
// BEGIN OPENVIDU BLOCK
303+
if u.srcAudio != nil {
304+
return []*gst.Element{
305+
u.src,
306+
u.srcAudio,
307+
}
308+
}
309+
// END OPENVIDU BLOCK
138310
return []*gst.Element{
139311
u.src,
140312
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// BEGIN OPENVIDU BLOCK
2+
package openviduproconfig
3+
4+
type RtcEngine string
5+
6+
const (
7+
RtcEnginePion RtcEngine = "pion"
8+
RtcEngineMediasoup RtcEngine = "mediasoup"
9+
)
10+
11+
type RtcConfig struct {
12+
Engine RtcEngine `yaml:"engine,omitempty"`
13+
}
14+
15+
type OpenViduProConfig struct {
16+
// WebRTC engine config.
17+
Rtc RtcConfig `yaml:"rtc"`
18+
}
19+
20+
// END OPENVIDU BLOCK

0 commit comments

Comments
 (0)