Skip to content

Commit ddd2ee1

Browse files
authored
logging update (#851)
1 parent 50bd3d8 commit ddd2ee1

File tree

5 files changed

+128
-59
lines changed

5 files changed

+128
-59
lines changed

pkg/config/base.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"strings"
2020
"time"
2121

22+
"github.com/livekit/egress/pkg/logging"
2223
"github.com/livekit/protocol/logger"
2324
"github.com/livekit/protocol/redis"
2425
lksdk "github.com/livekit/server-sdk-go/v2"
@@ -99,14 +100,6 @@ func (c *BaseConfig) initLogger(values ...interface{}) error {
99100
l := zl.WithValues(values...)
100101

101102
logger.SetLogger(l, "egress")
102-
lksdk.SetLogger(&downgradeLogger{Logger: l})
103+
lksdk.SetLogger(logging.NewDowngradeLogger())
103104
return nil
104105
}
105-
106-
type downgradeLogger struct {
107-
logger.Logger
108-
}
109-
110-
func (d *downgradeLogger) Errorw(msg string, err error, keysAndValues ...interface{}) {
111-
d.Logger.Warnw(msg, err, keysAndValues...)
112-
}

pkg/logging/logging.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright 2025 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package logging
16+
17+
import (
18+
"fmt"
19+
"strings"
20+
"sync"
21+
22+
"github.com/aws/smithy-go/logging"
23+
24+
"github.com/livekit/protocol/logger"
25+
)
26+
27+
// DowngradeLogger converts errors to warnings
28+
type DowngradeLogger struct {
29+
logger.Logger
30+
}
31+
32+
func NewDowngradeLogger() *DowngradeLogger {
33+
return &DowngradeLogger{
34+
Logger: logger.GetLogger(),
35+
}
36+
}
37+
38+
func (l *DowngradeLogger) Errorw(msg string, err error, keysAndValues ...interface{}) {
39+
l.Logger.Warnw(msg, err, keysAndValues...)
40+
}
41+
42+
// DebugLogger logs command outputs
43+
type DebugLogger struct {
44+
cmd string
45+
}
46+
47+
func NewDebugLogger(cmd string) *DebugLogger {
48+
return &DebugLogger{
49+
cmd: cmd,
50+
}
51+
}
52+
53+
func (l *DebugLogger) Write(p []byte) (int, error) {
54+
logger.Infow(fmt.Sprintf("%s: %s", l.cmd, string(p)))
55+
return len(p), nil
56+
}
57+
58+
// ProcessLogger catches stray outputs from handlers
59+
type ProcessLogger struct {
60+
logger logger.Logger
61+
}
62+
63+
func NewProcessLogger(handlerID, egressID string) *ProcessLogger {
64+
return &ProcessLogger{
65+
logger: logger.GetLogger().WithValues("handlerID", handlerID, "egressID", egressID),
66+
}
67+
}
68+
69+
func (l *ProcessLogger) Write(p []byte) (n int, err error) {
70+
s := string(p)
71+
if strings.HasPrefix(s, "00:00:") {
72+
// ignore cuda and template not mapped gstreamer warnings
73+
} else if strings.HasPrefix(s, "turnc") {
74+
// warn on turnc error
75+
l.logger.Warnw(s, nil)
76+
} else {
77+
// panics and unexpected errors
78+
l.logger.Errorw(s, nil)
79+
}
80+
return len(p), nil
81+
}
82+
83+
// S3Logger only logs aws messages on upload failure
84+
type S3Logger struct {
85+
mu sync.Mutex
86+
msgs []string
87+
idx int
88+
}
89+
90+
func NewS3Logger() *S3Logger {
91+
return &S3Logger{
92+
msgs: make([]string, 10),
93+
}
94+
}
95+
96+
func (l *S3Logger) Logf(classification logging.Classification, format string, v ...interface{}) {
97+
format = "aws %s: " + format
98+
v = append([]interface{}{strings.ToLower(string(classification))}, v...)
99+
100+
l.mu.Lock()
101+
l.msgs[l.idx%len(l.msgs)] = fmt.Sprintf(format, v...)
102+
l.idx++
103+
l.mu.Unlock()
104+
}
105+
106+
func (l *S3Logger) WriteLogs() {
107+
l.mu.Lock()
108+
size := len(l.msgs)
109+
for range size {
110+
if msg := l.msgs[l.idx%size]; msg != "" {
111+
logger.Debugw(msg)
112+
}
113+
l.idx++
114+
}
115+
l.mu.Unlock()
116+
}

pkg/pipeline/sink/uploader/s3.go

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,18 @@ import (
2222
"net/url"
2323
"os"
2424
"path"
25-
"strings"
26-
"sync"
2725

2826
"github.com/aws/aws-sdk-go-v2/aws"
2927
"github.com/aws/aws-sdk-go-v2/aws/retry"
3028
awsConfig "github.com/aws/aws-sdk-go-v2/config"
3129
"github.com/aws/aws-sdk-go-v2/credentials"
3230
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
3331
"github.com/aws/aws-sdk-go-v2/service/s3"
34-
"github.com/aws/smithy-go/logging"
3532

3633
"github.com/livekit/egress/pkg/config"
3734
"github.com/livekit/egress/pkg/errors"
35+
"github.com/livekit/egress/pkg/logging"
3836
"github.com/livekit/egress/pkg/types"
39-
"github.com/livekit/protocol/logger"
4037
"github.com/livekit/psrpc"
4138
)
4239

@@ -155,9 +152,7 @@ func (u *S3Uploader) upload(
155152
return "", 0, errors.ErrUploadFailed("S3", err)
156153
}
157154

158-
l := &s3Logger{
159-
msgs: make([]string, 10),
160-
}
155+
l := logging.NewS3Logger()
161156
client := s3.NewFromConfig(*u.awsConf, func(o *s3.Options) {
162157
o.Logger = l
163158
o.ClientLogMode = aws.LogRequest | aws.LogResponse | aws.LogRetries
@@ -182,7 +177,7 @@ func (u *S3Uploader) upload(
182177
}
183178

184179
if _, err = manager.NewUploader(client).Upload(context.Background(), input); err != nil {
185-
l.log()
180+
l.WriteLogs()
186181
return "", 0, errors.ErrUploadFailed("S3", err)
187182
}
188183

@@ -213,35 +208,6 @@ func (u *S3Uploader) upload(
213208
return res.URL, stat.Size(), nil
214209
}
215210

216-
// s3Logger only logs aws messages on upload failure
217-
type s3Logger struct {
218-
mu sync.Mutex
219-
msgs []string
220-
idx int
221-
}
222-
223-
func (l *s3Logger) Logf(classification logging.Classification, format string, v ...interface{}) {
224-
format = "aws %s: " + format
225-
v = append([]interface{}{strings.ToLower(string(classification))}, v...)
226-
227-
l.mu.Lock()
228-
l.msgs[l.idx%len(l.msgs)] = fmt.Sprintf(format, v...)
229-
l.idx++
230-
l.mu.Unlock()
231-
}
232-
233-
func (l *s3Logger) log() {
234-
l.mu.Lock()
235-
size := len(l.msgs)
236-
for range size {
237-
if msg := l.msgs[l.idx%size]; msg != "" {
238-
logger.Debugw(msg)
239-
}
240-
l.idx++
241-
}
242-
l.mu.Unlock()
243-
}
244-
245211
type s3Retryer struct{}
246212

247213
func (r *s3Retryer) IsErrorRetryable(_ error) aws.Ternary {

pkg/pipeline/source/web.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030

3131
"github.com/livekit/egress/pkg/config"
3232
"github.com/livekit/egress/pkg/errors"
33+
"github.com/livekit/egress/pkg/logging"
3334
"github.com/livekit/egress/pkg/pipeline/source/pulse"
3435
"github.com/livekit/protocol/livekit"
3536
"github.com/livekit/protocol/logger"
@@ -133,15 +134,6 @@ func (s *WebSource) Close() {
133134
}
134135
}
135136

136-
type debugLogger struct {
137-
cmd string
138-
}
139-
140-
func (l *debugLogger) Write(b []byte) (int, error) {
141-
logger.Debugw(fmt.Sprintf("%s: %s", l.cmd, string(b)))
142-
return len(b), nil
143-
}
144-
145137
// creates a new pulse audio sink
146138
func (s *WebSource) createPulseSink(ctx context.Context, p *config.PipelineConfig) error {
147139
ctx, span := tracer.Start(ctx, "WebInput.createPulseSink")
@@ -156,7 +148,7 @@ func (s *WebSource) createPulseSink(ctx context.Context, p *config.PipelineConfi
156148
fmt.Sprintf("sink_properties=device.description=\"%s\"", p.Info.EgressId),
157149
)
158150
var b bytes.Buffer
159-
l := &debugLogger{cmd: "pactl"}
151+
l := logging.NewDebugLogger("pactl")
160152
cmd.Stdout = &b
161153
cmd.Stderr = l
162154
err := cmd.Run()

pkg/server/server_rpc.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ package server
1717
import (
1818
"context"
1919
"net/http"
20-
"os"
2120
"os/exec"
2221
"path"
2322
"syscall"
@@ -28,6 +27,7 @@ import (
2827

2928
"github.com/livekit/egress/pkg/config"
3029
"github.com/livekit/egress/pkg/errors"
30+
"github.com/livekit/egress/pkg/logging"
3131
"github.com/livekit/protocol/egress"
3232
"github.com/livekit/protocol/livekit"
3333
"github.com/livekit/protocol/logger"
@@ -127,8 +127,10 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress
127127
"--request", string(reqString),
128128
)
129129
cmd.Dir = "/"
130-
cmd.Stdout = os.Stdout
131-
cmd.Stderr = os.Stderr
130+
131+
l := logging.NewProcessLogger(handlerID, info.EgressId)
132+
cmd.Stdout = l
133+
cmd.Stderr = l
132134
cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
133135

134136
if err = s.Launch(context.Background(), handlerID, req, info, cmd); err != nil {

0 commit comments

Comments
 (0)