feat(audio): encoded and decoded opus audio tracks #633
examples/pcmopus/subscribing/main.go
Outdated
| @@ -0,0 +1,82 @@ | |||
| package main | |||
There was a problem hiding this comment.
would be good to merge this and the above into a single file and possibly have command line options to start up as subscriber only or publisher only. Good to have related code together in examples.
There was a problem hiding this comment.
done, please have a look
pcmaudiotrack.go
Outdated
|
|
||
| // TODO: Support stereo | ||
| func NewPCM16ToOpusAudioTrack(sampleRate int, frameDuration time.Duration, logger protoLogger.Logger) (*PCM16ToOpusAudioTrack, error) { | ||
| track, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "test", "test") |
There was a problem hiding this comment.
:nit: maybe some more descriptive names for id, stream id?
There was a problem hiding this comment.
Ah yea, that is a miss
pcmaudiotrack.go
Outdated
| var resetTimer bool | ||
|
|
||
| for len(t.sampleBuffer) == 0 && !t.closed.Load() { | ||
| t.cond.Wait() |
There was a problem hiding this comment.
If you are purely pacing, I am guessing this is tricky. Assume a bunch of packets arrive all at once. This will be blocked for a while after waking up from timer. Then it will push one packet and sleep for frameDuration again. I am guessing there needs to be a mechanism to check if multiple samples should be pushed in one run to catch up.
There was a problem hiding this comment.
Or if the input is paced, can rely on that and this can run purely on condition variable and not use a timer at all.
There was a problem hiding this comment.
Good point, yeah pacing handling needs to be better.
There was a problem hiding this comment.
What if the user does not send anything for silence? Should that be a requirement?
There was a problem hiding this comment.
On the PCM side, there is no real way to tell how much the gap is unless it is wrapped in something like RTP.
I guess we need to understand the flow fully and design for it (sorry, I do not know the full requirements). My guess is that PCM has to be sent even for silence. Otherwise, there is no way to know which chunk belongs to which time, as there is no timestamp.
There was a problem hiding this comment.
Changed things around, please let me know what you think of it now, frameDuration is not taken from the user anymore and user supplied chunks are split internally.
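A minimal sketch of the "split user-supplied chunks internally" idea mentioned above, assuming mono PCM16 and a fixed number of samples per frame. The `frameSplitter` type and its `Write` method are hypothetical names for illustration and are not taken from the PR.

```go
package main

import "fmt"

// frameSplitter is a hypothetical helper (not part of the SDK) that accepts
// PCM16 chunks of any length and emits fixed-size frames.
type frameSplitter struct {
	frameSize int     // samples per output frame
	pending   []int16 // samples that do not yet fill a frame
}

// Write appends chunk and returns every complete frame that is now available.
// Leftover samples stay buffered until a later call completes the frame.
func (s *frameSplitter) Write(chunk []int16) [][]int16 {
	s.pending = append(s.pending, chunk...)
	var frames [][]int16
	for len(s.pending) >= s.frameSize {
		frame := make([]int16, s.frameSize)
		copy(frame, s.pending[:s.frameSize])
		frames = append(frames, frame)
		s.pending = s.pending[s.frameSize:]
	}
	return frames
}

func main() {
	s := &frameSplitter{frameSize: 4}
	fmt.Println(len(s.Write([]int16{1, 2, 3})))          // 0: not enough for a frame yet
	fmt.Println(len(s.Write([]int16{4, 5, 6, 7, 8, 9}))) // 2: two frames, one sample left over
}
```

With this shape the caller never specifies a frame duration; pacing of the emitted frames is a separate concern.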
pcmaudiotrack.go
Outdated
| // TODO: We also have a reader API, but writer is more efficient as it is zero copy. | ||
| // Shall we support both reader and writer? | ||
| // Reader makes more sense while reading the code, might be easier for the end user to understand. | ||
| // But, it's less efficient as it involves a copy. |
There was a problem hiding this comment.
I think efficient method with some comments is better.
There was a problem hiding this comment.
which one is more user friendly? let's go with that approach?
not against having a more efficient interface too, but would focus on the ergonomics. what about:
.WriteTo(writer) for the more efficient interface, and
.Read() for the standard reader?
pcmaudiotrack.go
Outdated
|
|
||
| func (t *OpusToPCM16AudioTrack) Close() { | ||
| // opus writer closes resampledPCMWriter internally | ||
| t.opusWriter.Close() |
There was a problem hiding this comment.
yeah, definitely harder to read, but some comments could be helpful, like this comment is useful.
Similarly, it would be good to have comments on how NewMediaStreamIn or opus.Decode handle this with a writer; it would help to understand how the flow actually happens.
There was a problem hiding this comment.
I agree it's hard to read, mostly because of the writer APIs: #633 (comment)
will add more comments
|
One more thing to ensure is endianness of PCM 16-bit. I am guessing |
pcmaudiotrack.go
Outdated
| } | ||
|
|
||
| // TODO: Support stereo | ||
| func NewPCM16ToOpusAudioTrack(sampleRate int, frameDuration time.Duration, logger protoLogger.Logger) (*PCM16ToOpusAudioTrack, error) { |
There was a problem hiding this comment.
is sampleRate the source sample rate or destination? I thought opus always used 48k internally. some clients are making this assumption, so we'd want to make sure it's compatible.
IMO users should specify source sample rate, and we can handle the compat layer during publishing
is frame duration the size of each frame that they are expected to write? or is this size of output frame size?
There was a problem hiding this comment.
both sample rate and frame duration are of source, i'll modify var names to indicate that clearly
There was a problem hiding this comment.
do we need to know the frame size? or can it be automatically handled internally? webrtc uses 10ms frames. we should be able to buffer internally if user pushes less than 10ms.. or split it into multiple samples if it's more than 10ms
There was a problem hiding this comment.
https://github.com/livekit/sip/blob/main/pkg/media/pcm.go#L203
this part of the code expects a set frame duration, which it uses to construct a media.Sample to write to the track. There might be a way to check for frame duration internally and pass it along dynamically, but this is the last writer in the chain so that information will have to be passed along. Also, it makes pacing more tricky, we already have an issue with it, in this thread: #633 (comment)
There was a problem hiding this comment.
This has been changed to not take frame duration from the user, please have a look at it now!
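For reference, the relationship between sample rate, frame duration and frame size discussed in this thread can be sketched as below. `samplesPerFrame` is a hypothetical helper name; the 48 kHz / 20 ms and 16 kHz / 10 ms values are only example inputs.

```go
package main

import (
	"fmt"
	"time"
)

// samplesPerFrame returns how many samples per channel make up one frame
// of the given duration at the given sample rate.
func samplesPerFrame(sampleRate int, frameDuration time.Duration) int {
	return int(int64(sampleRate) * int64(frameDuration) / int64(time.Second))
}

func main() {
	fmt.Println(samplesPerFrame(48000, 20*time.Millisecond)) // 960
	fmt.Println(samplesPerFrame(16000, 10*time.Millisecond)) // 160
}
```

If the user pushes fewer samples than one frame, the remainder has to be buffered; if more, it has to be split, which is what the comment above proposes handling internally.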
pcmaudiotrack.go
Outdated
| // Reader makes more sense while reading the code, might be easier for the end user to understand. | ||
| // But, it's less efficient as it involves a copy. | ||
| func NewOpusToPCM16AudioTrack(track *webrtc.TrackRemote, publication *RemoteTrackPublication, writer *audio.WriteCloser[audio.PCM16Sample], sampleRate int, handleJitter bool) (*OpusToPCM16AudioTrack, error) { | ||
| if track.Codec().MimeType != webrtc.MimeTypeOpus { |
There was a problem hiding this comment.
should handle jitter be an option? or is it a configuration for max jitter buffer length? (0 to disable)
sampleRate -> targetSampleRate would be better.
There was a problem hiding this comment.
thinking out loud.. maybe it's better to have this to be a ...opts pattern.. so users can trust sensible defaults
There was a problem hiding this comment.
should handle jitter be an option? or is it a configuration for max jitter buffer length? (0 to disable)
Maybe, we can just do it internally. Currently, it's just a bool for whether jitter should be handled, I think it makes sense to do that before encoding to PCM, I haven't tried without.
Jitter buffer length is not configurable in the current implementation: https://github.com/livekit/sip/blob/main/pkg/media/rtp/jitter.go#L29
There was a problem hiding this comment.
Is there a need for jitter buffer? Is there mixing happening? Or is the outgoing RTP timestamp getting messed up due to jitter?
There was a problem hiding this comment.
As you said earlier, the output PCM has no seq number etc, a jitter buffer will eliminate out of order packets so that output PCM is always written in sequence.
There was a problem hiding this comment.
Got it. Thank you 🙏 . Guess, we do not do packet loss concealment (or maybe we are letting libopus do it?).
That depth of jitter buffer should be enough for server-to-server paths I guess. But, I am trying to ensure that input and output durations are the same. So, if the jitter buffer hits the max delay condition and it is popping a packet with a gap, how is that gap filled?
There was a problem hiding this comment.
I looked into it in more detail; SIP does not seem to handle concealment at the moment. Though the Opus library we are using has methods for decoding with FEC or PLC, it might be tricky to use them properly. Maybe we can skip concealment for v1?
There was a problem hiding this comment.
Yeah, that's fine.
Maybe fill in zeros? I am thinking it should be very rare.
We should try to keep the durations right.
There was a problem hiding this comment.
thinking out loud.. maybe it's better to have this to be a ...opts pattern.. so users can trust sensible defaults
The only optional parameter there is the handleJitter one, not sure if I should use opts pattern for just one option
There was a problem hiding this comment.
Maybe fill in zeros? I am thinking it should be very rare.
Yeah it should be rare, let me make a note of it and keep it for v2.
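A rough sketch of the `...opts` pattern suggested in this thread. All names here (`remoteTrackConfig`, `RemoteTrackOption`, `WithTargetSampleRate`, `WithHandleJitter`, `newConfig`) and the default values are assumptions for illustration, not the API that was merged.

```go
package main

import "fmt"

// remoteTrackConfig holds hypothetical settings; names and defaults are illustrative.
type remoteTrackConfig struct {
	targetSampleRate int
	handleJitter     bool
}

// RemoteTrackOption mutates the config; callers pass zero or more of them.
type RemoteTrackOption func(*remoteTrackConfig)

func WithTargetSampleRate(rate int) RemoteTrackOption {
	return func(c *remoteTrackConfig) { c.targetSampleRate = rate }
}

func WithHandleJitter(enabled bool) RemoteTrackOption {
	return func(c *remoteTrackConfig) { c.handleJitter = enabled }
}

// newConfig starts from assumed defaults and applies the options in order.
func newConfig(opts ...RemoteTrackOption) remoteTrackConfig {
	cfg := remoteTrackConfig{targetSampleRate: 48000, handleJitter: true}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(WithTargetSampleRate(16000), WithHandleJitter(false)))
}
```

Even with a single optional parameter today, this keeps the constructor signature stable if more options (for example a preferred channel count) are added later.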
pcmaudiotrack.go
Outdated
|
|
||
| channels := 1 | ||
| if publication.TrackInfo().Stereo { | ||
| channels = 2 |
There was a problem hiding this comment.
ideally users would have an option on preferred output channels, and we can handle downmix
There was a problem hiding this comment.
what about publishing? will that always be single channel?
There was a problem hiding this comment.
supporting both would be great.. but v1, single channel is fine IMO
There was a problem hiding this comment.
I think publishing should be easy, I'll try it.
I tried subscribing stereo, but the audio came out weird. But, forcing a conversion to mono, it's fine. Could be a bug with the webm writer used in the example, debugging that one to make sure.
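The stereo-to-mono conversion mentioned here could look roughly like the following, assuming interleaved PCM16 (L, R, L, R, ...) and a plain average of the two channels. `downmixStereoToMono` is a hypothetical name; the PR's media stack may do this differently.

```go
package main

import "fmt"

// downmixStereoToMono averages each interleaved left/right pair into one sample.
// A trailing unpaired sample, if any, is dropped.
func downmixStereoToMono(stereo []int16) []int16 {
	mono := make([]int16, len(stereo)/2)
	for i := range mono {
		// Widen to int32 so the sum cannot overflow int16.
		l, r := int32(stereo[2*i]), int32(stereo[2*i+1])
		mono[i] = int16((l + r) / 2)
	}
	return mono
}

func main() {
	fmt.Println(downmixStereoToMono([]int16{100, 300, -200, -400, 32767, 32767}))
	// [200 -300 32767]
}
```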
pcmaudiotrack.go
Outdated
| opusWriter := audio.FromSampleWriter[opus.Sample](track, opusSampleRate, frameDuration) | ||
| pcmWriter, err := opus.Encode(opusWriter, 1, logger) | ||
| if err != nil { | ||
| return nil, err |
There was a problem hiding this comment.
Do we need to close the track here?
There was a problem hiding this comment.
Do you mean webrtc.TrackRemote? If yes, it does not have a close method; it gets cleaned up on OnTrackUnsubscribed or on room.Disconnect().
examples/pcmopus/subscribing/main.go
Outdated
| }, &lksdk.RoomCallback{ | ||
| ParticipantCallback: lksdk.ParticipantCallback{ | ||
| OnTrackSubscribed: func(track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { | ||
| pcmTrack, fileWriter = onTrackSubscribed(track, publication) |
There was a problem hiding this comment.
This should:
- Use a mutex, we don't want SDK user to complain about subtle bugs caused by races.
- Check if the track is already assigned. Even for 1:1 there might be more than one track, potentially.
- Ignore video tracks.
I think most of the problems can be fixed by just using an unbuffered channel and select-default to send to it.
There was a problem hiding this comment.
Ignored non-opus tracks, do we need the rest? This is only an example
There was a problem hiding this comment.
Right, but users will likely copy-paste it, so having a mutex/channel at least is a good idea, I think.
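A small sketch of the "unbuffered channel and select-default" suggestion, using a plain string in place of the track. `offerTrack` is a hypothetical helper: it hands the value over only if a consumer is already waiting, so a second track arriving later is ignored rather than racing with the first.

```go
package main

import (
	"fmt"
	"runtime"
)

// offerTrack tries to hand id to a waiting receiver without blocking.
// It returns false if nobody is currently receiving.
func offerTrack(ch chan<- string, id string) bool {
	select {
	case ch <- id:
		return true
	default:
		return false
	}
}

func main() {
	tracks := make(chan string) // unbuffered: a send succeeds only if a receiver is waiting
	got := make(chan string, 1)
	go func() { got <- <-tracks }() // consumer that accepts exactly one track

	// Simulate the callback firing until the consumer is ready.
	for !offerTrack(tracks, "audio-1") {
		runtime.Gosched()
	}
	fmt.Println(offerTrack(tracks, "audio-2")) // false: the consumer already has its track
	fmt.Println(<-got)                         // audio-1
}
```

In a real callback the track would simply be dropped when no consumer is waiting, instead of being retried in a loop as the demo does.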
Hey folks, please re-review this! I couldn't find a device to test stereo, can someone please help me with that? Determining stereo is now added in code, please have a look at that as well.
audiotrack.go
Outdated
| sourceChannels int | ||
| chunksPerSample int | ||
|
|
||
| // int16 to support a little-endian PCM16 chunk that has a high byte and low byte |
There was a problem hiding this comment.
It actually doesn't matter whether it was LE or BE, since reading it will switch to platform-specific byte order (LE in this case).
There was a problem hiding this comment.
oh yea BE is handled here as well
There was a problem hiding this comment.
though I'm not sure if rest of the stack supports BE (that'd just be the opus encode/decode layer I guess)
There was a problem hiding this comment.
It does, everything passes int16 so it will handle BE during decoding from file/RTP and will pass it around in native platform byte order.
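To illustrate the point about byte order: once raw bytes are decoded into `int16` values, the rest of the pipeline no longer cares how they were laid out on the wire or on disk. The sketch below uses the standard `encoding/binary` package; the function names are hypothetical.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodePCM16 converts raw bytes into int16 samples using the given byte order.
// A trailing odd byte is ignored.
func decodePCM16(raw []byte, order binary.ByteOrder) []int16 {
	out := make([]int16, len(raw)/2)
	for i := range out {
		out[i] = int16(order.Uint16(raw[2*i:]))
	}
	return out
}

// pcm16FromLE and pcm16FromBE are thin wrappers for the two common layouts.
func pcm16FromLE(raw []byte) []int16 { return decodePCM16(raw, binary.LittleEndian) }
func pcm16FromBE(raw []byte) []int16 { return decodePCM16(raw, binary.BigEndian) }

func main() {
	fmt.Println(pcm16FromLE([]byte{0x34, 0x12, 0xFE, 0xFF})) // [4660 -2]
	fmt.Println(pcm16FromBE([]byte{0x12, 0x34, 0xFF, 0xFE})) // [4660 -2]
}
```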
audiotrack.go
Outdated
| t.ticker = time.NewTicker(t.frameDuration) | ||
| // t.mu.Unlock() | ||
|
|
||
| for range t.ticker.C { |
There was a problem hiding this comment.
Could also be:
for {
	// write sample
	// ...
	<-ticker.C
}
To avoid code duplication before the loop.
There was a problem hiding this comment.
And the ticker can probably be all internal? No need for struct field?
| } | ||
|
|
||
| func (t *EncodedAudioTrack) getChunksFromBuffer() media.PCM16Sample { | ||
| chunks := make(media.PCM16Sample, t.chunksPerSample) |
There was a problem hiding this comment.
Since we know that deque will append to internal buffer, this slice can be reused between iterations. So I would propose to pass it as argument here and let the caller allocate it once.
There was a problem hiding this comment.
Good observations, let me do that!
|
|
||
| func (t *EncodedAudioTrack) pushChunksToBuffer(sample media.PCM16Sample) { | ||
| for _, chunk := range sample { | ||
| t.chunkBuffer.PushBack(chunk) |
There was a problem hiding this comment.
Too bad it cannot push/pop multiple values at once. This would be pretty slow.
Also I'm a bit concerned about unbounded growth of this structure. Maybe ring buffer would be better here? I think we should have one in SIP media stack.
There was a problem hiding this comment.
Yeah not being able to push/pop in batches is a bummer.
I'm also considering a ring buffer, but it comes down to the rate of writing. And the buffer size could be case dependent as well. Maybe we can take buffer size as a param from the user.
@boks1971 @davidzhao wdyt?
There was a problem hiding this comment.
Could we maybe determine an ideal buffer size using sampling rate?
There was a problem hiding this comment.
Right, I think buffer size should always be a multiple of the frame size. We can decide that we want to buffer 100ms max, let's say, which is 5x the frame size. Anything above this could drop frames. This shouldn't really happen, since we have a jitter buffer in front, but it might help handle bursts of packets coming from it.
There was a problem hiding this comment.
Jitter buffer and fixed frame size (20ms) is for the subscribing side. This is for publish.
The write API here can take any random frame size; multiple frames can be of different durations as well. We take just enough out of the buffer to build a frame duration that is supported by opus.
There was a problem hiding this comment.
the public API is supposed to be like writing to a file, chunk duration doesn't matter
There was a problem hiding this comment.
Sorry, confused the code path. Sure, then we just set the buffer to be as long as is required by opus. Maybe 2x to be safe.
There was a problem hiding this comment.
Don't think you need to take a size param. If you want, you can make a bounded ring buffer in a subsequent version. If we already have one, that is even better.
You can set min cap (https://pkg.go.dev/github.com/gammazero/deque#Deque.SetBaseCap) so that the size does not go below that. Also, the Deque grows/shrinks automatically. We also have this thing in buffer.go in SFU where it checks fullness and grows too, but that is more involved and not necessary here.
There was a problem hiding this comment.
Will do this in a followup PR, have added a TODO comment
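For the follow-up, a bounded ring buffer with batch push/pop might look like the sketch below. This is an illustration under assumptions, not the deque used in the PR nor the SIP media stack's buffer: `sampleRing` is a hypothetical type, the capacity must be greater than zero, and the overflow policy here is to overwrite the oldest samples.

```go
package main

import "fmt"

// sampleRing is a hypothetical fixed-capacity ring buffer of PCM16 samples.
// When it is full, Push overwrites the oldest samples.
type sampleRing struct {
	buf   []int16
	start int // index of the oldest sample
	size  int // number of buffered samples
}

// newSampleRing allocates the storage once; capacity must be > 0.
func newSampleRing(capacity int) *sampleRing {
	return &sampleRing{buf: make([]int16, capacity)}
}

// Push appends a batch of samples in one call.
func (r *sampleRing) Push(samples []int16) {
	for _, s := range samples {
		end := (r.start + r.size) % len(r.buf)
		r.buf[end] = s
		if r.size < len(r.buf) {
			r.size++
		} else {
			r.start = (r.start + 1) % len(r.buf) // full: drop the oldest sample
		}
	}
}

// Pop copies up to len(dst) samples into dst and returns how many were copied.
// The caller can reuse dst across iterations to avoid allocations.
func (r *sampleRing) Pop(dst []int16) int {
	n := len(dst)
	if n > r.size {
		n = r.size
	}
	for i := 0; i < n; i++ {
		dst[i] = r.buf[(r.start+i)%len(r.buf)]
	}
	r.start = (r.start + n) % len(r.buf)
	r.size -= n
	return n
}

func main() {
	r := newSampleRing(4)
	r.Push([]int16{1, 2, 3, 4, 5, 6}) // 1 and 2 are overwritten
	dst := make([]int16, 3)
	n := r.Pop(dst)
	fmt.Println(n, dst) // 3 [3 4 5]
}
```

A capacity of roughly two Opus frames' worth of samples would match the "2x to be safe" suggestion above.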
audiotrack.go
Outdated
| // be blocked for a long time. But, since only this goroutine is reading | ||
| // from the deque, we might not need it. Writing still needs locking, | ||
| // since multiple goroutines could be calling WriteSample that writes to the deque. | ||
| sample := t.getChunksFromBuffer() |
There was a problem hiding this comment.
This might produce weird results without mutex. It seems like it should be fine, since it only increments one integer. But when the writer reallocates internal storage, it may shuffle data around and change read offset as well. This would be really hard to debug.
There was a problem hiding this comment.
In my short testing, it seemed to work, since it's a queue with only one reader.
Also, if we use mutex while reading, it can block a writer and end up writing a silent chunk even though we have the data (but it's waiting to be written behind a lock). Without a lock for reading, we can use the data as soon as it's written.
There was a problem hiding this comment.
I don't think the writer will hold mutex for 20ms. It only takes the lock when there's a frame to write, and if write to deque takes 20ms, we should not use the implementation which is that slow :)
Going without mutex may cause way more issues. It's just not safe to use that data structure concurrently. It may not happen in the short tests, but it will happen eventually.
There was a problem hiding this comment.
It does not take 20ms to write, but, if you see the example code, the write happens very frequently. So, there's a fight for acquiring the lock between the read call and multiple write calls.
I've seen the reader not getting to acquire the lock for over a min in some runs.
There was a problem hiding this comment.
Sadly locks are not acquired in a FIFO order
There was a problem hiding this comment.
Looks like the channel can be unbuffered as well.
func (t *EncodedAudioTrack) WriteSample(data media.PCM16Sample) error {
	select {
	case t.queue <- slices.Clone(data): // we can get rid of the copy later
		return nil
	case <-t.done:
		return ErrClosed
	}
}

func (t *EncodedAudioTrack) readLoop() {
	// Local var is needed to disable the channel later
	queue := t.queue
	tick := time.NewTicker(...)
	defer tick.Stop()
	var buf []int16
	for {
		select {
		case <-t.done:
			return
		// Receive from local var.
		// If this var is nil, this case will never fire.
		case b := <-queue:
			buf = append(buf, b...)
			if len(buf) >= frameSize {
				// Disable the receiving and wait for a tick.
				queue = nil
			}
		case <-tick.C:
			if len(buf) < frameSize {
				// Not enough data for a full frame yet; wait for more.
				continue
			}
			t.SendRTP(buf[:frameSize])
			// Keep the rest, if there's more data.
			n := copy(buf, buf[frameSize:])
			buf = buf[:n]
			// Re-enable the receiving
			queue = t.queue
		}
	}
}
There was a problem hiding this comment.
Yeah, it should be locked. And the lock scope should be small.
There was a problem hiding this comment.
Yeah unbuffered channels are also an option, there might be other complications with them. Let me try it out.
There was a problem hiding this comment.
I'm okay with doing these improvements in a separate PR as well.
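A sketch of the "lock, but keep the lock scope small" approach from this thread, assuming a plain slice instead of the deque. `lockedBuffer` and its methods are hypothetical names. The reader only copies samples out while holding the mutex; encoding and sending would happen after the lock is released.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedBuffer is a hypothetical sample buffer shared by many writers and one reader.
type lockedBuffer struct {
	mu      sync.Mutex
	samples []int16
}

// Write appends samples; the lock is held only for the append.
func (b *lockedBuffer) Write(samples []int16) {
	b.mu.Lock()
	b.samples = append(b.samples, samples...)
	b.mu.Unlock()
}

// Take moves up to len(dst) samples into dst and returns the count.
// Slow work (encoding, sending) belongs in the caller, outside the lock.
func (b *lockedBuffer) Take(dst []int16) int {
	b.mu.Lock()
	n := copy(dst, b.samples)
	// Shift the remaining samples to the front and shrink the slice.
	b.samples = b.samples[:copy(b.samples, b.samples[n:])]
	b.mu.Unlock()
	return n
}

func main() {
	var b lockedBuffer
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			b.Write([]int16{1, 2, 3})
		}()
	}
	wg.Wait()

	frame := make([]int16, 8)
	n := b.Take(frame)
	fmt.Println(n, len(b.samples)) // 8 4
}
```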
audiotrack.go
Outdated
| logger protoLogger.Logger | ||
| } | ||
|
|
||
| // TODO: fix channel messiness, webm writer in the example needs number of channels at the time of init |
There was a problem hiding this comment.
I can look into it afterwards, if you want.
audiotrack.go
Outdated
| } | ||
|
|
||
| outputChannels := targetChannels | ||
| sourceChannels := DetermineOpusChannels(track) |
There was a problem hiding this comment.
are there Opus packets available at this time to determine this?
There was a problem hiding this comment.
Or does this need something like checking the first packet (or maybe few) to determine this in the processing loop?
There was a problem hiding this comment.
Discussed this on slack with @boks1971, there's a condition like the track is muted when this is being read, in that case the SFU is sending silent opus packets but their ToC byte does not have the stereo bit, this can mess up the calculations. We also have no way to identify if a silent frame is sent from the publisher or from the SFU.
There was a problem hiding this comment.
Also, not sure if reading an RTP packet (which can have up to 120ms of audio iirc) is ideal for determining this. This might lose important audio in latency critical environments.
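For context on the ToC byte mentioned above: in RFC 6716 the first byte of an Opus packet carries a 5-bit configuration, a 1-bit stereo flag and a 2-bit frame count code. The sketch below only reads the stereo flag. `opusTOCStereo` is a hypothetical name, not the PR's `DetermineOpusChannels`, and as the thread points out, a single packet (for example SFU-generated silence) may not reflect the real channel layout.

```go
package main

import "fmt"

// opusTOCStereo reports whether the stereo flag is set in the TOC byte,
// i.e. the first byte of an Opus packet. ok is false for an empty packet.
func opusTOCStereo(packet []byte) (stereo bool, ok bool) {
	if len(packet) == 0 {
		return false, false
	}
	// Bit layout of the TOC byte: config (5 bits) | s (1 bit) | c (2 bits).
	return packet[0]&0x04 != 0, true
}

func main() {
	mono := []byte{0x78}   // config 15, s=0, c=0
	stereo := []byte{0xFC} // config 31, s=1, c=0
	fmt.Println(opusTOCStereo(mono))   // false true
	fmt.Println(opusTOCStereo(stereo)) // true true
}
```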
audiotrack.go
Outdated
| } | ||
|
|
||
| func (t *DecodingRemoteAudioTrack) Close() { | ||
| if t.pcmMWriter.String() != t.resampledPCMWriter.String() { |
There was a problem hiding this comment.
Are you sure this is needed?
There was a problem hiding this comment.
Tried to trace the code, opusWriter closes the one it is initialized with. But, I couldn't see the same with resampler, so that might need explicit closing.
| } | ||
| } | ||
|
|
||
| type PCMRemoteTrackParams struct { |
There was a problem hiding this comment.
:nit: not needed now, my preference is to make a separate file for each struct. This can be split into pcmlocaltrack.go and pcmremotetrack.go
7d090dc to
8fa95da