Skip to content

Commit 97eea0c

Browse files
authored
Enable LK CLI to publish multiple H264 streams as a simulcast track (#604)
- add simulcast support for multiple H.264 streams.
1 parent b01c31b commit 97eea0c

File tree

4 files changed

+327
-6
lines changed

4 files changed

+327
-6
lines changed

README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,47 @@ lk room join --identity bot \
203203
<room_name>
204204
```
205205
206+
### Publish H.264 simulcast track from TCP
207+
208+
You can publish multiple H.264 video tracks from different TCP ports as a single [Simulcast](https://docs.livekit.io/home/client/tracks/advanced/#video-simulcast) track. This is done by using multiple `--publish` flags.
209+
210+
The track will be published in simulcast mode if multiple `--publish` flags with the syntax `h264://<host>:<port>/<width>x<height>` are passed in as arguments.
211+
212+
Example:
213+
214+
Use GStreamer to scale a video input to 3 resolutions (1920x1080, 1280x720, 640x480), encode each as an H.264 stream, and output each stream on a different port using `tcpserversink`.
215+
216+
```shell
217+
# Note: this is just an example of a Gstreamer pipeline structure
218+
# It uses a `tee` element to split the raw frame input to 3 pipelines for
219+
# scaling to a specific resolution then encoding to H.264 byte stream.
220+
gst-launch-1.0 -e -v \
221+
v4l2src device=<device> ! \
222+
tee name=t \
223+
t. ! <scale to 1920x1080, H.264 encode elements> ! \
224+
tcpserversink host=0.0.0.0 port=5005 sync=false async=false \
225+
t. ! <scale to 1280x720, H.264 encode elements> ! \
226+
tcpserversink host=0.0.0.0 port=5006 sync=false async=false \
227+
t. ! <scale to 640x480, H.264 encode elements> ! \
228+
tcpserversink host=0.0.0.0 port=5007 sync=false async=false
229+
```
230+
231+
Use `livekit-cli` to publish the 3 resolution streams to a single Simulcast track.
232+
233+
```shell
234+
lk room join --identity <name> --url "<url>" --api-key "<key>" --api-secret "<secret>" \
235+
--publish h264://127.0.0.1:5005/1920x1080 \
236+
--publish h264://127.0.0.1:5006/1280x720 \
237+
--publish h264://127.0.0.1:5007/640x480 <room>
238+
```
239+
240+
Notes:
241+
- LiveKit CLI can only publish simulcast tracks using H.264 codec.
242+
- Passing multiple `--publish` flags is only supported for creating a simulcast track.
243+
- Using more than 1 `--publish` flag for other types of streams will not work.
244+
- Tracks will automatically be set to HIGH/MED/LOW resolution based on the order of their width.
245+
- If only 2 tracks are published, they will be published as HIGH and LOW resolution layers.
246+
206247
### Publish streams from your application
207248
208249
Using unix sockets, it's also possible to publish streams from your application. The tracks need to be encoded into

cmd/lk/join.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ import (
2222
"os"
2323
"os/signal"
2424
"path/filepath"
25+
"sort"
26+
"strconv"
2527
"strings"
28+
"sync"
2629
"syscall"
2730
"time"
2831

@@ -368,3 +371,176 @@ func publishReader(room *lksdk.Room,
368371
}
369372
return nil
370373
}
374+
375+
// simulcastURLParts holds the components extracted from one simulcast
// publish URL: where to connect and the resolution declared for the stream.
type simulcastURLParts struct {
	// network is the dial network, either "tcp" or "unix".
	network string
	// address is the dial target taken from the URL (host:port or socket path).
	address string
	// width and height are the dimensions given in the <width>x<height>
	// suffix of the URL.
	width, height uint32
}
382+
383+
// parseSimulcastURL validates and parses a simulcast URL in the format h264://<host:port>/<width>x<height> or h264://<socket_path>/<width>x<height>
384+
func parseSimulcastURL(url string) (*simulcastURLParts, error) {
385+
matches := simulcastURLRegex.FindStringSubmatch(url)
386+
if matches == nil {
387+
return nil, fmt.Errorf("simulcast URL must be in format h264://<host:port>/<width>x<height> or h264://<socket_path>/<width>x<height>, got: %s", url)
388+
}
389+
390+
address, widthStr, heightStr := matches[1], matches[2], matches[3]
391+
392+
// Parse dimensions
393+
width, err := strconv.ParseUint(widthStr, 10, 32)
394+
if err != nil || width == 0 {
395+
return nil, fmt.Errorf("invalid width in URL %s: must be > 0", url)
396+
}
397+
398+
height, err := strconv.ParseUint(heightStr, 10, 32)
399+
if err != nil || height == 0 {
400+
return nil, fmt.Errorf("invalid height in URL %s: must be > 0", url)
401+
}
402+
403+
network := "unix"
404+
if strings.Contains(address, ":") {
405+
network = "tcp"
406+
}
407+
408+
return &simulcastURLParts{
409+
network: network,
410+
address: address,
411+
width: uint32(width),
412+
height: uint32(height),
413+
}, nil
414+
}
415+
416+
// createSimulcastVideoTrack creates a simulcast video track from a TCP or Unix socket H.264 streams
417+
func createSimulcastVideoTrack(urlParts *simulcastURLParts, quality livekit.VideoQuality, fps float64, onComplete func()) (*lksdk.LocalTrack, error) {
418+
conn, err := net.Dial(urlParts.network, urlParts.address)
419+
if err != nil {
420+
return nil, fmt.Errorf("failed to connect to %s://%s: %w", urlParts.network, urlParts.address, err)
421+
}
422+
423+
var opts []lksdk.ReaderSampleProviderOption
424+
425+
// Add completion handler if provided
426+
if onComplete != nil {
427+
opts = append(opts, lksdk.ReaderTrackWithOnWriteComplete(onComplete))
428+
}
429+
430+
// Set frame rate if FPS is set
431+
if fps != 0 {
432+
frameDuration := time.Second / time.Duration(fps)
433+
opts = append(opts, lksdk.ReaderTrackWithFrameDuration(frameDuration))
434+
}
435+
436+
// Configure simulcast layer
437+
opts = append(opts, lksdk.ReaderTrackWithSampleOptions(lksdk.WithSimulcast("simulcast", &livekit.VideoLayer{
438+
Quality: quality,
439+
Width: urlParts.width,
440+
Height: urlParts.height,
441+
})))
442+
443+
return lksdk.NewLocalReaderTrack(conn, webrtc.MimeTypeH264, opts...)
444+
}
445+
446+
// simulcastLayer represents a parsed H.264 stream with quality info
447+
type simulcastLayer struct {
448+
url string
449+
parts *simulcastURLParts
450+
quality livekit.VideoQuality
451+
name string
452+
}
453+
454+
// handleSimulcastPublish handles publishing multiple H.264 streams as a simulcast track
455+
func handleSimulcastPublish(room *lksdk.Room, urls []string, fps float64, onPublishComplete func(*lksdk.LocalTrackPublication)) error {
456+
// Parse all URLs
457+
var layers []simulcastLayer
458+
for _, url := range urls {
459+
parts, err := parseSimulcastURL(url)
460+
if err != nil {
461+
return fmt.Errorf("invalid simulcast URL %s: %w", url, err)
462+
}
463+
if parts != nil {
464+
layers = append(layers, simulcastLayer{
465+
url: url,
466+
parts: parts,
467+
})
468+
}
469+
}
470+
471+
if len(layers) == 0 {
472+
return fmt.Errorf("no valid simulcast URLs provided")
473+
}
474+
475+
// Sort streams by width to determine quality levels
476+
sort.Slice(layers, func(i, j int) bool {
477+
return layers[i].parts.width < layers[j].parts.width
478+
})
479+
480+
// Assign quality levels based on stream count and order
481+
if len(layers) == 2 {
482+
// 2 streams: low and high quality
483+
layers[0].quality = livekit.VideoQuality_LOW
484+
layers[0].name = "low"
485+
layers[1].quality = livekit.VideoQuality_HIGH
486+
layers[1].name = "high"
487+
} else if len(layers) == 3 {
488+
// 3 streams: low, medium, high quality
489+
layers[0].quality = livekit.VideoQuality_LOW
490+
layers[0].name = "low"
491+
layers[1].quality = livekit.VideoQuality_MEDIUM
492+
layers[1].name = "medium"
493+
layers[2].quality = livekit.VideoQuality_HIGH
494+
layers[2].name = "high"
495+
} else {
496+
return fmt.Errorf("simulcast requires 2 or 3 streams, got %d", len(layers))
497+
}
498+
499+
// Create tracks for each stream
500+
var tracks []*lksdk.LocalTrack
501+
var trackNames []string
502+
503+
// Track completion - if any stream ends, signal completion
504+
var pub *lksdk.LocalTrackPublication
505+
completionSignaled := false
506+
var completionMutex sync.Mutex
507+
508+
signalCompletion := func() {
509+
completionMutex.Lock()
510+
defer completionMutex.Unlock()
511+
if !completionSignaled && onPublishComplete != nil {
512+
completionSignaled = true
513+
onPublishComplete(pub)
514+
}
515+
}
516+
517+
for _, layer := range layers {
518+
track, err := createSimulcastVideoTrack(layer.parts, layer.quality, fps, signalCompletion)
519+
if err != nil {
520+
// Clean up any tracks we've already created
521+
for _, t := range tracks {
522+
t.Close()
523+
}
524+
return fmt.Errorf("failed to create %s quality track (%dx%d): %w",
525+
layer.name, layer.parts.width, layer.parts.height, err)
526+
}
527+
tracks = append(tracks, track)
528+
trackNames = append(trackNames, fmt.Sprintf("%s(%dx%d)", layer.name, layer.parts.width, layer.parts.height))
529+
}
530+
531+
// Publish simulcast track
532+
var err error
533+
pub, err = room.LocalParticipant.PublishSimulcastTrack(tracks, &lksdk.TrackPublicationOptions{
534+
Name: "simulcast",
535+
})
536+
if err != nil {
537+
// Clean up tracks on publish failure
538+
for _, track := range tracks {
539+
track.Close()
540+
}
541+
return fmt.Errorf("failed to publish simulcast track: %w", err)
542+
}
543+
544+
fmt.Printf("Successfully published H.264 simulcast track with qualities: %v\n", trackNames)
545+
return nil
546+
}

cmd/lk/join_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,47 @@ func TestParseSocketString(t *testing.T) {
8383
assert.Equal(t, address, "foobar.com:1234")
8484
assert.Equal(t, err, nil, "Expected no error for valid vp8 TCP socket")
8585
}
86+
87+
func TestParseSimulcastURL(t *testing.T) {
88+
// Test TCP format
89+
parts, err := parseSimulcastURL("h264://localhost:8080/640x480")
90+
assert.NoError(t, err, "Expected no error for valid TCP simulcast URL")
91+
assert.Equal(t, "tcp", parts.network)
92+
assert.Equal(t, "localhost:8080", parts.address)
93+
assert.Equal(t, uint32(640), parts.width)
94+
assert.Equal(t, uint32(480), parts.height)
95+
96+
// Test Unix socket format with multiple slashes
97+
parts, err = parseSimulcastURL("h264:///tmp/my.socket/1280x720")
98+
assert.NoError(t, err, "Expected no error for valid Unix socket simulcast URL")
99+
assert.Equal(t, "unix", parts.network)
100+
assert.Equal(t, "/tmp/my.socket", parts.address)
101+
assert.Equal(t, uint32(1280), parts.width)
102+
assert.Equal(t, uint32(720), parts.height)
103+
104+
// Test Unix socket format with nested paths
105+
parts, err = parseSimulcastURL("h264:///tmp/deep/nested/path/my.socket/1920x1080")
106+
assert.NoError(t, err, "Expected no error for valid nested path Unix socket simulcast URL")
107+
assert.Equal(t, "unix", parts.network)
108+
assert.Equal(t, "/tmp/deep/nested/path/my.socket", parts.address)
109+
assert.Equal(t, uint32(1920), parts.width)
110+
assert.Equal(t, uint32(1080), parts.height)
111+
112+
// Test simple socket name without path
113+
parts, err = parseSimulcastURL("h264://mysocket/640x480")
114+
assert.NoError(t, err, "Expected no error for simple socket name")
115+
assert.Equal(t, "unix", parts.network)
116+
assert.Equal(t, "mysocket", parts.address)
117+
assert.Equal(t, uint32(640), parts.width)
118+
assert.Equal(t, uint32(480), parts.height)
119+
120+
// Test invalid format
121+
_, err = parseSimulcastURL("h264://localhost:8080")
122+
assert.Error(t, err, "Expected error for URL without dimensions")
123+
124+
_, err = parseSimulcastURL("opus:///tmp/socket/640x480")
125+
assert.Error(t, err, "Expected error for non-h264 protocol")
126+
127+
_, err = parseSimulcastURL("h264:///tmp/socket/invalidxinvalid")
128+
assert.Error(t, err, "Expected error for invalid dimensions")
129+
}

0 commit comments

Comments
 (0)