@@ -28,9 +28,11 @@ const (
2828 // DefaultTimeoutScale is the default TimeoutScale in a NetworkTransport.
2929 DefaultTimeoutScale = 256 * 1024 // 256KB
3030
31- // rpcMaxPipeline controls the maximum number of outstanding
32- // AppendEntries RPC calls.
33- rpcMaxPipeline = 128
31+ // DefaultMaxRPCsInFlight is the default value used for pipelining configuration
32+ // if a zero value is passed. See https://github.com/hashicorp/raft/pull/541
33+ // for rationale. Note, if this is changed we should update the doc comments
34+ // below for NetworkTransportConfig.MaxRPCsInFlight.
35+ DefaultMaxRPCsInFlight = 2
3436
3537 // connReceiveBufferSize is the size of the buffer we will use for reading RPC requests into
3638 // on followers
@@ -39,6 +41,16 @@ const (
3941 // connSendBufferSize is the size of the buffer we will use for sending RPC request data from
4042 // the leader to followers.
4143 connSendBufferSize = 256 * 1024 // 256KB
44+
45+ // minInFlightForPipelining is a property of our current pipelining
46+ // implementation and must not be changed unless we change the invariants of
47+ // that implementation. Roughly speaking, even with a zero-length in-flight
48+ // buffer we still allow 2 requests to be in-flight before we block because we
49+ // only block after sending and the receiving go-routine always unblocks the
50+ // chan right after first send. This is a constant just to provide context
51+ // rather than a magic number in the few places where we have to check
52+ // invariants to avoid panics etc.
53+ minInFlightForPipelining = 2
4254)
4355
4456var (
@@ -76,7 +88,8 @@ type NetworkTransport struct {
7688
7789 logger hclog.Logger
7890
79- maxPool int
91+ maxPool int
92+ maxInFlight int
8093
8194 serverAddressProvider ServerAddressProvider
8295
@@ -108,6 +121,39 @@ type NetworkTransportConfig struct {
108121 // MaxPool controls how many connections we will pool
109122 MaxPool int
110123
124+ // MaxRPCsInFlight controls the pipelining "optimization" when replicating
125+ // entries to followers.
126+ //
127+ // Setting this to 1 explicitly disables pipelining since no overlapping of
128+ // request processing is allowed. If set to 1 the pipelining code path is
129+ // skipped entirely and every request is processed synchronously.
130+ //
131+ // If zero is set (or left as default), DefaultMaxRPCsInFlight is used which
132+ // is currently 2. A value of 2 overlaps the preparation and sending of the
133+ // next request while waiting for the previous response, but avoids additional
134+ // queuing.
135+ //
136+ // Historically this was internally fixed at (effectively) 130; however,
137+ // performance testing has shown that in practice the pipelining optimization
138+ // combines badly with batching and actually has a very large negative impact
139+ // on commit latency when throughput is high, whilst having very little
140+ // benefit on latency or throughput in any other case! See
141+ // [#541](https://github.com/hashicorp/raft/pull/541) for more analysis of the
142+ // performance impacts.
143+ //
144+ // Increasing this beyond 2 is likely to be beneficial only in very
145+ // high-latency network conditions. HashiCorp doesn't recommend using our own
146+ // products this way.
147+ //
148+ // To maintain the behavior from before version 1.4.1 exactly, set this to
149+ // 130. The old internal constant was 128 but was used directly as a channel
150+ // buffer size. Since we send before blocking on the channel and unblock the
151+ // channel as soon as the receiver is done with the earliest outstanding
152+ // request, even an unbuffered channel (buffer=0) allows one request to be
153+ // sent while waiting for the previous one (i.e. 2 inflight). So the old
154+ // buffer actually allowed 130 RPCs to be inflight at once.
155+ MaxRPCsInFlight int
156+
111157 // Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply
112158 // the timeout by (SnapshotSize / TimeoutScale).
113159 Timeout time.Duration
@@ -162,11 +208,17 @@ func NewNetworkTransportWithConfig(
162208 Level : hclog .DefaultLevel ,
163209 })
164210 }
211+ maxInFlight := config .MaxRPCsInFlight
212+ if maxInFlight == 0 {
213+ // Default zero value
214+ maxInFlight = DefaultMaxRPCsInFlight
215+ }
165216 trans := & NetworkTransport {
166217 connPool : make (map [ServerAddress ][]* netConn ),
167218 consumeCh : make (chan RPC ),
168219 logger : config .Logger ,
169220 maxPool : config .MaxPool ,
221+ maxInFlight : maxInFlight ,
170222 shutdownCh : make (chan struct {}),
171223 stream : config .Stream ,
172224 timeout : config .Timeout ,
@@ -379,14 +431,20 @@ func (n *NetworkTransport) returnConn(conn *netConn) {
379431// AppendEntriesPipeline returns an interface that can be used to pipeline
380432// AppendEntries requests.
381433func (n * NetworkTransport ) AppendEntriesPipeline (id ServerID , target ServerAddress ) (AppendPipeline , error ) {
434+ if n .maxInFlight < minInFlightForPipelining {
435+ // Pipelining is disabled since no more than one request can be outstanding
436+ // at once. Skip the whole code path and use synchronous requests.
437+ return nil , ErrPipelineReplicationNotSupported
438+ }
439+
382440 // Get a connection
383441 conn , err := n .getConnFromAddressProvider (id , target )
384442 if err != nil {
385443 return nil , err
386444 }
387445
388446 // Create the pipeline
389- return newNetPipeline (n , conn ), nil
447+ return newNetPipeline (n , conn , n . maxInFlight ), nil
390448}
391449
392450// AppendEntries implements the Transport interface.
@@ -720,14 +778,25 @@ func sendRPC(conn *netConn, rpcType uint8, args interface{}) error {
720778 return nil
721779}
722780
723- // newNetPipeline is used to construct a netPipeline from a given
724- // transport and connection.
725- func newNetPipeline (trans * NetworkTransport , conn * netConn ) * netPipeline {
781+ // newNetPipeline is used to construct a netPipeline from a given transport and
782+ // connection. It is a bug to ever call this with maxInFlight less than 2
783+ // (minInFlightForPipelining) and will cause a panic.
784+ func newNetPipeline (trans * NetworkTransport , conn * netConn , maxInFlight int ) * netPipeline {
785+ if maxInFlight < minInFlightForPipelining {
786+ // Shouldn't happen (tm) since we validate this in the one call site and
787+ // skip pipelining if it's lower.
788+ panic ("pipelining makes no sense if maxInFlight < 2" )
789+ }
726790 n := & netPipeline {
727- conn : conn ,
728- trans : trans ,
729- doneCh : make (chan AppendFuture , rpcMaxPipeline ),
730- inprogressCh : make (chan * appendFuture , rpcMaxPipeline ),
791+ conn : conn ,
792+ trans : trans ,
793+ // The buffer size is 2 less than the configured max because we send before
794+ // waiting on the channel and the decode routine unblocks the channel as
795+ // soon as it's waiting on the first request. So a zero-buffered channel
796+ // still allows 1 request to be sent even while decode is still waiting for
797+ // a response from the previous one. i.e. two are inflight at the same time.
798+ inprogressCh : make (chan * appendFuture , maxInFlight - 2 ),
799+ doneCh : make (chan AppendFuture , maxInFlight - 2 ),
731800 shutdownCh : make (chan struct {}),
732801 }
733802 go n .decodeResponses ()
0 commit comments