@@ -20,6 +20,7 @@ import (
2020
2121 "errors"
2222 "github.com/grepplabs/kafka-proxy/plugin/auth/shared"
23+ "github.com/hashicorp/go-hclog"
2324 "github.com/hashicorp/go-plugin"
2425)
2526
@@ -78,6 +79,7 @@ func init() {
7879 Server .Flags ().BoolVar (& c .Proxy .Auth .Enable , "proxy-listener-auth-enable" , false , "Enable SASL/PLAIN listener authentication" )
7980 Server .Flags ().StringVar (& c .Proxy .Auth .Command , "proxy-listener-auth-command" , "" , "Path to authentication plugin binary" )
8081 Server .Flags ().StringSliceVar (& c .Proxy .Auth .Parameters , "proxy-listener-auth-param" , []string {}, "Authentication plugin parameter" )
82+ Server .Flags ().StringVar (& c .Proxy .Auth .LogLevel , "proxy-listener-auth-log-level" , "trace" , "Log level of the auth plugin" )
8183
8284 // kafka
8385 Server .Flags ().StringVar (& c .Kafka .ClientID , "kafka-client-id" , "kafka-proxy" , "An optional identifier to track the source of requests" )
@@ -126,16 +128,9 @@ func Run(_ *cobra.Command, _ []string) {
126128
127129 var passwordAuthenticator shared.PasswordAuthenticator
128130 if c .Proxy .Auth .Enable {
129- client := plugin .NewClient (& plugin.ClientConfig {
130- HandshakeConfig : shared .Handshake ,
131- Plugins : shared .PluginMap ,
132- SyncStdout : os .Stdout ,
133- SyncStderr : os .Stderr ,
134- Cmd : exec .Command (c .Proxy .Auth .Command , c .Proxy .Auth .Parameters ... ),
135- AllowedProtocols : []plugin.Protocol {
136- plugin .ProtocolNetRPC , plugin .ProtocolGRPC },
137- })
131+ client := NewPluginClient ()
138132 defer client .Kill ()
133+
139134 rpcClient , err := client .Client ()
140135 if err != nil {
141136 logrus .Fatal (err )
@@ -150,8 +145,6 @@ func Run(_ *cobra.Command, _ []string) {
150145 logrus .Fatal (errors .New ("unsupported plugin type" ))
151146 }
152147 }
153- _ = passwordAuthenticator
154-
155148 var g group.Group
156149 {
157150 // All active connections are stored in this variable.
@@ -244,7 +237,15 @@ func NewHTTPHandler() http.Handler {
244237
245238func SetLogger () {
246239 if c .Log .Format == "json" {
247- logrus .SetFormatter (& logrus.JSONFormatter {})
240+ formatter := & logrus.JSONFormatter {
241+ FieldMap : logrus.FieldMap {
242+ logrus .FieldKeyTime : "@timestamp" ,
243+ logrus .FieldKeyLevel : "@level" ,
244+ logrus .FieldKeyMsg : "@message" ,
245+ },
246+ TimestampFormat : time .RFC3339 ,
247+ }
248+ logrus .SetFormatter (formatter )
248249 } else {
249250 logrus .SetFormatter (& logrus.TextFormatter {FullTimestamp : true })
250251 }
@@ -255,3 +256,26 @@ func SetLogger() {
255256 }
256257 logrus .SetLevel (level )
257258}
259+
260+ func NewPluginClient () * plugin.Client {
261+ jsonFormat := false
262+ if c .Log .Format == "json" {
263+ jsonFormat = true
264+ }
265+ logger := hclog .New (& hclog.LoggerOptions {
266+ Output : os .Stdout ,
267+ Level : hclog .LevelFromString (c .Proxy .Auth .LogLevel ),
268+ Name : "plugin" ,
269+ JSONFormat : jsonFormat ,
270+ TimeFormat : time .RFC3339 ,
271+ })
272+
273+ return plugin .NewClient (& plugin.ClientConfig {
274+ HandshakeConfig : shared .Handshake ,
275+ Plugins : shared .PluginMap ,
276+ Logger : logger ,
277+ Cmd : exec .Command (c .Proxy .Auth .Command , c .Proxy .Auth .Parameters ... ),
278+ AllowedProtocols : []plugin.Protocol {
279+ plugin .ProtocolNetRPC , plugin .ProtocolGRPC },
280+ })
281+ }
0 commit comments