Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ type Client struct {
// By default connection duration is unlimited.
MaxConnDuration time.Duration

// TCPKeepalivePeriod is the duration the connection needs to
// remain idle before TCP starts sending keepalive probes.
//
// This field is only effective when TCPKeepalive is set to true.
// A zero value indicates that the operating system's default setting will be used.
TCPKeepalivePeriod time.Duration

// Maximum number of attempts for idempotent calls.
//
// DefaultMaxIdemponentCallAttempts is used if not set.
Expand Down Expand Up @@ -315,6 +322,14 @@ type Client struct {

// StreamResponseBody enables response body streaming.
StreamResponseBody bool

// Whether the operating system should send tcp keep-alive messages on the tcp connection.
//
// By default tcp keep-alive connections are disabled.
//
// This option is used only if default TCP dialer is used,
// i.e. if Dial and DialTimeout are blank.
TCPKeepalive bool
}

// Get returns the status code and body of url.
Expand Down Expand Up @@ -527,6 +542,8 @@ func (c *Client) Do(req *Request, resp *Response) error {
MaxConns: c.MaxConnsPerHost,
MaxIdleConnDuration: c.MaxIdleConnDuration,
MaxConnDuration: c.MaxConnDuration,
TCPKeepalivePeriod: c.TCPKeepalivePeriod,
TCPKeepalive: c.TCPKeepalive,
MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts,
ReadBufferSize: c.ReadBufferSize,
WriteBufferSize: c.WriteBufferSize,
Expand Down Expand Up @@ -796,6 +813,13 @@ type HostClient struct {
// By default will not waiting, return ErrNoFreeConns immediately
MaxConnWaitTimeout time.Duration

// TCPKeepalivePeriod is the duration the connection needs to
// remain idle before TCP starts sending keepalive probes.
//
// This field is only effective when TCPKeepalive is set to true.
// A zero value indicates that the operating system's default setting will be used.
TCPKeepalivePeriod time.Duration

// Connection pool strategy. Can be either LIFO or FIFO (default).
ConnPoolStrategy ConnPoolStrategyType

Expand Down Expand Up @@ -871,6 +895,16 @@ type HostClient struct {
StreamResponseBody bool

connsCleanerRun bool

// Whether to enable tcp keep-alive connections.
//
// Whether the operating system should send tcp keep-alive messages on the tcp connection.
//
// By default tcp keep-alive connections are disabled.
//
// This option is used only if default TCP dialer is used,
// i.e. if Dial and DialTimeout are blank.
TCPKeepalive bool
}

type clientConn struct {
Expand Down Expand Up @@ -1886,6 +1920,22 @@ func (c *HostClient) dialHostHard(dialTimeout time.Duration) (conn net.Conn, err
// use dialTimeout to control the timeout of each dial. It does not work if dialTimeout is 0 or if
// c.DialTimeout has not been set and c.Dial has been set.
// attempt to dial all the available hosts before giving up.
if c.Dial == nil && c.DialTimeout == nil && c.TCPKeepalive {
d := &TCPDialer{Concurrency: defaultDialer.Concurrency, TCPKeepalive: true, TCPKeepalivePeriod: c.TCPKeepalivePeriod}
if dialTimeout > 0 {
if c.DialDualStack {
c.DialTimeout = d.DialDualStackTimeout
} else {
c.DialTimeout = d.DialTimeout
}
} else {
if c.DialDualStack {
c.Dial = d.DialDualStack
} else {
c.Dial = d.Dial
}
}
}

c.addrsLock.Lock()
n := len(c.addrs)
Expand Down Expand Up @@ -2326,6 +2376,12 @@ type pipelineConnClient struct {
WriteBufferSize int
ReadTimeout time.Duration
WriteTimeout time.Duration
// TCPKeepalivePeriod is the duration the connection needs to
// remain idle before TCP starts sending keepalive probes.
//
// This field is only effective when TCPKeepalive is set to true.
// A zero value indicates that the operating system's default setting will be used.
TCPKeepalivePeriod time.Duration

chLock sync.Mutex

Expand All @@ -2335,6 +2391,14 @@ type pipelineConnClient struct {
DisableHeaderNamesNormalizing bool
DisablePathNormalizing bool
IsTLS bool

// Whether the operating system should send tcp keep-alive messages on the tcp connection.
//
// By default tcp keep-alive connections are disabled.
//
// This option is used only if default TCP dialer is used,
// i.e. if Dial is blank.
TCPKeepalive bool
}

type pipelineWork struct {
Expand Down Expand Up @@ -2666,6 +2730,14 @@ func (c *pipelineConnClient) init() {

func (c *pipelineConnClient) worker() error {
tlsConfig := c.cachedTLSConfig()
if c.TCPKeepalive && c.Dial == nil {
d := &TCPDialer{Concurrency: defaultDialer.Concurrency, TCPKeepalive: true, TCPKeepalivePeriod: c.TCPKeepalivePeriod}
if c.DialDualStack {
c.Dial = d.DialDualStack
} else {
c.Dial = d.Dial
}
}
conn, err := dialAddr(c.Addr, c.Dial, nil, c.DialDualStack, c.IsTLS, tlsConfig, 0, c.WriteTimeout)
if err != nil {
return err
Expand Down
39 changes: 30 additions & 9 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,11 @@ type Server struct {
// worker pool of the Server. Idle workers beyond this time will be cleared.
MaxIdleWorkerDuration time.Duration

// Period between tcp keep-alive messages.
// TCPKeepalivePeriod is the duration the connection needs to
// remain idle before TCP starts sending keepalive probes.
//
// TCP keep-alive period is determined by operation system by default.
// This field is only effective when TCPKeepalive is set to true.
// A zero value indicates that the operating system's default setting will be used.
TCPKeepalivePeriod time.Duration

// Maximum request body size.
Expand Down Expand Up @@ -329,11 +331,14 @@ type Server struct {
// By default keep-alive connections are enabled.
DisableKeepalive bool

// Whether to enable tcp keep-alive connections.
//
// Whether the operating system should send tcp keep-alive messages on the tcp connection.
//
// By default tcp keep-alive connections are disabled.
// When this field is set to false, it is only effective if the net.Listener
// is created by this package itself. i.e. listened by ListenAndServe, ListenAndServeUNIX,
// ListenAndServeTLS and ListenAndServeTLSEmbed method.
//
// If you have created a net.Listener yourself and configured TCP keepalive,
// this field should be set to false to avoid redundant additional system calls.
TCPKeepalive bool

// Aggressively reduces memory usage at the cost of higher CPU usage
Expand Down Expand Up @@ -1623,7 +1628,7 @@ func (s *Server) getNextProto(c net.Conn) (proto string, err error) {
//
// Accepted connections are configured to enable TCP keep-alives.
func (s *Server) ListenAndServe(addr string) error {
ln, err := net.Listen("tcp4", addr)
ln, err := createTCPListener("tcp4", addr)
if err != nil {
return err
}
Expand All @@ -1639,7 +1644,7 @@ func (s *Server) ListenAndServeUNIX(addr string, mode os.FileMode) error {
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("unexpected error when trying to remove unix socket file %q: %w", addr, err)
}
ln, err := net.Listen("unix", addr)
ln, err := createTCPListener("unix", addr)
if err != nil {
return err
}
Expand All @@ -1661,7 +1666,7 @@ func (s *Server) ListenAndServeUNIX(addr string, mode os.FileMode) error {
//
// Accepted connections are configured to enable TCP keep-alives.
func (s *Server) ListenAndServeTLS(addr, certFile, keyFile string) error {
ln, err := net.Listen("tcp4", addr)
ln, err := createTCPListener("tcp4", addr)
if err != nil {
return err
}
Expand All @@ -1680,7 +1685,7 @@ func (s *Server) ListenAndServeTLS(addr, certFile, keyFile string) error {
//
// Accepted connections are configured to enable TCP keep-alives.
func (s *Server) ListenAndServeTLSEmbed(addr string, certData, keyData []byte) error {
ln, err := net.Listen("tcp4", addr)
ln, err := createTCPListener("tcp4", addr)
if err != nil {
return err
}
Expand Down Expand Up @@ -2983,3 +2988,19 @@ var stateName = map[ConnState]string{
func (c ConnState) String() string {
return stateName[c]
}

// If the net.Listener is created by us, we override the default TCP
// keepalive behavior of net.ListenConfig, so that whether TCP keepalive is
// enabled is entirely determined by Server.KeepAlive.
//
// In the implementation of the net package, TCPListener
// by default always enables KeepAlive for new connections.
func createTCPListener(network, addr string) (ln net.Listener, err error) {
var lc net.ListenConfig
// Overriding the default TCP keepalive behavior of
// net.ListenConfig, whether TCP keepalive is enabled
// is determined by Server.KeepAlive.
lc.KeepAlive = -1
ln, err = lc.Listen(context.Background(), network, addr)
return
}
27 changes: 27 additions & 0 deletions tcpdialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,22 @@ type TCPDialer struct {
// DNSCacheDuration may be used to override the default DNS cache duration (DefaultDNSCacheDuration)
DNSCacheDuration time.Duration

// TCPKeepalivePeriod is the duration the connection needs to
// remain idle before TCP starts sending keepalive probes.
//
// This field is only effective when TCPKeepalive is set to true.
// A zero value indicates that the operating system's default setting will be used.
TCPKeepalivePeriod time.Duration

once sync.Once

// DisableDNSResolution may be used to disable DNS resolution
DisableDNSResolution bool

// Whether the operating system should send tcp keep-alive messages on the tcp connection.
//
// By default tcp keep-alive connections are disabled.
TCPKeepalive bool
}

// Dial dials the given TCP addr using tcp4.
Expand Down Expand Up @@ -344,6 +356,9 @@ func (d *TCPDialer) tryDial(
if d.LocalAddr != nil {
dialer.LocalAddr = d.LocalAddr
}
// Overriding the default behavior of net.Dialer,
// whether TCP keepalive is enabled is determined by Dialer.KeepAlive.
dialer.KeepAlive = -1

ctx, cancelCtx := context.WithDeadline(context.Background(), deadline)
defer cancelCtx()
Expand All @@ -354,6 +369,18 @@ func (d *TCPDialer) tryDial(
}
return nil, wrapDialWithUpstream(err, addr)
}
if tc, ok := conn.(*net.TCPConn); ok && d.TCPKeepalive {
if err = tc.SetKeepAlive(d.TCPKeepalive); err != nil {
_ = tc.Close()
return nil, err
}
if d.TCPKeepalivePeriod > 0 {
if err = tc.SetKeepAlivePeriod(d.TCPKeepalivePeriod); err != nil {
_ = tc.Close()
return nil, err
}
}
}
return conn, nil
}

Expand Down