@@ -19,7 +19,8 @@ import (
1919 "time"
2020
2121 "errors"
22- "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared"
22+ gatewayclient "github.com/grepplabs/kafka-proxy/plugin/gateway-client/shared"
23+ localauth "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared"
2324 "github.com/hashicorp/go-hclog"
2425 "github.com/hashicorp/go-plugin"
2526)
@@ -143,7 +144,7 @@ func init() {
143144func Run (_ * cobra.Command , _ []string ) {
144145 logrus .Infof ("Starting kafka-proxy version %s" , config .Version )
145146
146- var passwordAuthenticator shared .PasswordAuthenticator
147+ var passwordAuthenticator localauth .PasswordAuthenticator
147148 if c .Auth .Local .Enable {
148149 client := NewLocalAuthPluginClient ()
149150 defer client .Kill ()
@@ -157,11 +158,32 @@ func Run(_ *cobra.Command, _ []string) {
157158 logrus .Fatal (err )
158159 }
159160 var ok bool
160- passwordAuthenticator , ok = raw .(shared .PasswordAuthenticator )
161+ passwordAuthenticator , ok = raw .(localauth .PasswordAuthenticator )
161162 if ! ok {
162163 logrus .Fatal (errors .New ("unsupported plugin type" ))
163164 }
164165 }
166+
167+ var tokenProvider gatewayclient.TokenProvider
168+ if c .Auth .Gateway .Client .Enable {
169+ client := NewGatewayClientPluginClient ()
170+ defer client .Kill ()
171+
172+ rpcClient , err := client .Client ()
173+ if err != nil {
174+ logrus .Fatal (err )
175+ }
176+ raw , err := rpcClient .Dispense ("tokenProvider" )
177+ if err != nil {
178+ logrus .Fatal (err )
179+ }
180+ var ok bool
181+ tokenProvider , ok = raw .(gatewayclient.TokenProvider )
182+ if ! ok {
183+ logrus .Fatal (errors .New ("unsupported plugin type" ))
184+ }
185+ }
186+
165187 var g group.Group
166188 {
167189 // All active connections are stored in this variable.
@@ -175,7 +197,7 @@ func Run(_ *cobra.Command, _ []string) {
175197 if err != nil {
176198 logrus .Fatal (err )
177199 }
178- proxyClient , err := proxy .NewClient (connset , c , listeners .GetNetAddressMapping , passwordAuthenticator )
200+ proxyClient , err := proxy .NewClient (connset , c , listeners .GetNetAddressMapping , passwordAuthenticator , tokenProvider )
179201 if err != nil {
180202 logrus .Fatal (err )
181203 }
@@ -274,24 +296,32 @@ func SetLogger() {
274296 logrus .SetLevel (level )
275297}
276298
299+ func NewGatewayClientPluginClient () * plugin.Client {
300+ return NewPluginClient (gatewayclient .Handshake , gatewayclient .PluginMap , c .Auth .Gateway .Client .LogLevel , c .Auth .Gateway .Client .Command , c .Auth .Gateway .Client .Parameters )
301+ }
302+
277303func NewLocalAuthPluginClient () * plugin.Client {
304+ return NewPluginClient (localauth .Handshake , localauth .PluginMap , c .Auth .Local .LogLevel , c .Auth .Local .Command , c .Auth .Local .Parameters )
305+ }
306+
307+ func NewPluginClient (handshakeConfig plugin.HandshakeConfig , plugins map [string ]plugin.Plugin , logLevel string , command string , params []string ) * plugin.Client {
278308 jsonFormat := false
279309 if c .Log .Format == "json" {
280310 jsonFormat = true
281311 }
282312 logger := hclog .New (& hclog.LoggerOptions {
283313 Output : os .Stdout ,
284- Level : hclog .LevelFromString (c . Auth . Local . LogLevel ),
314+ Level : hclog .LevelFromString (logLevel ),
285315 Name : "plugin" ,
286316 JSONFormat : jsonFormat ,
287317 TimeFormat : time .RFC3339 ,
288318 })
289319
290320 return plugin .NewClient (& plugin.ClientConfig {
291- HandshakeConfig : shared . Handshake ,
292- Plugins : shared . PluginMap ,
321+ HandshakeConfig : handshakeConfig ,
322+ Plugins : plugins ,
293323 Logger : logger ,
294- Cmd : exec .Command (c . Auth . Local . Command , c . Auth . Local . Parameters ... ),
324+ Cmd : exec .Command (command , params ... ),
295325 AllowedProtocols : []plugin.Protocol {
296326 plugin .ProtocolNetRPC , plugin .ProtocolGRPC },
297327 })
0 commit comments