Skip to content

Commit 0c20bdd

Browse files
authored
[Backport 7.13] Discovery should filter master only nodes (#270)
* Discovery: filter out master only nodes * Transport: Update discovery tests to reflect update on discovery
1 parent 5c9e449 commit 0c20bdd

File tree

3 files changed

+222
-11
lines changed

3 files changed

+222
-11
lines changed

estransport/discovery.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type nodeInfo struct {
4343
ID string
4444
Name string
4545
URL *url.URL
46-
Roles []string
46+
Roles []string `json:"roles"`
4747
Attributes map[string]interface{}
4848
HTTP struct {
4949
PublishAddress string `json:"publish_address"`
@@ -65,31 +65,27 @@ func (c *Client) DiscoverNodes() error {
6565

6666
for _, node := range nodes {
6767
var (
68-
isDataNode bool
69-
isIngestNode bool
68+
isMasterOnlyNode bool
7069
)
7170

7271
roles := append(node.Roles[:0:0], node.Roles...)
7372
sort.Strings(roles)
7473

75-
if i := sort.SearchStrings(roles, "data"); i < len(roles) && roles[i] == "data" {
76-
isDataNode = true
77-
}
78-
if i := sort.SearchStrings(roles, "ingest"); i < len(roles) && roles[i] == "ingest" {
79-
isIngestNode = true
74+
if len(roles) == 1 && roles[0] == "master" {
75+
isMasterOnlyNode = true
8076
}
8177

8278
if debugLogger != nil {
8379
var skip string
84-
if !isDataNode || !isIngestNode {
80+
if isMasterOnlyNode {
8581
skip = "; [SKIP]"
8682
}
8783
debugLogger.Logf("Discovered node [%s]; %s; roles=%s%s\n", node.Name, node.URL, node.Roles, skip)
8884
}
8985

9086
// Skip master only nodes
9187
// TODO(karmi): Move logic to Selector?
92-
if !isDataNode || !isIngestNode {
88+
if isMasterOnlyNode {
9389
continue
9490
}
9591

estransport/discovery_internal_test.go

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,16 @@
2020
package estransport
2121

2222
import (
23+
"bytes"
2324
"crypto/tls"
25+
"encoding/json"
2426
"fmt"
2527
"io"
28+
"io/ioutil"
2629
"net/http"
2730
"net/url"
2831
"os"
32+
"reflect"
2933
"testing"
3034
"time"
3135
)
@@ -186,4 +190,215 @@ func TestDiscovery(t *testing.T) {
186190
t.Errorf("Unexpected number of nodes, want=2, got=%d", numURLs)
187191
}
188192
})
193+
194+
t.Run("Role based nodes discovery", func(t *testing.T) {
195+
type Node struct {
196+
URL string
197+
Roles []string
198+
}
199+
200+
type fields struct {
201+
Nodes map[string]Node
202+
}
203+
type wants struct {
204+
wantErr bool
205+
wantsNConn int
206+
}
207+
tests := []struct {
208+
name string
209+
args fields
210+
want wants
211+
}{
212+
{
213+
"Default roles should allow every node to be selected",
214+
fields{
215+
Nodes: map[string]Node{
216+
"es1": {
217+
URL: "http://es1:9200",
218+
Roles: []string{
219+
"data",
220+
"data_cold",
221+
"data_content",
222+
"data_frozen",
223+
"data_hot",
224+
"data_warm",
225+
"ingest",
226+
"master",
227+
"ml",
228+
"remote_cluster_client",
229+
"transform",
230+
},
231+
},
232+
"es2": {
233+
URL: "http://es2:9200",
234+
Roles: []string{
235+
"data",
236+
"data_cold",
237+
"data_content",
238+
"data_frozen",
239+
"data_hot",
240+
"data_warm",
241+
"ingest",
242+
"master",
243+
"ml",
244+
"remote_cluster_client",
245+
"transform",
246+
},
247+
},
248+
"es3": {
249+
URL: "http://es3:9200",
250+
Roles: []string{
251+
"data",
252+
"data_cold",
253+
"data_content",
254+
"data_frozen",
255+
"data_hot",
256+
"data_warm",
257+
"ingest",
258+
"master",
259+
"ml",
260+
"remote_cluster_client",
261+
"transform",
262+
},
263+
},
264+
},
265+
},
266+
wants{
267+
false, 3,
268+
},
269+
},
270+
{
271+
"Master only node should not be selected",
272+
fields{
273+
Nodes: map[string]Node{
274+
"es1": {
275+
URL: "http://es1:9200",
276+
Roles: []string{
277+
"master",
278+
},
279+
},
280+
"es2": {
281+
URL: "http://es2:9200",
282+
Roles: []string{
283+
"data",
284+
"data_cold",
285+
"data_content",
286+
"data_frozen",
287+
"data_hot",
288+
"data_warm",
289+
"ingest",
290+
"master",
291+
"ml",
292+
"remote_cluster_client",
293+
"transform",
294+
},
295+
},
296+
"es3": {
297+
URL: "http://es3:9200",
298+
Roles: []string{
299+
"data",
300+
"data_cold",
301+
"data_content",
302+
"data_frozen",
303+
"data_hot",
304+
"data_warm",
305+
"ingest",
306+
"master",
307+
"ml",
308+
"remote_cluster_client",
309+
"transform",
310+
},
311+
},
312+
},
313+
},
314+
315+
wants{
316+
false, 2,
317+
},
318+
},
319+
{
320+
"Master and data only nodes should be selected",
321+
fields{
322+
Nodes: map[string]Node{
323+
"es1": {
324+
URL: "http://es1:9200",
325+
Roles: []string{
326+
"data",
327+
"master",
328+
},
329+
},
330+
"es2": {
331+
URL: "http://es2:9200",
332+
Roles: []string{
333+
"data",
334+
"master",
335+
},
336+
},
337+
},
338+
},
339+
340+
wants{
341+
false, 2,
342+
},
343+
},
344+
}
345+
for _, tt := range tests {
346+
t.Run(tt.name, func(t *testing.T) {
347+
var names []string
348+
var urls []*url.URL
349+
for name, node := range tt.args.Nodes {
350+
u, _ := url.Parse(node.URL)
351+
urls = append(urls, u)
352+
names = append(names, name)
353+
}
354+
355+
newRoundTripper := func() http.RoundTripper {
356+
return &mockTransp{
357+
RoundTripFunc: func(req *http.Request) (*http.Response, error) {
358+
nodes := make(map[string]map[string]nodeInfo)
359+
nodes["nodes"] = make(map[string]nodeInfo)
360+
for name, node := range tt.args.Nodes {
361+
nodes["nodes"][name] = nodeInfo{Roles: node.Roles}
362+
}
363+
364+
b, _ := json.Marshal(nodes)
365+
366+
return &http.Response{
367+
Status: "200 OK",
368+
StatusCode: 200,
369+
ContentLength: int64(len(b)),
370+
Header: http.Header(map[string][]string{"Content-Type": {"application/json"}}),
371+
Body: ioutil.NopCloser(bytes.NewReader(b)),
372+
}, nil
373+
},
374+
}
375+
}
376+
377+
c, _ := New(Config{
378+
URLs: urls,
379+
Transport: newRoundTripper(),
380+
})
381+
c.DiscoverNodes()
382+
383+
pool, ok := c.pool.(*statusConnectionPool)
384+
if !ok {
385+
t.Fatalf("Unexpected pool, want=statusConnectionPool, got=%T", c.pool)
386+
}
387+
388+
if len(pool.live) != tt.want.wantsNConn {
389+
t.Errorf("Unexpected number of nodes, want=%d, got=%d", tt.want.wantsNConn, len(pool.live))
390+
}
391+
392+
for _, conn := range pool.live {
393+
if !reflect.DeepEqual(tt.args.Nodes[conn.ID].Roles, conn.Roles) {
394+
t.Errorf("Unexpected roles for node %s, want=%s, got=%s", conn.Name, tt.args.Nodes[conn.ID], conn.Roles)
395+
}
396+
}
397+
398+
if err := c.DiscoverNodes(); (err != nil) != tt.want.wantErr {
399+
t.Errorf("DiscoverNodes() error = %v, wantErr %v", err, tt.want.wantErr)
400+
}
401+
})
402+
}
403+
})
189404
}

estransport/testdata/nodes.info.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
"build_flavor": "default",
6565
"build_type": "tar",
6666
"build_hash": "2f90bbf7b93631e52bafb59b3b049cb44ec25e96",
67-
"roles": ["master", "ml"],
67+
"roles": ["master"],
6868
"attributes": {
6969
"ml.machine_memory": "8589934592",
7070
"ml.max_open_jobs": "20",

0 commit comments

Comments
 (0)