Skip to content

Commit 86eff05

Browse files
committed
Refactor codec interface, allow using codecs without RTP.
1 parent c70932f commit 86eff05

File tree

10 files changed

+247
-128
lines changed

10 files changed

+247
-128
lines changed

audio.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package media
2+
3+
// AudioCodec is an audio codec implementation that can encode/decode bytes<->PCM.
4+
//
5+
// Each audio codec also implements AudioFrameCodec that can encode/decode bytes to a native Frame format of the codec.
6+
type AudioCodec interface {
7+
Codec
8+
// EncodeBytes creates a PCM->bytes encoder using the codec.
9+
EncodeBytes(w BytesWriter) PCM16Writer
10+
// DecodeBytes creates a bytes->PCM decoder using the codec.
11+
DecodeBytes(w PCM16Writer) BytesWriter
12+
}
13+
14+
// AudioFrameCodec is an audio codec implementation that can encode/decode bytes to/from a native Frame format of the codec.
15+
type AudioFrameCodec[S BytesFrame] interface {
16+
AudioCodec
17+
// Encode creates a Frame->bytes encoder using the codec.
18+
// The frame is in a native format of the codec.
19+
Encode(w WriteCloser[S]) PCM16Writer
20+
// Decode creates a bytes->Frame decoder using the codec.
21+
// The frame is in a native format of the codec.
22+
Decode(w PCM16Writer) WriteCloser[S]
23+
}
24+
25+
type AudioDecodeFunc[S Frame] func(w PCM16Writer) WriteCloser[S]
26+
type AudioEncodeFunc[S Frame] func(w WriteCloser[S]) PCM16Writer
27+
28+
// NewAudioCodec creates an audio codec with a given encode and decode implementations.
29+
func NewAudioCodec[S BytesFrame](
30+
info CodecInfo,
31+
decode AudioDecodeFunc[S],
32+
encode AudioEncodeFunc[S],
33+
) AudioFrameCodec[S] {
34+
if info.SampleRate <= 0 {
35+
panic("invalid sample rate")
36+
}
37+
if info.RTPClockRate == 0 {
38+
info.RTPClockRate = info.SampleRate
39+
}
40+
return &audioCodec[S]{
41+
info: info,
42+
encode: encode,
43+
decode: decode,
44+
}
45+
}
46+
47+
type audioCodec[S BytesFrame] struct {
48+
info CodecInfo
49+
decode AudioDecodeFunc[S]
50+
encode AudioEncodeFunc[S]
51+
}
52+
53+
func (c *audioCodec[S]) Info() CodecInfo {
54+
return c.info
55+
}
56+
57+
func (c *audioCodec[S]) Encode(w WriteCloser[S]) PCM16Writer {
58+
return c.encode(w)
59+
}
60+
61+
func (c *audioCodec[S]) Decode(w PCM16Writer) WriteCloser[S] {
62+
return c.decode(w)
63+
}
64+
65+
func (c *audioCodec[S]) EncodeBytes(w BytesWriter) PCM16Writer {
66+
bw := EncodeBytes[S](w, c.info.SampleRate)
67+
return c.encode(bw)
68+
}
69+
70+
func (c *audioCodec[S]) DecodeBytes(w PCM16Writer) BytesWriter {
71+
pw := c.decode(w)
72+
return DecodeBytes[S](pw)
73+
}

bytes.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package media
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"io"
7+
)
8+
9+
type BytesFrame interface {
10+
~[]byte
11+
Frame
12+
}
13+
14+
// BytesWriter resembles io.WriteCloser but is deliberately not API-compatible with it.
//
// The distinct method set signals that implementations are aware of frame
// boundaries and will only use buffer sizes that match the frame size.
type BytesWriter interface {
	// String returns a description of the writer.
	String() string
	// WriteRaw writes a single frame given as raw bytes.
	WriteRaw(frame []byte) error
	// Close closes the writer.
	Close() error
}
23+
24+
// NewBytesWriter creates a BytesWriter that writes to a standard io.WriteCloser.
25+
//
26+
// This process will erase the frame boundaries. Implement BytesWriter directly to preserve frame boundaries.
27+
func NewBytesWriter(w io.WriteCloser) BytesWriter {
28+
return &fileWriter{
29+
w: w,
30+
bw: bufio.NewWriter(w),
31+
}
32+
}
33+
34+
// fileWriter is a BytesWriter backed by a buffered io.WriteCloser.
type fileWriter struct {
	w  io.WriteCloser // underlying stream; closed by Close
	bw *bufio.Writer  // buffers writes to w
}

// String returns a fixed description of the writer.
func (fw *fileWriter) String() string {
	const name = "FileWriter"
	return name
}

// WriteRaw appends data to the buffered stream.
func (fw *fileWriter) WriteRaw(data []byte) error {
	if _, err := fw.bw.Write(data); err != nil {
		return err
	}
	return nil
}

// Close flushes buffered data and then closes the underlying stream.
//
// The stream is closed even when the flush fails; in that case the flush
// error is returned and the close error is discarded.
func (fw *fileWriter) Close() error {
	flushErr := fw.bw.Flush()
	closeErr := fw.w.Close()
	if flushErr != nil {
		return flushErr
	}
	return closeErr
}
55+
56+
// NewFileWriter creates a new frame writer that encodes frame to a binary stream.
57+
//
58+
// This process will erase the frame boundaries. Use EncodeBytes to preserve frame boundaries.
59+
func NewFileWriter[T Frame](w io.WriteCloser, sampleRate int) WriteCloser[T] {
60+
bw := NewBytesWriter(w)
61+
return EncodeBytes[T](bw, sampleRate)
62+
}
63+
64+
// EncodeBytes creates a writer that converts every frame write to a single binary Write call on a standard io.WriteCloser.
65+
//
66+
// If preserving frame boundaries is not required, using NewFileWriter would be more efficient.
67+
func EncodeBytes[S Frame](w BytesWriter, sampleRate int) WriteCloser[S] {
68+
return &byteEncoder[S]{w: w, sampleRate: sampleRate}
69+
}
70+
71+
type byteEncoder[S Frame] struct {
72+
w BytesWriter
73+
sampleRate int
74+
buf []byte
75+
}
76+
77+
func (w *byteEncoder[S]) String() string {
78+
return fmt.Sprintf("ByteEncoder(%d) -> %s", w.sampleRate, w.w.String())
79+
}
80+
81+
func (w *byteEncoder[S]) SampleRate() int {
82+
return w.sampleRate
83+
}
84+
85+
func (w *byteEncoder[S]) WriteSample(sample S) error {
86+
if sz := sample.Size(); cap(w.buf) < sz {
87+
w.buf = make([]byte, sz)
88+
} else {
89+
w.buf = w.buf[:sz]
90+
}
91+
n, err := sample.CopyTo(w.buf)
92+
if err != nil {
93+
return err
94+
}
95+
return w.w.WriteRaw(w.buf[:n])
96+
}
97+
98+
func (w *byteEncoder[T]) Close() error {
99+
w.buf = nil
100+
return w.w.Close()
101+
}
102+
103+
// DecodeBytes creates a writer that converts every binary write from a standard io.WriteCloser to a frame write.
104+
//
105+
// Note that directly reading from a file is not possible in this case, as the frame boundaries are unknown.
106+
func DecodeBytes[S BytesFrame](w WriteCloser[S]) BytesWriter {
107+
return &byteDecoder[S]{w: w}
108+
}
109+
110+
type byteDecoder[S BytesFrame] struct {
111+
w WriteCloser[S]
112+
}
113+
114+
func (w *byteDecoder[S]) String() string {
115+
return fmt.Sprintf("ByteDecoder -> %s", w.w.String())
116+
}
117+
118+
func (w *byteDecoder[S]) SampleRate() int {
119+
return w.w.SampleRate()
120+
}
121+
122+
func (w *byteDecoder[S]) WriteRaw(sample []byte) error {
123+
return w.w.WriteSample(S(sample))
124+
}
125+
126+
func (w *byteDecoder[T]) Close() error {
127+
return w.w.Close()
128+
}

codecs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func EnabledCodecs() []Codec {
9191
return out
9292
}
9393

94+
// RegisterCodec registers the codec.
9495
func RegisterCodec(c Codec) {
9596
codecs = append(codecs, c)
9697
if info := c.Info(); info.Disabled {
@@ -101,6 +102,7 @@ func RegisterCodec(c Codec) {
101102
}
102103
}
103104

105+
// NewCodec creates a generic codec definition without a specific implementation.
104106
func NewCodec(info CodecInfo) Codec {
105107
if info.SampleRate <= 0 {
106108
panic("invalid sample rate")

g711/alaw.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@ import (
2121
prtp "github.com/pion/rtp"
2222

2323
"github.com/livekit/media-sdk"
24-
"github.com/livekit/media-sdk/rtp"
2524
)
2625

2726
const ALawSDPName = "PCMA/8000"
2827

2928
func init() {
30-
media.RegisterCodec(rtp.NewAudioCodec(media.CodecInfo{
29+
media.RegisterCodec(media.NewAudioCodec(media.CodecInfo{
3130
SDPName: ALawSDPName,
3231
SampleRate: 8000,
3332
RTPDefType: prtp.PayloadTypePCMA,

g711/ulaw.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@ import (
2121
prtp "github.com/pion/rtp"
2222

2323
"github.com/livekit/media-sdk"
24-
"github.com/livekit/media-sdk/rtp"
2524
)
2625

2726
const ULawSDPName = "PCMU/8000"
2827

2928
func init() {
30-
media.RegisterCodec(rtp.NewAudioCodec(media.CodecInfo{
29+
media.RegisterCodec(media.NewAudioCodec(media.CodecInfo{
3130
SDPName: ULawSDPName,
3231
SampleRate: 8000,
3332
RTPDefType: prtp.PayloadTypePCMU,

g722/g722.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
prtp "github.com/pion/rtp"
2525

2626
"github.com/livekit/media-sdk"
27-
"github.com/livekit/media-sdk/rtp"
2827
)
2928

3029
const SDPName = "G722/8000"
@@ -35,7 +34,7 @@ var (
3534
)
3635

3736
func init() {
38-
media.RegisterCodec(rtp.NewAudioCodec(media.CodecInfo{
37+
media.RegisterCodec(media.NewAudioCodec(media.CodecInfo{
3938
SDPName: SDPName,
4039
SampleRate: 16000,
4140
RTPClockRate: 8000,

media.go

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
package media
1616

1717
import (
18-
"bufio"
1918
"fmt"
20-
"io"
2119
"strings"
2220
"sync/atomic"
2321
"time"
@@ -209,51 +207,3 @@ func (s MultiWriter[T]) Close() error {
209207
}
210208
return last
211209
}
212-
213-
func NewFileWriter[T Frame](w io.WriteCloser, sampleRate int) WriteCloser[T] {
214-
return &fileWriter[T]{
215-
w: w,
216-
bw: bufio.NewWriter(w),
217-
sampleRate: sampleRate,
218-
}
219-
}
220-
221-
type fileWriter[T Frame] struct {
222-
w io.WriteCloser
223-
bw *bufio.Writer
224-
sampleRate int
225-
buf []byte
226-
}
227-
228-
func (w *fileWriter[T]) String() string {
229-
return fmt.Sprintf("RawFile(%d)", w.sampleRate)
230-
}
231-
232-
func (w *fileWriter[T]) SampleRate() int {
233-
return w.sampleRate
234-
}
235-
236-
func (w *fileWriter[T]) WriteSample(sample T) error {
237-
if sz := sample.Size(); cap(w.buf) < sz {
238-
w.buf = make([]byte, sz)
239-
} else {
240-
w.buf = w.buf[:sz]
241-
}
242-
n, err := sample.CopyTo(w.buf)
243-
if err != nil {
244-
return err
245-
}
246-
_, err = w.bw.Write(w.buf[:n])
247-
return err
248-
}
249-
250-
func (w *fileWriter[T]) Close() error {
251-
if err := w.bw.Flush(); err != nil {
252-
_ = w.w.Close()
253-
return err
254-
}
255-
if err := w.w.Close(); err != nil {
256-
return err
257-
}
258-
return nil
259-
}

0 commit comments

Comments
 (0)