@@ -3,6 +3,7 @@ package kafka
3
3
import (
4
4
"context"
5
5
"crypto/tls"
6
+ "fmt"
6
7
"io"
7
8
"net"
8
9
"strconv"
@@ -281,8 +282,13 @@ func (d *Dialer) connect(ctx context.Context, network, address string, connCfg C
281
282
conn := NewConnWith (c , connCfg )
282
283
283
284
if d .SASLMechanism != nil {
285
+ host , port , err := splitHostPortNumber (address )
286
+ if err != nil {
287
+ return nil , err
288
+ }
284
289
metadata := & sasl.Metadata {
285
- Host : address ,
290
+ Host : host ,
291
+ Port : port ,
286
292
}
287
293
if err := d .authenticateSASL (sasl .WithMetadata (ctx , metadata ), conn ); err != nil {
288
294
_ = conn .Close ()
@@ -435,14 +441,28 @@ func backoff(attempt int, min time.Duration, max time.Duration) time.Duration {
435
441
return d
436
442
}
437
443
444
+ func canonicalAddress (s string ) string {
445
+ return net .JoinHostPort (splitHostPort (s ))
446
+ }
447
+
438
448
func splitHostPort (s string ) (host string , port string ) {
439
449
host , port , _ = net .SplitHostPort (s )
440
450
if len (host ) == 0 && len (port ) == 0 {
441
451
host = s
452
+ port = "9092"
442
453
}
443
454
return
444
455
}
445
456
457
+ func splitHostPortNumber (s string ) (host string , portNumber int , err error ) {
458
+ host , port := splitHostPort (s )
459
+ portNumber , err = strconv .Atoi (port )
460
+ if err != nil {
461
+ return host , 0 , fmt .Errorf ("%s: %w" , s , err )
462
+ }
463
+ return host , portNumber , nil
464
+ }
465
+
446
466
func lookupHost (ctx context.Context , address string , resolver Resolver ) (string , error ) {
447
467
host , port := splitHostPort (address )
448
468
@@ -468,9 +488,5 @@ func lookupHost(ctx context.Context, address string, resolver Resolver) (string,
468
488
}
469
489
}
470
490
471
- if port == "" {
472
- port = "9092"
473
- }
474
-
475
491
return net .JoinHostPort (host , port ), nil
476
492
}
0 commit comments