-
Notifications
You must be signed in to change notification settings - Fork 145
feat(audio): encoded and decoded opus audio tracks #633
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
0b06cef
5ed2058
1351899
ff06ae3
25bdab6
6025f5e
01507d4
8d76ddc
192b3bd
74decf6
ed6e25b
09e3d43
7c76a55
acb181b
52eb00f
89b1b6e
bd9fc6b
16c1835
9532217
8fa95da
536c9b7
192576f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,282 @@ | ||
| package lksdk | ||
|
|
||
| import ( | ||
| "errors" | ||
| "io" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/gammazero/deque" | ||
| media "github.com/livekit/media-sdk" | ||
| opus "github.com/livekit/media-sdk/opus" | ||
| rtp "github.com/livekit/media-sdk/rtp" | ||
| protoLogger "github.com/livekit/protocol/logger" | ||
| "github.com/pion/webrtc/v4" | ||
| "go.uber.org/atomic" | ||
| ) | ||
|
|
||
// Opus-related defaults used by the audio tracks in this file.
const (
	// DefaultOpusSampleRate is the sample rate, in Hz, that the Opus
	// writers in this file are created with.
	DefaultOpusSampleRate = 48000
	// DefaultOpusSampleDuration is the default duration of one Opus sample.
	DefaultOpusSampleDuration = 20 * time.Millisecond

	// defaultPCMSampleDuration is the amount of PCM audio handed to the
	// encoder per tick. It is the smallest Opus frame duration, which keeps
	// the silent filler chunks as short as possible.
	defaultPCMSampleDuration = 2500 * time.Microsecond
)
|
|
||
| type pcmChunk struct { | ||
| sample media.PCM16Sample | ||
| frameDuration time.Duration | ||
| } | ||
|
|
||
| type EncodingLocalAudioTrack struct { | ||
| *webrtc.TrackLocalStaticSample | ||
|
|
||
| opusWriter media.WriteCloser[opus.Sample] | ||
| pcmWriter media.WriteCloser[media.PCM16Sample] | ||
| resampledPCMWriter media.WriteCloser[media.PCM16Sample] | ||
|
|
||
| sourceSampleRate int | ||
| frameDuration time.Duration | ||
| sourceChannels int | ||
| chunksPerSample int | ||
|
|
||
| // int16 to support a little-endian PCM16 chunk that has a high byte and low byte | ||
|
||
| chunkBuffer *deque.Deque[int16] | ||
| ticker *time.Ticker | ||
anunaym14 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| mu sync.Mutex | ||
| closed atomic.Bool | ||
| } | ||
|
|
||
| // TODO: test stereo with resampler | ||
| func NewEncodingLocalAudioTrack(sourceSampleRate int, sourceChannels int, logger protoLogger.Logger) (*EncodingLocalAudioTrack, error) { | ||
| if sourceChannels <= 0 || sourceChannels > 2 || sourceSampleRate <= 0 { | ||
| return nil, errors.New("invalid source sample rate or channels") | ||
| } | ||
|
|
||
| track, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "test", "test") | ||
anunaym14 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| // opusWriter writes opus samples to the track | ||
anunaym14 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| opusWriter := media.FromSampleWriter[opus.Sample](track, DefaultOpusSampleRate, defaultPCMSampleDuration) | ||
| // pcmWriter encodes opus samples from PCM16 samples and writes them to opusWriter | ||
| pcmWriter, err := opus.Encode(opusWriter, sourceChannels, logger) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| // resampled writer resamples the PCM16 samples from sourceSampleRate to DefaultOpusSampleRate | ||
| // and writes them to pcmWriter. If no resampling is needed, we directly point resampledPCMWriter to pcmWriter. | ||
| resampledPCMWriter := pcmWriter | ||
| if sourceSampleRate != DefaultOpusSampleRate { | ||
| resampledPCMWriter = media.ResampleWriter(pcmWriter, sourceSampleRate) | ||
| } | ||
|
|
||
| t := &EncodingLocalAudioTrack{ | ||
| TrackLocalStaticSample: track, | ||
| opusWriter: opusWriter, | ||
| pcmWriter: pcmWriter, | ||
| resampledPCMWriter: resampledPCMWriter, | ||
| sourceSampleRate: sourceSampleRate, | ||
| frameDuration: defaultPCMSampleDuration, | ||
| sourceChannels: sourceChannels, | ||
| chunkBuffer: new(deque.Deque[int16]), | ||
| chunksPerSample: (sourceSampleRate * sourceChannels * int(defaultPCMSampleDuration/time.Nanosecond)) / 1e9, | ||
| } | ||
|
|
||
| go t.processSamples() | ||
| return t, nil | ||
| } | ||
|
|
||
| func (t *EncodingLocalAudioTrack) pushChunksToBuffer(sample media.PCM16Sample) { | ||
| for _, chunk := range sample { | ||
| t.chunkBuffer.PushBack(chunk) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Too bad it cannot push/pop multiple values at once. This would be pretty slow. Also I'm a bit concerned about unbounded growth of this structure. Maybe ring buffer would be better here? I think we should have one in SIP media stack.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah not being able to push/pop in batches is a bummer. I'm also considering a ring buffer, but it comes down to the rate of writing. And the buffer size could be case dependent as well. Maybe we can take buffer size as a param from the user. @boks1971 @davidzhao wdyt?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we maybe determine an ideal buffer size using sampling rate?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, I think buffer size should always be a multiple of the frame size. We can decide that we want to buffer 100ms max, let's say, which is 5x of the frame size. Everything higher than this could drop frames. This shouldn't really happen, since we have a jitter buffer in front, but it might help handle bursts of packets coming from it.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Jitter buffer and fixed frame size (20ms) are for the subscribing side. This is for publish. The write API here can take any random frame size, multiple frames can be of different durations as well. We take out just enough from the buffer to build a frame duration that is supported by opus.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the public API is supposed to be like writing to a file, chunk duration doesn't matter
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, confused the code path. Sure, then we just set the buffer to be as long as is required by opus. Maybe 2x to be safe.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't think you need to take a size param. If you want, you can make a bounded ring buffer in a subsequent version. If we already have one, that is even better. You can set min cap (https://pkg.go.dev/github.com/gammazero/deque#Deque.SetBaseCap) so that the size does not go below that. Also, the Deque grows/shrinks automatically. We also have this thing in buffer.go in SFU where it checks fullness and grows too, but that is more involved and not necessary here.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do this in a followup PR, have added a TODO comment |
||
| } | ||
| } | ||
|
|
||
| func (t *EncodingLocalAudioTrack) getChunksFromBuffer() media.PCM16Sample { | ||
anunaym14 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| chunks := make(media.PCM16Sample, t.chunksPerSample) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we know that deque will append to internal buffer, this slice can be reused between iterations. So I would propose to pass it as argument here and let the caller allocate it once.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good observations, let me do that! |
||
| for i := 0; i < t.chunksPerSample; i++ { | ||
| if t.chunkBuffer.Len() == 0 { | ||
| // this will zero-init at index i | ||
| // which will be a silent chunk | ||
| continue | ||
| } else { | ||
| chunks[i] = t.chunkBuffer.PopFront() | ||
| } | ||
| } | ||
|
|
||
| return chunks | ||
| } | ||
|
|
||
| func (t *EncodingLocalAudioTrack) WriteSample(sample media.PCM16Sample) error { | ||
| if t.closed.Load() { | ||
| return errors.New("track is closed") | ||
| } | ||
|
|
||
| t.mu.Lock() | ||
| t.pushChunksToBuffer(sample) | ||
| t.mu.Unlock() | ||
| return nil | ||
| } | ||
|
|
||
| func (t *EncodingLocalAudioTrack) processSamples() { | ||
| // write first sample before starting the ticker | ||
| // t.mu.Lock() | ||
| sample := t.getChunksFromBuffer() | ||
| t.resampledPCMWriter.WriteSample(sample) | ||
| t.ticker = time.NewTicker(t.frameDuration) | ||
| // t.mu.Unlock() | ||
anunaym14 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| for range t.ticker.C { | ||
|
||
| if t.closed.Load() { | ||
| break | ||
| } | ||
|
|
||
| // TODO: do we need locking while reading from the deque? | ||
| // the continuous writes from the example | ||
| // writes very frequently to the deque, which acquires a lock | ||
| // and then reading from the deque acquires another lock | ||
| // and it does not guarentee FIFO order, so the read might | ||
| // be blocked for a long time. But, since only this goroutine is reading | ||
| // from the deque, we might not need it. Writing still needs locking, | ||
| // since mutliple goroutines could be calling WriteSample that writes to the deque. | ||
| sample := t.getChunksFromBuffer() | ||
|
||
| t.resampledPCMWriter.WriteSample(sample) | ||
| } | ||
|
|
||
| t.chunkBuffer.Clear() | ||
anunaym14 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| t.ticker.Stop() | ||
anunaym14 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| func (t *EncodingLocalAudioTrack) Close() { | ||
| firstClose := t.closed.CompareAndSwap(false, true) | ||
| // avoid closing the writer multiple times | ||
| if firstClose { | ||
anunaym14 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
anunaym14 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| t.resampledPCMWriter.Close() | ||
| t.pcmWriter.Close() | ||
| t.opusWriter.Close() | ||
| } | ||
|
|
||
| } | ||
|
|
||
| type DecodingRemoteAudioTrack struct { | ||
| *webrtc.TrackRemote | ||
| channels int | ||
| sampleRate int | ||
| once sync.Once | ||
|
|
||
| opusWriter media.WriteCloser[opus.Sample] | ||
| pcmMWriter media.WriteCloser[media.PCM16Sample] | ||
| resampledPCMWriter media.WriteCloser[media.PCM16Sample] | ||
| logger protoLogger.Logger | ||
| } | ||
|
|
||
| // TODO: fix channel messiness, webm writer in the example needs number of channels at the time of init | ||
|
||
| // and NewDecodedAudioTrack is called afterwards. But, we also need to check for channels in the init function | ||
| // to make sure user does not pass stereo as target channels for a mono track. Any suggestions on how to handle this? | ||
| // TODO: test stereo with resampler | ||
| func NewDecodingRemoteAudioTrack(track *webrtc.TrackRemote, writer *media.WriteCloser[media.PCM16Sample], targetSampleRate int, targetChannels int, handleJitter bool) (*DecodingRemoteAudioTrack, error) { | ||
| if track.Codec().MimeType != webrtc.MimeTypeOpus { | ||
| return nil, errors.New("track is not opus") | ||
| } | ||
|
|
||
| if targetChannels <= 0 || targetChannels > 2 || targetSampleRate <= 0 { | ||
| return nil, errors.New("invalid target channels or sample rate") | ||
| } | ||
|
|
||
| outputChannels := targetChannels | ||
| sourceChannels := DetermineOpusChannels(track) | ||
|
||
| if targetChannels > sourceChannels { | ||
| outputChannels = sourceChannels | ||
| } | ||
|
|
||
| // resampledPCMWriter resamples the PCM16 samples from DefaultOpusSampleRate to targetSampleRate and | ||
| // writes them to the writer. If no resampling is needed, we directly point resampledPCMWriter to writer. | ||
| resampledPCMWriter := *writer | ||
| if targetSampleRate != DefaultOpusSampleRate { | ||
| resampledPCMWriter = media.ResampleWriter(*writer, targetSampleRate) | ||
| } | ||
|
|
||
| // opus writer takes opus samples, decodes them to PCM16 samples | ||
| // and writes them to the pcmMWriter | ||
| opusWriter, err := opus.Decode(resampledPCMWriter, outputChannels, protoLogger.GetLogger()) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| t := &DecodingRemoteAudioTrack{ | ||
| TrackRemote: track, | ||
| opusWriter: opusWriter, | ||
| pcmMWriter: *writer, | ||
| resampledPCMWriter: resampledPCMWriter, | ||
| sampleRate: targetSampleRate, | ||
| channels: outputChannels, | ||
| logger: protoLogger.GetLogger(), | ||
| } | ||
|
|
||
| go t.process(handleJitter) | ||
| return t, nil | ||
| } | ||
|
|
||
| func (t *DecodingRemoteAudioTrack) process(handleJitter bool) { | ||
| // Handler takes RTP packets and writes the payload to opusWriter | ||
| var h rtp.Handler = rtp.NewMediaStreamIn[opus.Sample](t.opusWriter) | ||
| if handleJitter { | ||
| h = rtp.HandleJitter(int(t.TrackRemote.Codec().ClockRate), h) | ||
| } | ||
|
|
||
| // HandleLoop takes RTP packets from the track and writes them to the handler | ||
| // TODO: handle concealment | ||
| err := rtp.HandleLoop(t.TrackRemote, h) | ||
| if err != nil && !errors.Is(err, io.EOF) { | ||
| t.logger.Errorw("error handling rtp from track", err) | ||
| } | ||
| } | ||
|
|
||
| func (t *DecodingRemoteAudioTrack) Channels() int { | ||
| return t.channels | ||
| } | ||
|
|
||
| func (t *DecodingRemoteAudioTrack) SampleRate() int { | ||
| return t.sampleRate | ||
| } | ||
|
|
||
| func (t *DecodingRemoteAudioTrack) Close() { | ||
| // opus writer closes resampledPCMWriter internally | ||
anunaym14 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| t.opusWriter.Close() | ||
| } | ||
|
|
||
| // ------------------------------------------------------------------ | ||
|
|
||
// isOpusPacketStereo reports whether the Opus packet in payload has the
// stereo flag set in its table-of-contents (TOC) byte. An empty payload has
// no TOC byte and is reported as not stereo.
func isOpusPacketStereo(payload []byte) bool {
	// guard against empty packets; indexing payload[0] would panic
	if len(payload) == 0 {
		return false
	}

	// the table-of-contents (TOC) header byte is the first byte of the payload
	// it is composed of a configuration number, "config", a stereo flag, "s", and a frame count code, "c"
	// https://datatracker.ietf.org/doc/html/rfc6716#section-3.1
	//
	// TOC byte format:
	//  0
	//  0 1 2 3 4 5 6 7
	// +-+-+-+-+-+-+-+-+
	// | config  |s| c |
	// +-+-+-+-+-+-+-+-+
	// the 's' bit is the stereo bit
	const stereoMask = 0x04
	return payload[0]&stereoMask != 0
}
|
|
||
| func DetermineOpusChannels(track *webrtc.TrackRemote) int { | ||
| rtpPacket, _, err := track.ReadRTP() | ||
| if err != nil { | ||
| protoLogger.GetLogger().Errorw("error reading rtp from track", err) | ||
| return 1 | ||
| } | ||
|
|
||
| stereo := isOpusPacketStereo(rtpPacket.Payload) | ||
| if stereo { | ||
| return 2 | ||
| } | ||
| return 1 | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.