Skip to content

Commit a4b1e63

Browse files
authored
Merge pull request #256 from rstudio/glin-slog
Finish migrating platform-lib to slog
2 parents 60a285e + 2d6f25c commit a4b1e63

File tree

19 files changed

+150
-155
lines changed

19 files changed

+150
-155
lines changed

pkg/rscache/file.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ func (o *fileCache) retryingGet(ctx context.Context, dir, address string, get fu
102102
// Preemptive get attempt
103103
if flushingGet() {
104104
if flushed == 0 {
105-
slog.Log(ctx, LevelTrace, fmt.Sprintf("Found cached item at address '%s' immediately", address))
105+
slog.Log(ctx, LevelTrace, "Found cached item immediately", "address", address)
106106
} else {
107-
slog.Log(ctx, LevelTrace, fmt.Sprintf("Found cached item at address '%s' after one flush", address))
107+
slog.Log(ctx, LevelTrace, "Found cached item after one flush", "address", address)
108108
}
109109
return true
110110
}
@@ -184,7 +184,7 @@ func (o *fileCache) Head(ctx context.Context, resolver ResolverSpec) (size int64
184184
err = o.queue.AddressedPush(ctx, resolver.Priority, resolver.GroupId, resolver.Address(), resolver.Work)
185185
if o.duplicateMatcher.IsDuplicate(err) {
186186
// Do nothing; someone else has already inserted the work we need.
187-
slog.Debug(fmt.Sprintf("FileCache: duplicate address push for '%s'", resolver.Address()))
187+
slog.Debug("FileCache: duplicate address push", "address", resolver.Address())
188188
} else if err != nil {
189189
return
190190
}
@@ -205,7 +205,7 @@ func (o *fileCache) Head(ctx context.Context, resolver ResolverSpec) (size int64
205205
if o.retryingGet(ctx, resolver.Dir(), resolver.Address(), head) {
206206
return
207207
} else {
208-
slog.Debug(fmt.Sprintf("error: FileCache reported address '%s' complete, but item was not found in cache", resolver.Address()))
208+
slog.Debug("error: FileCache reported address complete, but item was not found in cache", "address", resolver.Address())
209209
err = fmt.Errorf("error: FileCache reported address '%s' complete, but item was not found in cache", resolver.Address())
210210
return
211211
}
@@ -274,7 +274,7 @@ func (o *fileCache) Get(ctx context.Context, resolver ResolverSpec) (value *Cach
274274
err = o.queue.AddressedPush(ctx, resolver.Priority, resolver.GroupId, address, resolver.Work)
275275
if o.duplicateMatcher.IsDuplicate(err) {
276276
// Do nothing; someone else has already inserted the work we need.
277-
slog.Debug(fmt.Sprintf("FileCache: duplicate address push for '%s'", address))
277+
slog.Debug("FileCache: duplicate address push", "address", address)
278278
} else if err != nil {
279279
value = &CacheReturn{
280280
Err: err,

pkg/rscache/internal/integration_test/memory_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"flag"
1010
"fmt"
1111
"io"
12-
"log"
12+
"log/slog"
1313
"os"
1414
"path/filepath"
1515
"testing"
@@ -217,7 +217,7 @@ func (s *MemoryCacheIntegrationSuite) TestInMemoryCaching(c *check.C) {
217217

218218
// Create an in-memory cache that is just large enough to hold 64 entries
219219
maxCost := entrySize * 64
220-
log.Printf("Creating cache with MaxCost=%d", maxCost)
220+
slog.Info("Creating cache", "maxCost", maxCost)
221221
rc, err := ristretto.NewCache(&ristretto.Config{
222222
NumCounters: 1000,
223223
MaxCost: maxCost,

pkg/rscache/memory_backed_file_cache.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"context"
99
"encoding/gob"
1010
"errors"
11-
"fmt"
1211
"io"
1312
"log/slog"
1413
"time"
@@ -92,7 +91,7 @@ func (mbfc *MemoryBackedFileCache) Get(ctx context.Context, resolver ResolverSpe
9291
if resolver.CacheInMemory && mbfc.mc != nil && mbfc.mc.Enabled() && ptr.Err == nil && ptr.GetSize() < mbfc.maxMemoryPerObject {
9392
err = mbfc.mc.Put(address, ptr)
9493
if err != nil {
95-
slog.Debug(fmt.Sprintf("error caching to memory: %s", err.Error()))
94+
slog.Debug("error caching to memory", "error", err)
9695
}
9796
}
9897
return *ptr
@@ -131,7 +130,7 @@ func (mbfc *MemoryBackedFileCache) GetObject(ctx context.Context, resolver Resol
131130
if resolver.CacheInMemory && mbfc.mc != nil && mbfc.mc.Enabled() && ptr.GetSize() < mbfc.maxMemoryPerObject {
132131
err = mbfc.mc.Put(address, ptr)
133132
if err != nil {
134-
slog.Debug(fmt.Sprintf("error caching to memory: %s", err.Error()))
133+
slog.Debug("error caching to memory", "error", err)
135134
}
136135
}
137136

pkg/rselection/impls/pgx/follower.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ package pgxelection
55
import (
66
"context"
77
"encoding/json"
8-
"fmt"
9-
"log"
108
"log/slog"
119
"time"
1210

@@ -105,7 +103,7 @@ func (p *PgxFollower) Follow(ctx context.Context) (result FollowResult) {
105103
case <-timeout.C:
106104
// Follower has received no pings for the timeout duration. It is time to
107105
// ask for a new leader.
108-
slog.Debug(fmt.Sprintf("Follower '%s' ping receipt timeout. Requesting a new leader", p.address))
106+
slog.Debug("Follower ping receipt timeout. Requesting a new leader", "address", p.address)
109107
go p.requestLeader(ctx)
110108
}
111109
return
@@ -122,13 +120,13 @@ func (p *PgxFollower) handleNotify(ctx context.Context, cn *electiontypes.Cluste
122120
resp := electiontypes.NewClusterPingResponse(p.address, cn.SrcAddr, p.awb.IP())
123121
b, err := json.Marshal(resp)
124122
if err != nil {
125-
log.Printf("Error marshaling notification to JSON: %s", err)
123+
slog.Error("Error marshaling notification to JSON", "error", err)
126124
return
127125
}
128-
slog.Log(ctx, LevelTrace, fmt.Sprintf("Follower %s responding to ping from leader %s", p.address, cn.SrcAddr))
126+
slog.Log(ctx, LevelTrace, "Follower responding to ping from leader", "follower", p.address, "leader", cn.SrcAddr)
129127
err = p.notify.Notify(ctx, p.chLeader, b)
130128
if err != nil {
131-
log.Printf("Follower error responding to leader ping: %s", err)
129+
slog.Error("Follower error responding to leader ping", "error", err)
132130
}
133131
}
134132

@@ -138,7 +136,7 @@ func (p *PgxFollower) requestLeader(ctx context.Context) {
138136
now := time.Now()
139137
// Limit how often this message logs to avoid too much spam
140138
if p.lastRequestLeaderErr.IsZero() || p.lastRequestLeaderErr.Add(5*time.Minute).Before(now) {
141-
log.Printf("Error pushing leader assumption work to queue: %s", err)
139+
slog.Error("Error pushing leader assumption work to queue", "error", err)
142140
p.lastRequestLeaderErr = now
143141
}
144142
}

pkg/rselection/impls/pgx/leader.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"context"
77
"encoding/json"
88
"fmt"
9-
"log"
109
"log/slog"
1110
"sort"
1211
"strings"
@@ -218,7 +217,11 @@ func (p *PgxLeader) verify(vCh chan bool) {
218217
// Upon exit, notify channel with `true` or `false` depending upon error status
219218
defer func(err *error) {
220219
if *err != nil {
221-
log.Printf("Error verifying cluster integrity: %s", *err)
220+
// TODO this should be worded better because sometimes this error is normal and expected,
221+
// e.g. when restarting a cluster node and the node list is temporarily out of sync:
222+
// Error verifying cluster integrity: node list length differs. Store node count 3 does not match leader count 2
223+
// It would be great to be able to distinguish between expected/unexpected errors here and log accordingly.
224+
slog.Error("Error verifying cluster integrity", "error", *err)
222225
vCh <- false
223226
} else {
224227
vCh <- true
@@ -262,7 +265,7 @@ func (p *PgxLeader) pingNodes(ctx context.Context) {
262265
}
263266
b, err := json.Marshal(req)
264267
if err != nil {
265-
log.Printf("Error marshaling notification to JSON: %s", err)
268+
slog.Error("Error marshaling notification to JSON", "error", err)
266269
return
267270
}
268271

@@ -274,7 +277,7 @@ func (p *PgxLeader) pingNodes(ctx context.Context) {
274277
err = p.notify.Notify(ctx, p.chFollower, b)
275278
if err != nil {
276279
p.pingSuccess = false
277-
log.Printf("Leader error pinging followers: %s", err)
280+
slog.Error("Leader error pinging followers", "error", err)
278281
return
279282
}
280283

@@ -284,7 +287,7 @@ func (p *PgxLeader) pingNodes(ctx context.Context) {
284287
err = p.notify.Notify(ctx, p.chLeader, b)
285288
if err != nil {
286289
p.pingSuccess = false
287-
log.Printf("Leader error pinging leaders: %s", err)
290+
slog.Error("Leader error pinging leaders", "error", err)
288291
return
289292
}
290293
}
@@ -308,22 +311,22 @@ func (p *PgxLeader) handleNodesRequest(ctx context.Context, cn *electiontypes.Cl
308311
resp := electiontypes.NewClusterNodesNotification(nodes, cn.Guid())
309312
b, err := json.Marshal(resp)
310313
if err != nil {
311-
log.Printf("Error marshaling notification to JSON: %s", err)
314+
slog.Error("Error marshaling notification to JSON", "error", err)
312315
return
313316
}
314317

315318
// Broadcast the response on the generic channel
316319
err = p.notify.Notify(ctx, p.chMessages, b)
317320
if err != nil {
318-
log.Printf("Leader error notifying of cluster nodes: %s", err)
321+
slog.Error("Leader error notifying of cluster nodes", "error", err)
319322
return
320323
}
321324
}
322325

323326
func (p *PgxLeader) handleLeaderPing(cn *electiontypes.ClusterPingRequest) {
324327
// If we received a ping from another leader, then stop leading
325328
if cn.SrcAddr != p.address {
326-
slog.Debug(fmt.Sprintf("Leader received ping from another leader. Stopping and moving back to the follower loop."))
329+
slog.Debug("Leader received ping from another leader. Stopping and moving back to the follower loop.")
327330
p.stop <- true
328331
} else {
329332
resp := electiontypes.NewClusterPingResponse(p.address, cn.SrcAddr, p.awb.IP())
@@ -367,7 +370,7 @@ func (p *PgxLeader) sweepNodes() {
367370
defer p.mutex.Unlock()
368371

369372
if p.unsuccessfulPing() {
370-
slog.Debug(fmt.Sprintf("Skipping cluster sweep due to unsuccessful pings"))
373+
slog.Debug("Skipping cluster sweep due to unsuccessful pings")
371374
return
372375
}
373376

@@ -378,7 +381,7 @@ func (p *PgxLeader) sweepNodes() {
378381
}
379382

380383
if node.Ping.Before(time.Now().Add(-p.maxPingAge)) {
381-
slog.Debug(fmt.Sprintf("Leader sweep removing cluster node %s", key))
384+
slog.Debug("Leader sweep removing cluster node", "node", key)
382385
delete(p.nodes, key)
383386
}
384387
}

pkg/rsnotify/broadcaster/broadcaster.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package broadcaster
33
// Copyright (C) 2022 by RStudio, PBC.
44

55
import (
6-
"log"
6+
"log/slog"
77

88
"github.com/rstudio/platform-lib/v3/pkg/rsnotify/listener"
99
)
@@ -96,7 +96,7 @@ func (b *NotificationBroadcaster) broadcast() {
9696
}
9797
case err, more := <-b.errs:
9898
if more {
99-
log.Printf("Received error on queue addressed work notification channel: %s", err)
99+
slog.Error("Received error on queue addressed work notification channel", "error", err)
100100
}
101101
case sink := <-b.subscribe:
102102
sinks = append(sinks, sink)

pkg/rsnotify/listeners/local/locallistener.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ package local
44

55
import (
66
"errors"
7-
"fmt"
8-
"log"
97
"log/slog"
108
"sync"
119
"time"
@@ -82,10 +80,10 @@ func (l *Listener) Listen() (chan listener.Notification, chan error, error) {
8280

8381
func (l *Listener) listen(msgs chan listener.Notification, errs chan error) {
8482
defer func() {
85-
slog.Debug(fmt.Sprintf("Stopping listener..."))
83+
slog.Debug("Stopping listener...")
8684
l.items = nil
8785
close(l.stop)
88-
slog.Debug(fmt.Sprintf("Stopped."))
86+
slog.Debug("Stopped.")
8987
}()
9088

9189
l.wait(msgs, errs, l.stop)
@@ -97,27 +95,27 @@ func (l *Listener) wait(msgs chan listener.Notification, errs chan error, stop c
9795
// channel is signaled or closed.
9896
//
9997
// This path is currently run only in the unit tests, so I included some
100-
// `log.Printf` usages are for the benefit of verbose testing output.
98+
// `slog.Debug` usages for the benefit of verbose testing output.
10199
if l.deferredStart != nil {
102100
// First, wait for the test to notify that it's time to start listening. This
103101
// gives us a chance to set up a deadlock condition in the test.
104-
log.Printf("Waiting for test notification to start")
102+
slog.Debug("Waiting for test notification to start")
105103
<-l.deferredStart
106104
// Next, simulate an unexpected stop by waiting for a stop signal after
107105
// which we return without ever receiving from the `l.items` channel.
108-
log.Printf("Proceeding with wait by waiting for stop signal")
106+
slog.Debug("Proceeding with wait by waiting for stop signal")
109107
<-stop
110-
log.Printf("Stopped. Returning without receiving from l.items.")
108+
slog.Debug("Stopped. Returning without receiving from l.items.")
111109
return
112110
}
113111
for {
114112
select {
115113
case <-stop:
116-
slog.Debug(fmt.Sprintf("Stopping wait"))
114+
slog.Debug("Stopping wait")
117115
return
118116
case i := <-l.items:
119117
if msg, ok := i.(listener.Notification); ok {
120-
slog.Debug(fmt.Sprintf("Received message: %s. Sending to buffered channel with current size %d", msg.Guid(), len(msgs)))
118+
slog.Debug("Received message. Sending to buffered channel", "guid", msg.Guid(), "channelSize", len(msgs))
121119
msgs <- msg
122120
} else {
123121
errs <- ErrNotNotificationType
@@ -129,13 +127,13 @@ func (l *Listener) wait(msgs chan listener.Notification, errs chan error, stop c
129127
func (l *Listener) Stop() {
130128
if l.stop != nil {
131129
// Signal to stop
132-
slog.Debug(fmt.Sprintf("Signaling stop channel to stop..."))
130+
slog.Debug("Signaling stop channel to stop...")
133131
l.stop <- true
134132

135133
// Wait for stop
136-
slog.Debug(fmt.Sprintf("Waiting for stop channel to close..."))
134+
slog.Debug("Waiting for stop channel to close...")
137135
<-l.stop
138-
slog.Debug(fmt.Sprintf("Stop channel for %s closed.", l.guid))
136+
slog.Debug("Stop channel closed.", "guid", l.guid)
139137
l.stop = nil
140138

141139
// Remove from provider
@@ -169,7 +167,7 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems
169167
notifyTxt = "renotify"
170168
}
171169

172-
slog.Debug(fmt.Sprintf("Notify called with type=%s on %d listeners", notifyTxt, len(l.listeners)))
170+
slog.Debug("Notify called", "type", notifyTxt, "listenerCount", len(l.listeners))
173171
for _, ll := range l.listeners {
174172
var needNotify bool
175173
if prevMissedItems == nil {
@@ -184,7 +182,7 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems
184182
// There's a chance that `ll.items` could be non-nil, but not receiving. Timeout
185183
// to prevent deadlock, but keep trying until we're sure that the listener is
186184
// closed.
187-
slog.Debug(fmt.Sprintf("Ready to %s internal items with guid %s for channel %s %s: %+v", notifyTxt, notifyGuid, ll.guid, channel, n))
185+
slog.Debug("Ready to notify internal items", "type", notifyTxt, "notifyGuid", notifyGuid, "listenerGuid", ll.guid, "channel", channel, "notification", n)
188186
func() {
189187
// It's important to create a ticker and stop it so it doesn't leak. This has the potential to be called
190188
// thousands of times in a relatively short period on a busy server, so timer leaks can result in
@@ -193,9 +191,9 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems
193191
defer timeout.Stop()
194192
select {
195193
case ll.items <- n:
196-
slog.Debug(fmt.Sprintf("Done with %s in local listener with guid %s for channel %s: %+v", notifyTxt, notifyGuid, channel, n))
194+
slog.Debug("Done with notify in local listener", "type", notifyTxt, "notifyGuid", notifyGuid, "channel", channel, "notification", n)
197195
case <-timeout.C:
198-
slog.Debug(fmt.Sprintf("Timeout during %s for listener %s/%s with guid %s for channel %s: %+v", notifyTxt, ll.guid, ll.name, notifyGuid, channel, n))
196+
slog.Debug("Timeout during notify", "type", notifyTxt, "listenerGuid", ll.guid, "listenerName", ll.name, "notifyGuid", notifyGuid, "channel", channel, "notification", n)
199197
// Record the timed-out notification in the `missed` map so we can retry it
200198
missed[ll.guid] = notifyGuid
201199
}
@@ -212,7 +210,7 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems
212210
// provider after the mutex is again locked by the recursive call to `notify` will be
213211
// attempted again as needed.
214212
if len(missed) > 0 {
215-
slog.Debug(fmt.Sprintf("calling l.notify for %+v with guid %s for %d missed items on channel %s", n, notifyGuid, len(missed), channel))
213+
slog.Debug("calling l.notify for missed items", "notification", n, "notifyGuid", notifyGuid, "missedCount", len(missed), "channel", channel)
216214
stopCh := make(chan struct{})
217215
defer close(stopCh)
218216
// If logging is enabled, periodically record notifications that are still waiting
@@ -222,13 +220,13 @@ func (l *ListenerProvider) notify(channel string, n interface{}, prevMissedItems
222220
for {
223221
select {
224222
case <-tick.C:
225-
slog.Debug(fmt.Sprintf("still waiting on l.notify for +%v with guid %s on channel %s", n, notifyGuid, channel))
223+
slog.Debug("still waiting on l.notify", "notification", n, "notifyGuid", notifyGuid, "channel", channel)
226224
case <-stop:
227225
return
228226
}
229227
}
230228
}(stopCh)
231229
l.notify(channel, n, missed)
232-
slog.Debug(fmt.Sprintf("completed calling l.notify for %d missed items with guid %s on channel %s", len(missed), notifyGuid, channel))
230+
slog.Debug("completed calling l.notify for missed items", "missedCount", len(missed), "notifyGuid", notifyGuid, "channel", channel)
233231
}
234232
}

0 commit comments

Comments (0)