Skip to content

Commit 49343a4

Browse files
smb2lovegaius-qi
andauthored
fix(manager,scheduler): IPv6-safe host:port formatting (#4555)
* fix(manager,scheduler): IPv6-safe host:port formatting Use net.JoinHostPort for host:port strings to correctly bracket IPv6 literals. Keep listener address formatting with fmt.Sprintf when ip.FormatIP already returns bracketed IPv6, to avoid double-bracketing (e.g. [[::]]:port). Tests: go test ./manager/... ./scheduler/... Signed-off-by: smb2love <smb2love@users.noreply.github.com> * refactor(network): simplify listener address construction Signed-off-by: Gaius <gaius.qi@gmail.com> * chore(ip): remove unused IP package and tests Signed-off-by: Gaius <gaius.qi@gmail.com> --------- Signed-off-by: smb2love <smb2love@users.noreply.github.com> Signed-off-by: Gaius <gaius.qi@gmail.com> Co-authored-by: smb2love <smb2love@users.noreply.github.com> Co-authored-by: Gaius <gaius.qi@gmail.com>
1 parent a9301ea commit 49343a4

File tree

13 files changed

+43
-116
lines changed

13 files changed

+43
-116
lines changed

cmd/dependency/dependency.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"os/signal"
2626
"path/filepath"
2727
"reflect"
28+
"strconv"
2829
"strings"
2930
"syscall"
3031
"time"
@@ -123,7 +124,7 @@ func InitMonitor(ctx context.Context, pprofPort int, tracingConfig base.TracingC
123124

124125
// startStatsView starts the statsview server on the specified port.
125126
func startStatsView(port int) func() {
126-
addr := fmt.Sprintf("%s:%d", net.IPv4zero.String(), port)
127+
addr := net.JoinHostPort(net.IPv4zero.String(), strconv.Itoa(port))
127128
viewer.SetConfiguration(viewer.WithAddr(addr))
128129
sv := statsview.New()
129130

manager/config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package config
1818

1919
import (
2020
"errors"
21-
"fmt"
2221
"net"
22+
"strconv"
2323
"time"
2424

2525
"d7y.io/dragonfly/v2/cmd/dependency/base"
@@ -638,7 +638,7 @@ func (cfg *Config) Validate() error {
638638
func (cfg *Config) Convert() error {
639639
// TODO Compatible with deprecated fields host and port.
640640
if len(cfg.Database.Redis.Addrs) == 0 && cfg.Database.Redis.Host != "" && cfg.Database.Redis.Port > 0 {
641-
cfg.Database.Redis.Addrs = []string{fmt.Sprintf("%s:%d", cfg.Database.Redis.Host, cfg.Database.Redis.Port)}
641+
cfg.Database.Redis.Addrs = []string{net.JoinHostPort(cfg.Database.Redis.Host, strconv.Itoa(cfg.Database.Redis.Port))}
642642
}
643643

644644
if cfg.Server.GRPC.AdvertiseIP == nil {

manager/database/mysql.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
package database
1818

1919
import (
20-
"fmt"
20+
"net"
21+
"strconv"
2122
"time"
2223

2324
"github.com/docker/go-connections/tlsconfig"
@@ -90,7 +91,7 @@ func formatMysqlDSN(cfg *config.MysqlConfig) (string, error) {
9091
mysqlCfg := mysql.Config{
9192
User: cfg.User,
9293
Passwd: cfg.Password,
93-
Addr: fmt.Sprintf("%s:%d", cfg.Host, cfg.Port),
94+
Addr: net.JoinHostPort(cfg.Host, strconv.Itoa(int(cfg.Port))),
9495
Net: "tcp",
9596
DBName: cfg.DBName,
9697
Loc: time.Local,

manager/manager.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ package manager
1919
import (
2020
"context"
2121
"embed"
22-
"fmt"
2322
"io/fs"
2423
"net"
2524
"net/http"
25+
"strconv"
2626
"time"
2727

2828
"github.com/gin-contrib/static"
@@ -44,7 +44,6 @@ import (
4444
"d7y.io/dragonfly/v2/manager/service"
4545
"d7y.io/dragonfly/v2/pkg/dfpath"
4646
pkggc "d7y.io/dragonfly/v2/pkg/gc"
47-
"d7y.io/dragonfly/v2/pkg/net/ip"
4847
"d7y.io/dragonfly/v2/pkg/redis"
4948
"d7y.io/dragonfly/v2/pkg/rpc"
5049
)
@@ -297,13 +296,7 @@ func (s *Server) Serve() error {
297296
s.gc.Start(context.Background())
298297
logger.Info("started gc server")
299298

300-
// Generate GRPC listener.
301-
ip, ok := ip.FormatIP(s.config.Server.GRPC.ListenIP.String())
302-
if !ok {
303-
return fmt.Errorf("format ip failed: %s", ip)
304-
}
305-
306-
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ip, s.config.Server.GRPC.Port.Start))
299+
listener, err := net.Listen("tcp", net.JoinHostPort(s.config.Server.GRPC.ListenIP.String(), strconv.Itoa(s.config.Server.GRPC.Port.Start)))
307300
if err != nil {
308301
logger.Fatalf("net listener failed to start: %v", err)
309302
}

pkg/net/ip/ip.go

Lines changed: 0 additions & 33 deletions
This file was deleted.

pkg/net/ip/ip_test.go

Lines changed: 0 additions & 38 deletions
This file was deleted.

scheduler/config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package config
1818

1919
import (
2020
"errors"
21-
"fmt"
2221
"net"
22+
"strconv"
2323
"time"
2424

2525
"d7y.io/dragonfly/v2/cmd/dependency/base"
@@ -554,7 +554,7 @@ func (cfg *Config) Convert() error {
554554

555555
// TODO Compatible with deprecated fields host and port of redis of job.
556556
if len(cfg.Database.Redis.Addrs) == 0 && len(cfg.Job.Redis.Addrs) == 0 && cfg.Job.Redis.Host != "" && cfg.Job.Redis.Port > 0 {
557-
cfg.Database.Redis.Addrs = []string{fmt.Sprintf("%s:%d", cfg.Job.Redis.Host, cfg.Job.Redis.Port)}
557+
cfg.Database.Redis.Addrs = []string{net.JoinHostPort(cfg.Job.Redis.Host, strconv.Itoa(cfg.Job.Redis.Port))}
558558
}
559559

560560
// TODO Compatible with deprecated fields master name of redis of job.

scheduler/job/job.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import (
2323
"errors"
2424
"fmt"
2525
"io"
26+
"net"
2627
"slices"
28+
"strconv"
2729
"sync"
2830

2931
"github.com/dragonflyoss/machinery/v1"
@@ -279,7 +281,7 @@ func (j *job) preheatV1SingleSeedPeer(ctx context.Context, req *internaljob.Preh
279281
return nil, err
280282
}
281283

282-
addr := fmt.Sprintf("%s:%d", selected.IP, selected.Port)
284+
addr := net.JoinHostPort(selected.IP, strconv.Itoa(int(selected.Port)))
283285
log.Infof("[preheat]: selected seed peer %s", addr)
284286

285287
// TODO(chlins): reuse the client if we encounter the performance issue in future.
@@ -363,7 +365,7 @@ func (j *job) preheatV2SingleSeedPeerByURL(ctx context.Context, url string, req
363365
return nil, err
364366
}
365367

366-
addr := fmt.Sprintf("%s:%d", selected.IP, selected.Port)
368+
addr := net.JoinHostPort(selected.IP, strconv.Itoa(int(selected.Port)))
367369
log.Infof("[preheat]: selected seed peer %s", addr)
368370

369371
client, err := j.resource.PeerClientPool().Get(addr, j.dialOptions...)
@@ -445,7 +447,7 @@ func (j *job) PreheatAllSeedPeers(ctx context.Context, req *internaljob.PreheatR
445447
port = seedPeer.Port
446448
)
447449

448-
addr := fmt.Sprintf("%s:%d", ip, port)
450+
addr := net.JoinHostPort(ip, strconv.Itoa(int(port)))
449451
peg, _ := errgroup.WithContext(ctx)
450452
peg.SetLimit(int(req.ConcurrentPeerCount))
451453
peg.Go(func() error {
@@ -665,7 +667,7 @@ func (j *job) PreheatAllPeers(ctx context.Context, req *internaljob.PreheatReque
665667
port = peer.Port
666668
)
667669

668-
addr := fmt.Sprintf("%s:%d", ip, port)
670+
addr := net.JoinHostPort(ip, strconv.Itoa(int(port)))
669671
peg, _ := errgroup.WithContext(ctx)
670672
peg.SetLimit(int(req.ConcurrentPeerCount))
671673
peg.Go(func() error {
@@ -921,7 +923,7 @@ func (j *job) GetTask(ctx context.Context, req *internaljob.GetTaskRequest, log
921923
eg.SetLimit(int(req.ConcurrentPeerCount))
922924
for _, host := range hosts {
923925
eg.Go(func() error {
924-
addr := fmt.Sprintf("%s:%d", host.IP, host.Port)
926+
addr := net.JoinHostPort(host.IP, strconv.Itoa(int(host.Port)))
925927
dfdaemonClient, err := j.resource.PeerClientPool().Get(addr, j.dialOptions...)
926928
if err != nil {
927929
log.Warnf("[get-task] get client from %s failed: %s", addr, err.Error())
@@ -996,7 +998,7 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
996998
for _, finishedPeer := range finishedPeers {
997999
log := logger.WithDeleteTaskJobAndPeer(req.GroupUUID, req.TaskUUID, finishedPeer.Host.ID, finishedPeer.Task.ID, finishedPeer.ID)
9981000

999-
addr := fmt.Sprintf("%s:%d", finishedPeer.Host.IP, finishedPeer.Host.Port)
1001+
addr := net.JoinHostPort(finishedPeer.Host.IP, strconv.Itoa(int(finishedPeer.Host.Port)))
10001002
dfdaemonClient, err := j.resource.PeerClientPool().Get(addr, j.dialOptions...)
10011003
if err != nil {
10021004
log.Errorf("[delete-task] get client from %s failed: %s", addr, err.Error())
@@ -1058,7 +1060,7 @@ func (j *job) ListTaskEntries(ctx context.Context, req *internaljob.ListTaskEntr
10581060
selected = peers[0]
10591061
}
10601062

1061-
addr := fmt.Sprintf("%s:%d", selected.IP, selected.Port)
1063+
addr := net.JoinHostPort(selected.IP, strconv.Itoa(int(selected.Port)))
10621064
log.Infof("[list-task-entries] selected seed peer %s for task %s", addr, req.TaskID)
10631065

10641066
dfdaemonClient, err := j.resource.PeerClientPool().Get(addr, j.dialOptions...)

scheduler/resource/standard/peer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import (
2121
"crypto/tls"
2222
"fmt"
2323
"io"
24+
"net"
2425
"net/http"
2526
"net/url"
2627
"regexp"
28+
"strconv"
2729
"time"
2830

2931
"github.com/bits-and-blooms/bitset"
@@ -439,7 +441,7 @@ func (p *Peer) DownloadTinyFile() ([]byte, error) {
439441
// Download path: ${host}:${port}/download/${taskIndex}/${taskID}?peerId=${peerID}
440442
targetURL := url.URL{
441443
Scheme: "http",
442-
Host: fmt.Sprintf("%s:%d", p.Host.IP, p.Host.DownloadPort),
444+
Host: net.JoinHostPort(p.Host.IP, strconv.Itoa(int(p.Host.DownloadPort))),
443445
Path: fmt.Sprintf("download/%s/%s", p.Task.ID[:3], p.Task.ID),
444446
RawQuery: fmt.Sprintf("peerId=%s", p.ID),
445447
}

scheduler/resource/standard/seed_peer.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"context"
2323
"fmt"
2424
"io"
25+
"net"
26+
"strconv"
2527
"sync"
2628
"time"
2729

@@ -123,7 +125,7 @@ func (s *seedPeer) TriggerDownloadTask(ctx context.Context, taskID string, req *
123125
return err
124126
}
125127

126-
addr := fmt.Sprintf("%s:%d", selected.IP, selected.Port)
128+
addr := net.JoinHostPort(selected.IP, strconv.Itoa(int(selected.Port)))
127129
logger.Infof("selected seed peer %s for task %s", addr, taskID)
128130

129131
client, err := s.clientPool.Get(addr, s.dialOptions...)
@@ -173,7 +175,7 @@ func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task)
173175
return nil, nil, err
174176
}
175177

176-
addr := fmt.Sprintf("%s:%d", selected.IP, selected.Port)
178+
addr := net.JoinHostPort(selected.IP, strconv.Itoa(int(selected.Port)))
177179
logger.Infof("selected seed peer %s for task %s", addr, task.ID)
178180

179181
// TODO(chlins): reuse the client if we encounter the performance issue in future.
@@ -341,7 +343,7 @@ func (s *seedPeer) refresh(ctx context.Context) error {
341343
healthyHosts := &sync.Map{}
342344
// Do the health check for each seed peer.
343345
for _, host := range hosts {
344-
addr := fmt.Sprintf("%s:%d", host.IP, host.Port)
346+
addr := net.JoinHostPort(host.IP, strconv.Itoa(int(host.Port)))
345347
if err := healthclient.Check(ctx, addr, s.dialOptions...); err != nil {
346348
logger.Errorf("failed to check the healthy for seed peer %s: %v", addr, err)
347349
} else {

0 commit comments

Comments
 (0)