Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
296 changes: 296 additions & 0 deletions audiotrack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
package lksdk

import (
"errors"
"io"
"sync"
"time"

"github.com/gammazero/deque"
"github.com/google/uuid"
media "github.com/livekit/media-sdk"
opus "github.com/livekit/media-sdk/opus"
rtp "github.com/livekit/media-sdk/rtp"
protoLogger "github.com/livekit/protocol/logger"
"github.com/pion/webrtc/v4"
"go.uber.org/atomic"
)

const (
DefaultOpusSampleRate = 48000
DefaultOpusSampleDuration = 20 * time.Millisecond

// using the smallest opus frame duration to minimize
// the silent filler chunks
defaultPCMSampleDuration = 2500 * time.Microsecond
)

type pcmChunk struct {
sample media.PCM16Sample
frameDuration time.Duration
}

type EncodingLocalAudioTrack struct {
*webrtc.TrackLocalStaticSample

opusWriter media.WriteCloser[opus.Sample]
pcmWriter media.WriteCloser[media.PCM16Sample]
resampledPCMWriter media.WriteCloser[media.PCM16Sample]

sourceSampleRate int
frameDuration time.Duration
sourceChannels int
chunksPerSample int

// int16 to support a LE/BE PCM16 chunk that has a high byte and low byte
chunkBuffer *deque.Deque[int16]

mu sync.Mutex
closed atomic.Bool
}

// TODO: test stereo with resampler
func NewEncodingLocalAudioTrack(sourceSampleRate int, sourceChannels int, logger protoLogger.Logger) (*EncodingLocalAudioTrack, error) {
if sourceChannels <= 0 || sourceChannels > 2 || sourceSampleRate <= 0 {
return nil, errors.New("invalid source sample rate or channels")
}

id := uuid.New().String()[:5]
track, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "go_track"+id, "go_stream"+id)
if err != nil {
return nil, err
}

// opusWriter writes opus samples to the track
opusWriter := media.FromSampleWriter[opus.Sample](track, DefaultOpusSampleRate, defaultPCMSampleDuration)
// pcmWriter encodes opus samples from PCM16 samples and writes them to opusWriter
pcmWriter, err := opus.Encode(opusWriter, sourceChannels, logger)
if err != nil {
return nil, err
}

// resampled writer resamples the PCM16 samples from sourceSampleRate to DefaultOpusSampleRate
// and writes them to pcmWriter. If no resampling is needed, we directly point resampledPCMWriter to pcmWriter.
resampledPCMWriter := pcmWriter
if sourceSampleRate != DefaultOpusSampleRate {
resampledPCMWriter = media.ResampleWriter(pcmWriter, sourceSampleRate)
}

t := &EncodingLocalAudioTrack{
TrackLocalStaticSample: track,
opusWriter: opusWriter,
pcmWriter: pcmWriter,
resampledPCMWriter: resampledPCMWriter,
sourceSampleRate: sourceSampleRate,
frameDuration: defaultPCMSampleDuration,
sourceChannels: sourceChannels,
chunkBuffer: new(deque.Deque[int16]),
chunksPerSample: (sourceSampleRate * sourceChannels * int(defaultPCMSampleDuration/time.Nanosecond)) / 1e9,
}

go t.processSamples()
return t, nil
}

func (t *EncodingLocalAudioTrack) pushChunksToBuffer(sample media.PCM16Sample) {
for _, chunk := range sample {
t.chunkBuffer.PushBack(chunk)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too bad it cannot push/pop multiple values at once. This would be pretty slow.

Also I'm a bit concerned about unbounded growth of this structure. Maybe ring buffer would be better here? I think we should have one in SIP media stack.

Copy link
Copy Markdown
Member Author

@anunaym14 anunaym14 Apr 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah not being able to push/pop in batches is a bummer.

I'm also considering a ring buffer, but it comes down to the rate of writing. And the buffer size could be case dependent as well. Maybe we can take buffer size as a param from the user.

@boks1971 @davidzhao wdyt?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we maybe determine an ideal buffer size using sampling rate?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I think buffer size should always multiple of the frame size. We can decide that we want to buffer 100ms max, lets say, which is 5x of the frame size. Everything higher than this could drop frames. This shouldn't really happen, since we have a jitter buffer in front, but it might help handle bursts of packets coming from it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jitter buffer and fixed frame size (20ms) is for the subscribing side. This is for publish.

The write API here can take any random frame size, multiple frames can be of different durations as well. We take out just enough from the buffer to build an frame duration that is supported by opus.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the public API is supposed to be like writing to a file, chunk duration doesn't matter

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, confused the code path. Sure, than we just set the buffer to be as long as is required by opus. Maybe 2x to be safe.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think you need to take a size param. If you want, you can make a bounded ring buffer in a subsequent version. If we already have one, that is even better.

You can set min cap (https://pkg.go.dev/github.com/gammazero/deque#Deque.SetBaseCap) so that the size does not go below that. Also, the Deque grows/shrinks automatically. We also have this thing in buffer.go in SFU where it checks fullness and grows too, but that is more involved and not necessary here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do this in a followup PR, have added a TODO comment

}
}

func (t *EncodingLocalAudioTrack) getChunksFromBuffer() media.PCM16Sample {
chunks := make(media.PCM16Sample, t.chunksPerSample)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we know that deque will append to internal buffer, this slice can be reused between iterations. So I would propose to pass it as argument here and let the caller allocate it once.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good observations, let me do that!

for i := 0; i < t.chunksPerSample; i++ {
if t.chunkBuffer.Len() == 0 {
// this will zero-init at index i
// which will be a silent chunk
continue
} else {
chunks[i] = t.chunkBuffer.PopFront()
}
}

return chunks
}

func (t *EncodingLocalAudioTrack) WriteSample(sample media.PCM16Sample) error {
if t.closed.Load() {
return errors.New("track is closed")
}

t.mu.Lock()
t.pushChunksToBuffer(sample)
t.mu.Unlock()
return nil
}

func (t *EncodingLocalAudioTrack) processSamples() {
ticker := time.NewTicker(t.frameDuration)
defer ticker.Stop()

for {
if t.closed.Load() {
break
}

t.mu.Lock()
sample := t.getChunksFromBuffer()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might produce weird results without mutex. It seems like it should be fine, since in only increments one integer. But when the writer reallocates internal storage, it may shuffle data around and change read offset as well. This would be really hard to debug.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my short testing, it seemed to work, since it's a queue with only one reader.

Also, if we use mutex while reading, it can block a writer and end up writing a silent chunk even though we have the data (but it's waiting to be written behind a lock). Without a lock for reading, we can use the data as soon as it's written.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the writer will hold mutex for 20ms. It only takes the lock when there's a frame to write, and if write to deque takes 20ms, we should not use the implementation which is that slow :)

Going without mutex may cause way more issues. It's just not safe to use that data structure concurrently. It may not happen in the short tests, but it will happen eventually.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not take 20ms to write, but, if you see the example code, the write happens very frequently. So, there's a fight for acquiring the lock between the read call and multiple write calls.

I've seen the reader not getting to acquire the lock for over a min in some runs.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly locks are not acquired in a FIFO order

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the channel can be unbuffered as well.

func (t *EncodedAudioTrack) WriteSample(data []byte) error {
   select {
   case t.queue <- slices.Clone(data): // we can get rid of the copy later
   case <-t.done:
      return ErrClosed
   }
}

func (t *EncodedAudioTrack) readLoop() {
   // Local var is needed to disable the channel later
   queue := t.queue 

   tick := time.NewTicker(...)
   defer tick.Close()

   var buf []int16
   for {
      select {
      case <-t.done:
         return
      // Receive from local var.
      // If this var is nil, this case will never fire.
      case b := <-queue:
         buf = append(buf, b...)
         if len(buf) > frameSize {
            // Disable the receiving and wait for a tick.
           queue = nil
         }
      case <-tick.C:
         t.SendRTP(buf[:frameSize])
         // Keep the rest, if there's more data.
         n := copy(buf, buf[:frameSize])
         buf = buf[:n]
         // Re-enable the receving
         queue = t.queue
      }
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it should be locked. And the lock scope should be small.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah unbuffered channels are also an option, there might be other complications with them. Let me try it out.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm okay with doing these improvements in a separate PR as well.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

t.resampledPCMWriter.WriteSample(sample)
t.mu.Unlock()
<-ticker.C
}
}

func (t *EncodingLocalAudioTrack) Close() {
firstClose := t.closed.CompareAndSwap(false, true)
// avoid closing the writer multiple times
if firstClose {
t.mu.Lock()
defer t.mu.Unlock()
t.chunkBuffer.Clear()
t.resampledPCMWriter.Close()
t.pcmWriter.Close()
t.opusWriter.Close()
}
}

type DecodingRemoteTrackParams struct {
HandleJitter bool
}

type DecodingRemoteTrackOption func(*DecodingRemoteTrackParams)

func WithHandleJitter(handleJitter bool) DecodingRemoteTrackOption {
return func(p *DecodingRemoteTrackParams) {
p.HandleJitter = handleJitter
}
}

type DecodingRemoteAudioTrack struct {
*webrtc.TrackRemote
channels int
sampleRate int
once sync.Once

opusWriter media.WriteCloser[opus.Sample]
pcmMWriter media.WriteCloser[media.PCM16Sample]
resampledPCMWriter media.WriteCloser[media.PCM16Sample]
logger protoLogger.Logger
}

// TODO: fix channel messiness, webm writer in the example needs number of channels at the time of init
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can look into it afterwards, if you want.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks 🙌

// and NewDecodedAudioTrack is called afterwards. But, we also need to check for channels in the init function
// to make sure user does not pass stereo as target channels for a mono track. Any suggestions on how to handle this?
// TODO: test stereo with resampler
func NewDecodingRemoteAudioTrack(track *webrtc.TrackRemote, writer *media.WriteCloser[media.PCM16Sample], targetSampleRate int, targetChannels int, opts ...DecodingRemoteTrackOption) (*DecodingRemoteAudioTrack, error) {
if track.Codec().MimeType != webrtc.MimeTypeOpus {
return nil, errors.New("track is not opus")
}

if targetChannels <= 0 || targetChannels > 2 || targetSampleRate <= 0 {
return nil, errors.New("invalid target channels or sample rate")
}

options := &DecodingRemoteTrackParams{
HandleJitter: true,
}
for _, opt := range opts {
opt(options)
}

outputChannels := targetChannels
sourceChannels := DetermineOpusChannels(track)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there Opus packets available at this time to determine this?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or does this need something like checking the first packet (or maybe few) to determine this in the processing loop?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this on slack with @boks1971, there's a condition like the track is muted when this is being read, in that case the SFU is sending silent opus packets but their ToC byte does not have the stereo bit, this can mess up the calculations. We also have no way to identify if a silent frame is sent from the publisher or from the SFU.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, not sure if reading an RTP packet (which can have upto 120ms of audio iirc) is ideal for determining this. This might lose important audio in latency critical environments.

if targetChannels > sourceChannels {
outputChannels = sourceChannels
}

// resampledPCMWriter resamples the PCM16 samples from DefaultOpusSampleRate to targetSampleRate and
// writes them to the writer. If no resampling is needed, we directly point resampledPCMWriter to writer.
resampledPCMWriter := *writer
if targetSampleRate != DefaultOpusSampleRate {
resampledPCMWriter = media.ResampleWriter(*writer, targetSampleRate)
}

// opus writer takes opus samples, decodes them to PCM16 samples
// and writes them to the pcmMWriter
opusWriter, err := opus.Decode(resampledPCMWriter, outputChannels, protoLogger.GetLogger())
if err != nil {
return nil, err
}

t := &DecodingRemoteAudioTrack{
TrackRemote: track,
opusWriter: opusWriter,
pcmMWriter: *writer,
resampledPCMWriter: resampledPCMWriter,
sampleRate: targetSampleRate,
channels: outputChannels,
logger: protoLogger.GetLogger(),
}

go t.process(options.HandleJitter)
return t, nil
}

func (t *DecodingRemoteAudioTrack) process(handleJitter bool) {
// Handler takes RTP packets and writes the payload to opusWriter
var h rtp.Handler = rtp.NewMediaStreamIn[opus.Sample](t.opusWriter)
if handleJitter {
h = rtp.HandleJitter(int(t.TrackRemote.Codec().ClockRate), h)
}

// HandleLoop takes RTP packets from the track and writes them to the handler
// TODO: handle concealment
err := rtp.HandleLoop(t.TrackRemote, h)
if err != nil && !errors.Is(err, io.EOF) {
t.logger.Errorw("error handling rtp from track", err)
}
}

func (t *DecodingRemoteAudioTrack) Channels() int {
return t.channels
}

func (t *DecodingRemoteAudioTrack) SampleRate() int {
return t.sampleRate
}

func (t *DecodingRemoteAudioTrack) Close() {
if t.pcmMWriter.String() != t.resampledPCMWriter.String() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure this is needed?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried to trace the code, opusWriter closes the one it is initialized with. But, I couldn't see the same with resampler, so that might need explicit closing.

t.pcmMWriter.Close()
}
// opus writer closes resampledPCMWriter internally
t.opusWriter.Close()

}

// ------------------------------------------------------------------

func isOpusPacketStereo(payload []byte) bool {
// the table-of-contents (TOC) header byte is the first byte of the payload
// it is composed of a configuration number, "config", a stereo flag, "s", and a frame count code, "c"
// https://datatracker.ietf.org/doc/html/rfc6716#section-3.1
tocByte := payload[0]
// TOC byte format:
// 0
// 0 1 2 3 4 5 6 7
// +-+-+-+-+-+-+-+-+
// | config |s| c |
// +-+-+-+-+-+-+-+-+
// the 's' bit is stereo bit
return tocByte&0x04 != 0
}

func DetermineOpusChannels(track *webrtc.TrackRemote) int {
rtpPacket, _, err := track.ReadRTP()
if err != nil {
protoLogger.GetLogger().Errorw("error reading rtp from track", err)
return 1
}

stereo := isOpusPacketStereo(rtpPacket.Payload)
if stereo {
return 2
}
return 1
}
Loading
Loading