Skip to content

Commit f91c41b

Browse files
authored
Add a key ordering to clog logging (#3424)
* Add a key ordering to clog logging * update log line * Update test
1 parent d2e6fb4 commit f91c41b

File tree

3 files changed

+18
-9
lines changed

3 files changed

+18
-9
lines changed

clog/clog.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,15 @@ func init() {
5252
}
5353

5454
type values struct {
55-
mu sync.RWMutex
56-
vals map[string]string
55+
mu sync.RWMutex
56+
keysOrder []string
57+
vals map[string]string
5758
}
5859

5960
func newValues() *values {
6061
return &values{
61-
vals: make(map[string]string),
62+
vals: make(map[string]string),
63+
keysOrder: []string{},
6264
}
6365
}
6466

@@ -71,6 +73,7 @@ func Clone(parentCtx, logCtx context.Context) context.Context {
7173
cmap.mu.RLock()
7274
for k, v := range cmap.vals {
7375
newCmap.vals[k] = v
76+
newCmap.keysOrder = append(newCmap.keysOrder, k)
7477
}
7578
cmap.mu.RUnlock()
7679
}
@@ -109,6 +112,10 @@ func AddVal(ctx context.Context, key, val string) context.Context {
109112
ctx = context.WithValue(ctx, clogContextKey, cmap)
110113
}
111114
cmap.mu.Lock()
115+
// add to keysOrder only if the key is not already present to avoid duplicate fields
116+
if _, ok := cmap.vals[key]; !ok {
117+
cmap.keysOrder = append(cmap.keysOrder, key)
118+
}
112119
cmap.vals[key] = val
113120
cmap.mu.Unlock()
114121
return ctx
@@ -203,11 +210,11 @@ func messageFromContext(ctx context.Context, sb *strings.Builder) {
203210
sb.WriteString(" ")
204211
}
205212
}
206-
for key, val := range cmap.vals {
213+
for _, key := range cmap.keysOrder {
207214
if _, ok := stdKeys[key]; !ok {
208215
sb.WriteString(key)
209216
sb.WriteString("=")
210-
sb.WriteString(val)
217+
sb.WriteString(cmap.vals[key])
211218
sb.WriteString(" ")
212219
}
213220
}

clog/clog_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ func TestStdKeys(t *testing.T) {
2222
assert.Equal("manifestID=manID sessionID=sessionID nonce=1038 seqNo=9427 orchSessionID=orchID ethaddress=0x0 orchestrator=http://127.0.0.1:8935 customKey=customVal testing message num=452", msg)
2323
ctxCloned := Clone(context.Background(), ctx)
2424
ctxCloned = AddManifestID(ctxCloned, "newManifest")
25+
ctxCloned = AddVal(ctxCloned, "customKey2", "customVal2")
2526
msgCloned, _ := formatMessage(ctxCloned, false, false, "testing message num=%d", 4521)
26-
assert.Equal("manifestID=newManifest sessionID=sessionID nonce=1038 seqNo=9427 orchSessionID=orchID ethaddress=0x0 orchestrator=http://127.0.0.1:8935 customKey=customVal testing message num=4521", msgCloned)
27+
assert.Equal("manifestID=newManifest sessionID=sessionID nonce=1038 seqNo=9427 orchSessionID=orchID ethaddress=0x0 orchestrator=http://127.0.0.1:8935 customKey=customVal customKey2=customVal2 testing message num=4521", msgCloned)
2728
// old context shouldn't change
2829
msg, _ = formatMessage(ctx, false, false, "testing message num=%d", 452)
2930
assert.Equal("manifestID=manID sessionID=sessionID nonce=1038 seqNo=9427 orchSessionID=orchID ethaddress=0x0 orchestrator=http://127.0.0.1:8935 customKey=customVal testing message num=452", msg)

server/ai_mediaserver.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
540540
ms := media.MediaSegmenter{Workdir: ls.LivepeerNode.WorkDir, MediaMTXClient: mediaMTXClient}
541541
ms.RunSegmentation(ctx, fmt.Sprintf("rtmp://%s/%s%s", remoteHost, mediaMTXStreamPrefix, streamName), ssr.Read)
542542
ssr.Close()
543-
ls.cleanupLive(streamName)
543+
ls.cleanupLive(ctx, streamName)
544544
}()
545545

546546
sendErrorEvent := LiveErrorEventSender(ctx, streamID, map[string]string{
@@ -557,7 +557,7 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
557557
if err == nil {
558558
return
559559
}
560-
clog.Errorf(ctx, "Live video pipeline stopping: %s", err)
560+
clog.Errorf(ctx, "Live video pipeline finished with error: %s", err)
561561

562562
sendErrorEvent(err)
563563

@@ -687,7 +687,8 @@ func (ls *LivepeerServer) GetLiveVideoToVideoStatus() http.Handler {
687687
})
688688
}
689689

690-
func (ls *LivepeerServer) cleanupLive(stream string) {
690+
func (ls *LivepeerServer) cleanupLive(ctx context.Context, stream string) {
691+
clog.Infof(ctx, "Live video pipeline finished")
691692
ls.LivepeerNode.LiveMu.Lock()
692693
pub, ok := ls.LivepeerNode.LivePipelines[stream]
693694
delete(ls.LivepeerNode.LivePipelines, stream)

0 commit comments

Comments
 (0)