Skip to content

Commit 12eb1c5

Browse files
ahmet2mirjeevatkm
authored andcommitted
feat(hedging): support hedging in order to reduce latency when quering endpoints (#1081)
* feat(hedging): support hedging in order to reduce latency when quering endpoints Signed-off-by: Ahmet DEMIR <me@ahmet2mir.eu> * fix(hedging): move fields to struct, remove ratelimit, fail on sethedging func if not enabled, restore retry, allow unsafe funcs Signed-off-by: Ahmet DEMIR <me@ahmet2mir.eu> * fix(hedging): adapt suggestions Signed-off-by: Ahmet DEMIR <me@ahmet2mir.eu> --------- Signed-off-by: Ahmet DEMIR <me@ahmet2mir.eu>
1 parent 72c86c0 commit 12eb1c5

File tree

6 files changed

+1005
-2
lines changed

6 files changed

+1005
-2
lines changed

BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ go_library(
1313
"curl.go",
1414
"debug.go",
1515
"digest.go",
16+
"hedging.go",
1617
"load_balancer.go",
1718
"middleware.go",
1819
"multipart.go",
@@ -42,6 +43,7 @@ go_test(
4243
"context_test.go",
4344
"curl_test.go",
4445
"digest_test.go",
46+
"hedging_test.go",
4547
"load_balancer_test.go",
4648
"middleware_test.go",
4749
"multipart_test.go",

client.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ type Client struct {
227227
contentDecompressers map[string]ContentDecompresser
228228
certWatcherStopChan chan bool
229229
circuitBreaker *CircuitBreaker
230+
hedging *hedgingConfig
230231
}
231232

232233
// CertWatcherOptions allows configuring a watcher that reloads dynamically TLS certs.
@@ -1469,6 +1470,205 @@ func (c *Client) AddRetryHooks(hooks ...RetryHookFunc) *Client {
14691470
return c
14701471
}
14711472

1473+
// isHedgingEnabled method returns true if hedging is enabled and get client clock.
1474+
func (c *Client) IsHedgingEnabled() bool {
1475+
c.lock.RLock()
1476+
defer c.lock.RUnlock()
1477+
return c.isHedgingEnabled()
1478+
}
1479+
1480+
// isHedgingEnabled method returns true if hedging is enabled.
1481+
func (c *Client) isHedgingEnabled() bool {
1482+
return c.hedging != nil && c.hedging.enabled
1483+
}
1484+
1485+
// SetHedgingDelay method sets the delay between hedged requests.
1486+
func (c *Client) SetHedgingDelay(delay time.Duration) *Client {
1487+
c.lock.Lock()
1488+
defer c.lock.Unlock()
1489+
if c.isHedgingEnabled() {
1490+
c.hedging.delay = delay
1491+
return c
1492+
}
1493+
c.log.Errorf("SetHedgingDelay: %v", ErrHedgingDisabled)
1494+
return c
1495+
}
1496+
1497+
// HedgingDelay method returns the configured hedging delay.
1498+
func (c *Client) HedgingDelay() time.Duration {
1499+
c.lock.RLock()
1500+
defer c.lock.RUnlock()
1501+
if c.isHedgingEnabled() {
1502+
return c.hedging.delay
1503+
}
1504+
return hedgingDefaultDelay
1505+
}
1506+
1507+
// SetHedgingUpTo method sets maximum concurrent hedged requests.
1508+
func (c *Client) SetHedgingUpTo(upTo int) *Client {
1509+
c.lock.Lock()
1510+
defer c.lock.Unlock()
1511+
if c.isHedgingEnabled() {
1512+
c.hedging.upTo = upTo
1513+
return c
1514+
}
1515+
c.log.Errorf("SetHedgingUpTo: %v", ErrHedgingDisabled)
1516+
return c
1517+
}
1518+
1519+
// HedgingUpTo method returns the maximum concurrent requests.
1520+
func (c *Client) HedgingUpTo() int {
1521+
c.lock.RLock()
1522+
defer c.lock.RUnlock()
1523+
if c.isHedgingEnabled() {
1524+
return c.hedging.upTo
1525+
}
1526+
return hedgingDefaultUpTo
1527+
}
1528+
1529+
// SetHedgingMaxPerSecond method sets rate limit for hedged requests.
1530+
func (c *Client) SetHedgingMaxPerSecond(maxPerSecond float64) *Client {
1531+
c.lock.Lock()
1532+
defer c.lock.Unlock()
1533+
if c.isHedgingEnabled() {
1534+
c.hedging.maxPerSecond = maxPerSecond
1535+
return c
1536+
}
1537+
c.log.Errorf("SetHedgingMaxPerSecond: %v", ErrHedgingDisabled)
1538+
return c
1539+
}
1540+
1541+
// HedgingMaxPerSecond method returns the hedging rate limit.
1542+
func (c *Client) HedgingMaxPerSecond() float64 {
1543+
c.lock.RLock()
1544+
defer c.lock.RUnlock()
1545+
if c.isHedgingEnabled() {
1546+
return c.hedging.maxPerSecond
1547+
}
1548+
return hedgingDefaultMaxPerSecond
1549+
}
1550+
1551+
// SetHedgingAllowNonReadOnly method allows hedging for non-read-only HTTP methods.
1552+
// By default, only read-only methods (GET, HEAD, OPTIONS, TRACE) are hedged.
1553+
// NOTE:
1554+
// - Use this with caution as hedging write operations can lead to duplicates.
1555+
func (c *Client) SetHedgingAllowNonReadOnly(allow bool) *Client {
1556+
c.lock.Lock()
1557+
defer c.lock.Unlock()
1558+
1559+
if c.isHedgingEnabled() {
1560+
c.hedging.allowNonReadOnly = allow
1561+
// Re-wrap to apply new settings
1562+
c.unwrapHedgingTransport()
1563+
c.wrapTransportWithHedging()
1564+
return c
1565+
}
1566+
c.log.Errorf("SetHedgingAllowNonReadOnly: %v", ErrHedgingDisabled)
1567+
return c
1568+
}
1569+
1570+
// IsHedgingAllowNonReadOnly method returns true if hedging is enabled for non-read-only methods.
1571+
func (c *Client) IsHedgingAllowNonReadOnly() bool {
1572+
c.lock.RLock()
1573+
defer c.lock.RUnlock()
1574+
if c.isHedgingEnabled() {
1575+
return c.hedging.allowNonReadOnly
1576+
}
1577+
return hedgingDefaultAllowNonReadOnly
1578+
}
1579+
1580+
// EnableHedging method enables hedging with the given configuration.
1581+
//
1582+
// Hedging sends multiple concurrent requests with staggered delays and returns
1583+
// the first response to complete to reduce tail latency. Only read-only HTTP methods
1584+
// (GET, HEAD, OPTIONS, TRACE) are hedged by default unless SetHedgingAllowNonReadOnly is used.
1585+
//
1586+
// client.EnableHedging(
1587+
// 50*time.Millisecond, // delay between requests
1588+
// 3, // max 3 concurrent requests
1589+
// 10.0, // max 10 hedged requests per second
1590+
// )
1591+
// Last one come from rate package, to use fractional rates, e.g.
1592+
// - 0.1 = 1 request every 10 seconds
1593+
// - 0.5 = 1 request every 2 seconds
1594+
// - 1.0 = 1 request per second
1595+
// - 2.5 = 2.5 requests per second (5 requests every 2 seconds)
1596+
// - 10.0 = 10 requests per second
1597+
func (c *Client) EnableHedging(delay time.Duration, upTo int, maxPerSecond float64) *Client {
1598+
c.lock.Lock()
1599+
defer c.lock.Unlock()
1600+
1601+
if c.hedging == nil {
1602+
c.hedging = &hedgingConfig{}
1603+
}
1604+
1605+
c.hedging.delay = delay
1606+
c.hedging.upTo = upTo
1607+
c.hedging.maxPerSecond = maxPerSecond
1608+
c.hedging.enabled = true
1609+
1610+
// Disable retry by default when hedging is enabled.
1611+
// Users can re-enable retry if they want it as a fallback mechanism.
1612+
if c.retryCount > 0 {
1613+
c.log.Warnf("Disabling retry (count: %d) as hedging is now enabled. You can re-enable retry with SetRetryCount() if you want it as a fallback.", c.retryCount)
1614+
c.retryCount = 0
1615+
}
1616+
1617+
c.wrapTransportWithHedging()
1618+
1619+
return c
1620+
}
1621+
1622+
// DisableHedging method disables hedging.
1623+
func (c *Client) DisableHedging() *Client {
1624+
c.lock.Lock()
1625+
defer c.lock.Unlock()
1626+
1627+
if c.isHedgingEnabled() {
1628+
c.hedging.enabled = false
1629+
}
1630+
1631+
c.unwrapHedgingTransport()
1632+
1633+
return c
1634+
}
1635+
1636+
func (c *Client) wrapTransportWithHedging() {
1637+
if c.hedging == nil || !c.hedging.enabled {
1638+
return
1639+
}
1640+
1641+
currentTransport := c.httpClient.Transport
1642+
if currentTransport == nil {
1643+
currentTransport = createTransport(nil, nil)
1644+
}
1645+
1646+
// Already set
1647+
if _, ok := currentTransport.(*hedgingTransport); ok {
1648+
return
1649+
}
1650+
1651+
// Calculate rate delay: if maxPerSecond is 10, delay is 100ms (1s / 10)
1652+
var rateDelay time.Duration
1653+
if c.hedging.maxPerSecond > 0 {
1654+
rateDelay = time.Duration(float64(time.Second) / c.hedging.maxPerSecond)
1655+
}
1656+
1657+
c.httpClient.Transport = &hedgingTransport{
1658+
transport: currentTransport,
1659+
delay: c.hedging.delay,
1660+
upTo: c.hedging.upTo,
1661+
rateDelay: rateDelay,
1662+
allowNonReadOnly: c.hedging.allowNonReadOnly,
1663+
}
1664+
}
1665+
1666+
func (c *Client) unwrapHedgingTransport() {
1667+
if ht, ok := c.httpClient.Transport.(*hedgingTransport); ok {
1668+
c.httpClient.Transport = ht.transport
1669+
}
1670+
}
1671+
14721672
// TLSClientConfig method returns the [tls.Config] from underlying client transport
14731673
// otherwise returns nil
14741674
func (c *Client) TLSClientConfig() *tls.Config {

client_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1533,3 +1533,126 @@ func TestClientOnCloseMultipleHooks(t *testing.T) {
15331533
assertNil(t, err)
15341534
assertEqual(t, []string{"first", "second", "third"}, executionOrder)
15351535
}
1536+
1537+
func TestClientHedgingBasic(t *testing.T) {
1538+
var attemptCount int32
1539+
ts := createHedgingTestServer(t, &attemptCount, 0, 0)
1540+
defer ts.Close()
1541+
1542+
c := dcnl()
1543+
c.EnableHedging(20*time.Millisecond, 3, 0)
1544+
1545+
resp, err := c.R().Get(ts.URL + "/hedging-slow-first")
1546+
assertError(t, err)
1547+
assertEqual(t, http.StatusOK, resp.StatusCode())
1548+
assertNotEqual(t, "", resp.String())
1549+
}
1550+
1551+
func TestClientHedgingDisable(t *testing.T) {
1552+
var attemptCount int32
1553+
ts := createHedgingTestServer(t, &attemptCount, 0, 0)
1554+
defer ts.Close()
1555+
1556+
c := dcnl()
1557+
c.EnableHedging(20*time.Millisecond, 3, 0)
1558+
assertEqual(t, true, c.IsHedgingEnabled())
1559+
1560+
c.DisableHedging()
1561+
assertEqual(t, false, c.IsHedgingEnabled())
1562+
1563+
resp, err := c.R().Get(ts.URL + "/hedging-slow-first")
1564+
assertError(t, err)
1565+
assertEqual(t, http.StatusOK, resp.StatusCode())
1566+
}
1567+
1568+
func TestClientHedgingNil(t *testing.T) {
1569+
c := dcnl()
1570+
c.hedging = nil
1571+
c.wrapTransportWithHedging()
1572+
1573+
assertEqual(t, false, c.IsHedgingEnabled())
1574+
1575+
_, ok := c.httpClient.Transport.(*hedgingTransport)
1576+
if ok {
1577+
t.Error("Transport shouldn't be hedgingTransport when hedging is nil")
1578+
}
1579+
}
1580+
1581+
func TestClientHedgingMutualExclusionWithRetry(t *testing.T) {
1582+
c := dcnl()
1583+
1584+
// Set retry first
1585+
c.SetRetryCount(2)
1586+
assertEqual(t, 2, c.RetryCount())
1587+
1588+
// Enable hedging should disable retry by default
1589+
c.EnableHedging(50*time.Millisecond, 3, 0)
1590+
assertEqual(t, 0, c.RetryCount())
1591+
1592+
// But user can re-enable retry as fallback
1593+
c.SetRetryCount(1)
1594+
assertEqual(t, 1, c.RetryCount())
1595+
assertEqual(t, true, c.IsHedgingEnabled())
1596+
1597+
// Disable hedging
1598+
c.DisableHedging()
1599+
assertEqual(t, false, c.IsHedgingEnabled())
1600+
assertEqual(t, 1, c.RetryCount()) // Retry count should remain
1601+
}
1602+
1603+
func TestClientHedgingConfiguration(t *testing.T) {
1604+
c := dcnl()
1605+
1606+
// Setters require hedging to be enabled first
1607+
assertEqual(t, false, c.IsHedgingEnabled())
1608+
1609+
c.EnableHedging(50*time.Millisecond, 3, 10.0)
1610+
1611+
assertEqual(t, true, c.IsHedgingEnabled())
1612+
assertEqual(t, 50*time.Millisecond, c.HedgingDelay())
1613+
assertEqual(t, 3, c.HedgingUpTo())
1614+
assertEqual(t, 10.0, c.HedgingMaxPerSecond())
1615+
1616+
// Now we can update individual settings
1617+
c.SetHedgingDelay(100 * time.Millisecond)
1618+
assertEqual(t, 100*time.Millisecond, c.HedgingDelay())
1619+
1620+
c.SetHedgingUpTo(5)
1621+
assertEqual(t, 5, c.HedgingUpTo())
1622+
1623+
c.SetHedgingMaxPerSecond(20.0)
1624+
assertEqual(t, 20.0, c.HedgingMaxPerSecond())
1625+
}
1626+
1627+
func TestClientHedgingWithRateLimit(t *testing.T) {
1628+
var attemptCount int32
1629+
ts := createHedgingTestServer(t, &attemptCount, 0, 0)
1630+
defer ts.Close()
1631+
1632+
c := dcnl()
1633+
c.EnableHedging(10*time.Millisecond, 10, 5.0)
1634+
1635+
resp, err := c.R().Get(ts.URL + "/hedging-slow-all")
1636+
assertError(t, err)
1637+
assertEqual(t, http.StatusOK, resp.StatusCode())
1638+
}
1639+
1640+
func TestClientHedgingSafeMethodsOnly(t *testing.T) {
1641+
ts := createGetServer(t)
1642+
defer ts.Close()
1643+
1644+
c := dcnl()
1645+
c.EnableHedging(20*time.Millisecond, 3, 0)
1646+
1647+
resp, err := c.R().Get(ts.URL + "/")
1648+
assertError(t, err)
1649+
assertEqual(t, http.StatusOK, resp.StatusCode())
1650+
1651+
resp2, err2 := c.R().Head(ts.URL + "/")
1652+
assertError(t, err2)
1653+
assertEqual(t, http.StatusOK, resp2.StatusCode())
1654+
1655+
resp3, err3 := c.R().Options(ts.URL + "/")
1656+
assertError(t, err3)
1657+
assertEqual(t, http.StatusOK, resp3.StatusCode())
1658+
}

0 commit comments

Comments
 (0)