Skip to content

Commit da9a61f

Browse files
author
Ian Pye
committed
Adding logic to eras to automatically update using the BirdBrain code
1 parent e57f0cb commit da9a61f

File tree

3 files changed

+200
-1
lines changed

3 files changed

+200
-1
lines changed

cluster/era.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package cluster
22

3+
import (
4+
"logger"
5+
"stash.cloudflare.com/go-stream/util/slog"
6+
)
7+
38
type Era interface {
49
GetNodes() []Node
510
}
@@ -19,3 +24,60 @@ func (s *SimpleEra) Add(n Node) {
1924
func (s *SimpleEra) GetNodes() []Node {
2025
return s.nodes
2126
}
27+
28+
type WeightedEra struct {
29+
nodes []Node
30+
nodeMap [MAX_WEIGHT]*WeightedNode
31+
}
32+
33+
func NewWeightedEra() *WeightedEra {
34+
return &WeightedEra{make([]Node, 0, 0), [MAX_WEIGHT]*WeightedNode{}}
35+
}
36+
37+
func (s *WeightedEra) Add(n Node) {
38+
s.nodes = append(s.nodes, n)
39+
}
40+
41+
func (s *WeightedEra) GetNodes() []Node {
42+
return s.nodes
43+
}
44+
45+
func (s *WeightedEra) GetNode(posit int) *WeightedNode {
46+
if posit < MAX_WEIGHT {
47+
return s.nodeMap[posit]
48+
}
49+
return s.nodeMap[0]
50+
}
51+
52+
func (s *WeightedEra) NormalizeAndPopulateMap() {
53+
total := uint32(0)
54+
scalar := uint32(1)
55+
for _, n := range s.nodes {
56+
wn := n.(*WeightedNode)
57+
total += wn.weight
58+
}
59+
60+
if total == 0 {
61+
slog.Logf(logger.Levels.Error, "Total Node Wieght 0")
62+
return
63+
}
64+
65+
if total < MAX_WEIGHT {
66+
// Scale weights up
67+
scalar = MAX_WEIGHT / total
68+
total = MAX_WEIGHT
69+
}
70+
71+
lastPosit := 0
72+
for _, n := range s.nodes {
73+
wn := n.(*WeightedNode)
74+
wn.weight = ((wn.weight * scalar) / total) * MAX_WEIGHT
75+
slog.Logf(logger.Levels.Debug, "New Weight %d", wn.weight)
76+
for i := lastPosit; uint32(i) < wn.weight && i < MAX_WEIGHT; i++ {
77+
s.nodeMap[i] = wn
78+
lastPosit++
79+
}
80+
}
81+
82+
return
83+
}

cluster/manager.go

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,17 @@
11
package cluster
22

3-
import "time"
3+
import (
4+
"encoding/json"
5+
"io/ioutil"
6+
"logger"
7+
"net/http"
8+
"stash.cloudflare.com/go-stream/util/slog"
9+
"time"
10+
)
11+
12+
const (
13+
MAX_ERAS_SAVED = 10
14+
)
415

516
/* There should be a separate manager for each service */
617
type Manager interface {
@@ -23,3 +34,93 @@ func (c *StaticManager) GetCurrentEra() Era {
2334
func NewStaticManager(e Era) *StaticManager {
2435
return &StaticManager{e}
2536
}
37+
38+
type DynamicBBManager struct {
39+
Eras map[time.Time]Era
40+
ErasAdded []time.Time
41+
BBHosts []string
42+
CurrentTime time.Time
43+
}
44+
45+
type BBHost struct {
46+
Disk_free float32
47+
Load float32
48+
Name string
49+
Host string
50+
Port string
51+
}
52+
53+
type BBResult struct {
54+
Nodes []BBHost
55+
}
56+
57+
func (c *DynamicBBManager) GetEra(t time.Time) Era {
58+
if ct, ok := c.Eras[t]; ok {
59+
return ct
60+
}
61+
return c.Eras[c.CurrentTime]
62+
}
63+
64+
func (c *DynamicBBManager) GetCurrentEra() Era {
65+
return c.Eras[c.CurrentTime]
66+
}
67+
68+
func (c *DynamicBBManager) pullLatestEra() (err error) {
69+
for _, url := range c.BBHosts {
70+
if resp, err := http.Get(url); err == nil {
71+
if bbbody, err := ioutil.ReadAll(resp.Body); err == nil {
72+
// Try parsing this.
73+
bbr := BBResult{}
74+
if err := json.Unmarshal(bbbody, &bbr); err == nil {
75+
ctime := time.Now()
76+
77+
we := NewWeightedEra()
78+
for _, node := range bbr.Nodes {
79+
n := NewWeightedNode(node.Name, node.Host, node.Port, node.Disk_free, node.Load)
80+
slog.Logf(logger.Levels.Debug, "Trasport LOG INFO %v", n)
81+
we.Add(n)
82+
}
83+
84+
// Once all the nodes are in for this era, re-weight the Era
85+
we.NormalizeAndPopulateMap()
86+
c.Eras[ctime] = we
87+
c.CurrentTime = ctime
88+
c.ErasAdded = append(c.ErasAdded, ctime)
89+
90+
// And Remove any super old eras
91+
if len(c.ErasAdded) > MAX_ERAS_SAVED {
92+
delete(c.Eras, c.ErasAdded[0])
93+
c.ErasAdded = append(c.ErasAdded[:1], c.ErasAdded[2:]...)
94+
}
95+
96+
// Once we have hit one BB server with no error, no need to try any others.
97+
break
98+
}
99+
}
100+
}
101+
}
102+
return
103+
}
104+
105+
func (c *DynamicBBManager) keepErasCurrent() {
106+
for {
107+
time.Sleep(60 * time.Second)
108+
slog.Logf(logger.Levels.Debug, "Updating to new era")
109+
err := c.pullLatestEra()
110+
if err != nil {
111+
slog.Logf(logger.Levels.Error, "Cannot get a valid era %v", err)
112+
}
113+
}
114+
}
115+
116+
func NewDynamicBBManager(bbHosts []string) *DynamicBBManager {
117+
dm := DynamicBBManager{make(map[time.Time]Era), make([]time.Time, 0, 0), bbHosts, time.Now()}
118+
err := dm.pullLatestEra()
119+
if err != nil || len(dm.Eras) == 0 {
120+
slog.Fatalf("Cannot get a valid era %v", err)
121+
}
122+
123+
// Keep updating with periodic info
124+
go dm.keepErasCurrent()
125+
return &dm
126+
}

cluster/node.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
package cluster
22

3+
const (
4+
MAX_WEIGHT = 100
5+
MEDIAN_WEIGHT = 50
6+
MIN_DISK_TO_WORRY = 0.000809804
7+
DEGRADED_WEIGHT = 30
8+
)
9+
310
type Node interface {
411
Name() string
512
}
@@ -31,3 +38,32 @@ func (n *SimpleNode) Ip() string {
3138
func (n *SimpleNode) Port() string {
3239
return n.port
3340
}
41+
42+
type WeightedNode struct {
43+
name string
44+
ip string
45+
port string
46+
weight uint32
47+
}
48+
49+
func NewWeightedNode(name string, ip string, port string, disk float32, load float32) *WeightedNode {
50+
wn := WeightedNode{name, ip, port, MEDIAN_WEIGHT}
51+
// For right now, keep it simple. Once a disk falls below a given threshold, trottle it a bit.
52+
// Don't pay attention to load now, to avoid flapping.
53+
if disk < MIN_DISK_TO_WORRY {
54+
wn.weight = DEGRADED_WEIGHT
55+
}
56+
return &wn
57+
}
58+
59+
func (n *WeightedNode) Name() string {
60+
return n.name
61+
}
62+
63+
func (n *WeightedNode) Ip() string {
64+
return n.ip
65+
}
66+
67+
func (n *WeightedNode) Port() string {
68+
return n.port
69+
}

0 commit comments

Comments
 (0)