Skip to content

Commit e17c1b9

Browse files
authored
Merge pull request #296 from moov-io/feat-sarama-logger
kafka: support a debug sarama logger
2 parents ac49fea + 52745d0 commit e17c1b9

File tree

4 files changed

+84
-0
lines changed

4 files changed

+84
-0
lines changed

cmd/achgateway/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"github.com/moov-io/achgateway"
2626
"github.com/moov-io/achgateway/internal"
27+
"github.com/moov-io/achgateway/internal/kafka"
2728
"github.com/moov-io/achgateway/internal/service"
2829
"github.com/moov-io/base/log"
2930
)
@@ -33,6 +34,8 @@ func main() {
3334
Logger: log.NewDefaultLogger().Set("app", log.String("achgateway")).Set("version", log.String(achgateway.Version)),
3435
}
3536

37+
kafka.EnableSaramaDebugLogging(env.Logger)
38+
3639
env, err := internal.NewEnvironment(env)
3740
if err != nil {
3841
env.Logger.Fatal().LogErrorf("Error loading up environment: %v", err)

docs/config.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ ACHGateway:
174174
[ Endpoint: <string> | default = "" ]
175175
```
176176
177+
> TIP: Set `SARAMA_DEBUG_LOGGING=yes` to enable debug logging for the IBM/sarama (kafka) library.
178+
177179
### Sharding
178180
```yaml
179181
Sharding:

internal/kafka/kafka_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package kafka_test
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/moov-io/achgateway/internal/kafka"
8+
"github.com/moov-io/achgateway/internal/service"
9+
"github.com/moov-io/base/log"
10+
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestSaramaDebugLogging(t *testing.T) {
15+
t.Setenv("SARAMA_DEBUG_LOGGING", "yes")
16+
17+
buf, logger := log.NewBufferLogger()
18+
kafka.EnableSaramaDebugLogging(logger)
19+
20+
cfg := service.KafkaConfig{
21+
Brokers: []string{"127.0.0.1:55555"},
22+
}
23+
24+
topic, err := kafka.OpenTopic(log.NewNopLogger(), &cfg)
25+
require.Nil(t, topic)
26+
require.Error(t, err)
27+
28+
fmt.Printf("\n%s\n", buf.String())
29+
require.Contains(t, buf.String(), `msg="ClientID is the default of 'sarama',`)
30+
}

internal/kafka/logger.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package kafka
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
"github.com/moov-io/base/log"
8+
"github.com/moov-io/base/strx"
9+
10+
"github.com/IBM/sarama"
11+
)
12+
13+
// SaramaLogger implements the sarama.StdLogger interface with a moov-io/base logger
14+
//
15+
// All messages are logged at a debug level.
16+
type SaramaLogger struct {
17+
logger log.Logger
18+
}
19+
20+
func NewSaramaLogger(logger log.Logger) *SaramaLogger {
21+
return &SaramaLogger{
22+
logger: logger,
23+
}
24+
}
25+
26+
var _ sarama.StdLogger = (&SaramaLogger{})
27+
28+
func (l *SaramaLogger) Print(v ...interface{}) {
29+
l.logger.Debug().Log(fmt.Sprint(v...))
30+
}
31+
32+
func (l *SaramaLogger) Printf(format string, v ...interface{}) {
33+
l.logger.Debug().Logf(format, v...)
34+
35+
}
36+
37+
func (l *SaramaLogger) Println(v ...interface{}) {
38+
l.Print(v...)
39+
}
40+
41+
// EnableSaramaDebugLogging overrides the default sarama logger (which discards everything)
42+
// with the provided moov-io/base logger.
43+
//
44+
// All messages are logged at a debug level.
45+
func EnableSaramaDebugLogging(logger log.Logger) {
46+
if strx.Yes(os.Getenv("SARAMA_DEBUG_LOGGING")) {
47+
sarama.Logger = NewSaramaLogger(logger)
48+
}
49+
}

0 commit comments

Comments
 (0)