Skip to content

Commit 770af28

Browse files
authored
[exporter/loadbalancing] Use Linear probing to deal with hash collisions, and increase uniformity of endpoint distribution (open-telemetry#41271)
#### Description Updates the ring hash algorithm to use linear probing to decrease the variance of endpoints chosen due to hash collisions. Also updates it to use 4 bytes instead of one - this would cause collision when using more than 256 endpoints, since the byte would overflow. These changes are discussed and benchmarked in: open-telemetry#41200. The benchmarks showed clear benefits for negligible resource consumption. The only actual changes to main code in the PR is that in `consistent_hashing.go`. The rest is just changing the expectations of the test to match the altered hashing algorithm. #### Link to tracking issue Partially fixes open-telemetry#41200 #### Testing The tests needed to be re-arranged. This was really quite tedious, since they all depend on the specific randomness of the existing hashing algorithm. So I had to re arrange some of the test's expectations with that, too. At some point, they should be redesigned to be more deterministic so more work can easily be done on the hash ring in the future.
1 parent 55dade9 commit 770af28

File tree

10 files changed

+123
-128
lines changed

10 files changed

+123
-128
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
change_type: enhancement
2+
component: loadbalancingexporter
3+
note: Use a linear probe to decrease variance caused by hash collisions, which was causing a non-uniform distribution of loadbalancing.
4+
issues: [41200]
5+
change_logs: [user]

exporter/loadbalancingexporter/consistent_hashing.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
55

66
import (
7+
"encoding/binary"
78
"hash/crc32"
89
"sort"
910
)
1011

1112
const (
12-
maxPositions uint32 = 36000 // 360 degrees with two decimal places
13-
defaultWeight int = 100 // the number of points in the ring for each entry. For better results, it should be greater than 100.
13+
maxPositions uint32 = 36000 // 360 degrees with two decimal places
14+
defaultWeight int = 100 // the number of points in the ring for each entry. For better results, it should be greater than 100.
15+
linearProbeLimit int = 10 // The number of times to probe ahead in the hash ring if there is a collision while constructing the hash ring
1416
)
1517

1618
// position represents a specific angle in the ring.
@@ -103,10 +105,12 @@ func bsearch(pos position, left []ringItem, right []ringItem) ringItem {
103105
// The slice length of the result matches the numPoints.
104106
func positionsFor(endpoint string, numPoints int) []position {
105107
res := make([]position, 0, numPoints)
108+
buf := make([]byte, 4)
106109
for i := 0; i < numPoints; i++ {
107110
h := crc32.NewIEEE()
111+
binary.LittleEndian.PutUint32(buf, uint32(i))
108112
h.Write([]byte(endpoint))
109-
h.Write([]byte{byte(i)})
113+
h.Write(buf)
110114
hash := h.Sum32()
111115
pos := hash % maxPositions
112116
res = append(res, position(pos))
@@ -122,14 +126,21 @@ func positionsForEndpoints(endpoints []string, weight int) []ringItem {
122126
for _, endpoint := range endpoints {
123127
// for this initial implementation, we don't allow endpoints to have custom weights
124128
for _, pos := range positionsFor(endpoint, weight) {
125-
// if this position is occupied already, skip this item
126-
if _, found := positions[pos]; found {
127-
continue
129+
// if this position is occupied already, look ahead in the array for a free position
130+
actualPos := pos
131+
positionsProbed := 0
132+
for positions[actualPos] && positionsProbed < linearProbeLimit {
133+
actualPos = (actualPos + 1) % position(maxPositions)
134+
positionsProbed++
128135
}
129-
positions[pos] = true
136+
if positionsProbed >= linearProbeLimit {
137+
continue // Not able to find a free spot; skip this item
138+
}
139+
140+
positions[actualPos] = true
130141

131142
item := ringItem{
132-
pos: pos,
143+
pos: actualPos,
133144
endpoint: endpoint,
134145
}
135146
items = append(items, item)

exporter/loadbalancingexporter/consistent_hashing_test.go

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ func TestEndpointFor(t *testing.T) {
3131
expected string
3232
}{
3333
// check that we are indeed alternating endpoints for different inputs
34-
{[]byte{1, 2, 0, 0}, "endpoint-1"},
35-
{[]byte{128, 128, 0, 0}, "endpoint-2"},
36-
{[]byte("ad-service-7"), "endpoint-1"},
37-
{[]byte("get-recommendations-1"), "endpoint-2"},
34+
{[]byte{1, 2, 0, 0}, "endpoint-2"},
35+
{[]byte{128, 128, 0, 0}, "endpoint-1"},
36+
{[]byte("ad-service-7"), "endpoint-2"},
37+
{[]byte("get-recommendations-1"), "endpoint-1"},
3838
} {
3939
t.Run(fmt.Sprintf("Endpoint for id %s", string(tt.id)), func(t *testing.T) {
4040
// test
@@ -106,40 +106,47 @@ func TestPositionsForEndpoints(t *testing.T) {
106106
[]string{"endpoint-1"},
107107
[]ringItem{
108108
// this was first calculated by running the algorithm and taking its output
109-
{pos: 1401, endpoint: "endpoint-1"},
110-
{pos: 4175, endpoint: "endpoint-1"},
111-
{pos: 14133, endpoint: "endpoint-1"},
112-
{pos: 17836, endpoint: "endpoint-1"},
113-
{pos: 21667, endpoint: "endpoint-1"},
109+
{pos: 0x21ca, endpoint: "endpoint-1"},
110+
{pos: 0x29d3, endpoint: "endpoint-1"},
111+
{pos: 0x3984, endpoint: "endpoint-1"},
112+
{pos: 0x5eaf, endpoint: "endpoint-1"},
113+
{pos: 0x8bc1, endpoint: "endpoint-1"},
114114
},
115115
},
116116
{
117117
"Duplicate Endpoint",
118118
[]string{"endpoint-1", "endpoint-1"},
119119
[]ringItem{
120-
// we expect to not have duplicate items
121-
{pos: 1401, endpoint: "endpoint-1"},
122-
{pos: 4175, endpoint: "endpoint-1"},
123-
{pos: 14133, endpoint: "endpoint-1"},
124-
{pos: 17836, endpoint: "endpoint-1"},
125-
{pos: 21667, endpoint: "endpoint-1"},
120+
// We expect to not have duplicate items.
121+
// When a clash occurs, the next free positions should be taken. In this case, there will always be
122+
// exactly one clash because of duplicate endpoints. So, the pos will always be i and i+1.
123+
{pos: 0x21ca, endpoint: "endpoint-1"},
124+
{pos: 0x21cb, endpoint: "endpoint-1"},
125+
{pos: 0x29d3, endpoint: "endpoint-1"},
126+
{pos: 0x29d4, endpoint: "endpoint-1"},
127+
{pos: 0x3984, endpoint: "endpoint-1"},
128+
{pos: 0x3985, endpoint: "endpoint-1"},
129+
{pos: 0x5eaf, endpoint: "endpoint-1"},
130+
{pos: 0x5eb0, endpoint: "endpoint-1"},
131+
{pos: 0x8bc1, endpoint: "endpoint-1"},
132+
{pos: 0x8bc2, endpoint: "endpoint-1"},
126133
},
127134
},
128135
{
129136
"Multiple Endpoints",
130-
[]string{"endpoint-1", "endpoint-2"},
137+
[]string{"endpoint-A", "endpoint-B"},
131138
[]ringItem{
132139
// we expect to have 5 positions for each endpoint
133-
{pos: 1401, endpoint: "endpoint-1"},
134-
{pos: 4175, endpoint: "endpoint-1"},
135-
{pos: 10240, endpoint: "endpoint-2"},
136-
{pos: 14133, endpoint: "endpoint-1"},
137-
{pos: 15002, endpoint: "endpoint-2"},
138-
{pos: 17836, endpoint: "endpoint-1"},
139-
{pos: 21263, endpoint: "endpoint-2"},
140-
{pos: 21667, endpoint: "endpoint-1"},
141-
{pos: 26806, endpoint: "endpoint-2"},
142-
{pos: 27020, endpoint: "endpoint-2"},
140+
{pos: 0xdde, endpoint: "endpoint-B"},
141+
{pos: 0x162e, endpoint: "endpoint-A"},
142+
{pos: 0x21f5, endpoint: "endpoint-B"},
143+
{pos: 0x34e5, endpoint: "endpoint-A"},
144+
{pos: 0x61fb, endpoint: "endpoint-B"},
145+
{pos: 0x6910, endpoint: "endpoint-B"},
146+
{pos: 0x76a0, endpoint: "endpoint-A"},
147+
{pos: 0x7e2b, endpoint: "endpoint-A"},
148+
{pos: 0x7f7c, endpoint: "endpoint-A"},
149+
{pos: 0x85ac, endpoint: "endpoint-B"},
143150
},
144151
},
145152
} {

exporter/loadbalancingexporter/loadbalancer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -399,14 +399,14 @@ func TestFailedExporterInRing(t *testing.T) {
399399

400400
// test
401401
// this trace ID will reach the endpoint-2 -- see the consistent hashing tests for more info
402-
_, _, err = p.exporterAndEndpoint([]byte{128, 128, 0, 0})
402+
_, _, err = p.exporterAndEndpoint([]byte{128, 128, 1, 0})
403403

404404
// verify
405405
assert.Error(t, err)
406406

407407
// test
408408
// this service name will reach the endpoint-2 -- see the consistent hashing tests for more info
409-
_, _, err = p.exporterAndEndpoint([]byte("get-recommendations-1"))
409+
_, _, err = p.exporterAndEndpoint([]byte("get-recommendations-2"))
410410

411411
// verify
412412
assert.Error(t, err)

exporter/loadbalancingexporter/metrics_exporter_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ const (
5050
signal1Attr3Value = true
5151
signal1Attr4Key = "sigattr4k"
5252
signal1Attr4Value = 3.3
53-
serviceName1 = "service-name-1"
54-
serviceName2 = "service-name-2"
53+
serviceName1 = "service-name-01"
54+
serviceName2 = "service-name-02"
5555
)
5656

5757
func TestNewMetricsExporter(t *testing.T) {

exporter/loadbalancingexporter/testdata/metrics/consume_metrics/triple_endpoint/metric_name/output.yaml

Lines changed: 10 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ endpoint-1:
5555
value:
5656
stringValue: bbb
5757
endpoint-2:
58+
resourceMetrics: []
59+
endpoint-3:
5860
resourceMetrics:
5961
- schemaUrl: https://test-res-schema.com/schema
6062
resource:
@@ -83,41 +85,23 @@ endpoint-2:
8385
- key: aaa
8486
value:
8587
stringValue: bbb
86-
- schemaUrl: https://test-res-schema.com/schema
87-
resource:
88-
attributes:
89-
- key: resource_key
90-
value:
91-
stringValue: bar
92-
scopeMetrics:
93-
- schemaUrl: https://test-scope-schema.com/schema
94-
scope:
95-
name: MyTestInstrument
96-
version: "1.2.3"
97-
attributes:
98-
- key: scope_key
99-
value:
100-
stringValue: foo
101-
metrics:
102-
- name: first.monotonic.sum
88+
- name: second.monotonic.sum
10389
sum:
10490
aggregationTemporality: 2
10591
isMonotonic: true
10692
dataPoints:
107-
- timeUnixNano: 80
108-
asDouble: 444
93+
- timeUnixNano: 50
94+
asDouble: 945
10995
attributes:
11096
- key: aaa
11197
value:
11298
stringValue: bbb
113-
endpoint-3:
114-
resourceMetrics:
11599
- schemaUrl: https://test-res-schema.com/schema
116100
resource:
117101
attributes:
118102
- key: resource_key
119103
value:
120-
stringValue: foo
104+
stringValue: bar
121105
scopeMetrics:
122106
- schemaUrl: https://test-scope-schema.com/schema
123107
scope:
@@ -128,33 +112,17 @@ endpoint-3:
128112
value:
129113
stringValue: foo
130114
metrics:
131-
- name: second.monotonic.sum
115+
- name: first.monotonic.sum
132116
sum:
133117
aggregationTemporality: 2
134118
isMonotonic: true
135119
dataPoints:
136-
- timeUnixNano: 50
137-
asDouble: 945
120+
- timeUnixNano: 80
121+
asDouble: 444
138122
attributes:
139123
- key: aaa
140124
value:
141125
stringValue: bbb
142-
- schemaUrl: https://test-res-schema.com/schema
143-
resource:
144-
attributes:
145-
- key: resource_key
146-
value:
147-
stringValue: bar
148-
scopeMetrics:
149-
- schemaUrl: https://test-scope-schema.com/schema
150-
scope:
151-
name: MyTestInstrument
152-
version: "1.2.3"
153-
attributes:
154-
- key: scope_key
155-
value:
156-
stringValue: foo
157-
metrics:
158126
- name: second.monotonic.sum
159127
sum:
160128
aggregationTemporality: 2
@@ -165,4 +133,4 @@ endpoint-3:
165133
attributes:
166134
- key: aaa
167135
value:
168-
stringValue: bbb
136+
stringValue: bbb

exporter/loadbalancingexporter/testdata/metrics/consume_metrics/triple_endpoint/resource_id/output.yaml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
endpoint-1:
2+
resourceMetrics: []
3+
endpoint-2:
24
resourceMetrics:
35
- schemaUrl: https://test-res-schema.com/schema
46
resource:
57
attributes:
68
- key: resource_key
79
value:
8-
stringValue: asdf
10+
stringValue: foo
911
scopeMetrics:
1012
- schemaUrl: https://test-scope-schema.com/schema
1113
scope:
@@ -21,13 +23,13 @@ endpoint-1:
2123
aggregationTemporality: 2
2224
isMonotonic: true
2325
dataPoints:
24-
- timeUnixNano: 90
25-
asDouble: 666
26+
- timeUnixNano: 50
27+
asDouble: 333
2628
attributes:
2729
- key: aaa
2830
value:
2931
stringValue: bbb
30-
endpoint-2:
32+
endpoint-3:
3133
resourceMetrics:
3234
- schemaUrl: https://test-res-schema.com/schema
3335
resource:
@@ -56,14 +58,12 @@ endpoint-2:
5658
- key: aaa
5759
value:
5860
stringValue: bbb
59-
endpoint-3:
60-
resourceMetrics:
6161
- schemaUrl: https://test-res-schema.com/schema
6262
resource:
6363
attributes:
6464
- key: resource_key
6565
value:
66-
stringValue: foo
66+
stringValue: asdf
6767
scopeMetrics:
6868
- schemaUrl: https://test-scope-schema.com/schema
6969
scope:
@@ -79,8 +79,8 @@ endpoint-3:
7979
aggregationTemporality: 2
8080
isMonotonic: true
8181
dataPoints:
82-
- timeUnixNano: 50
83-
asDouble: 333
82+
- timeUnixNano: 90
83+
asDouble: 666
8484
attributes:
8585
- key: aaa
8686
value:

exporter/loadbalancingexporter/testdata/metrics/consume_metrics/triple_endpoint/resource_service_name/input.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ resourceMetrics:
44
attributes:
55
- key: service.name
66
value:
7-
stringValue: serviceA
7+
stringValue: service-alpha
88
scopeMetrics:
99
- schemaUrl: https://test-scope-schema.com/schema
1010
scope:
@@ -31,7 +31,7 @@ resourceMetrics:
3131
attributes:
3232
- key: service.name
3333
value:
34-
stringValue: serviceB
34+
stringValue: service-beta
3535
scopeMetrics:
3636
- schemaUrl: https://test-scope-schema.com/schema
3737
scope:
@@ -58,7 +58,7 @@ resourceMetrics:
5858
attributes:
5959
- key: service.name
6060
value:
61-
stringValue: serviceC
61+
stringValue: service-gamma
6262
scopeMetrics:
6363
- schemaUrl: https://test-scope-schema.com/schema
6464
scope:

0 commit comments

Comments
 (0)