Skip to content

Commit 382d776

Browse files
committed
Balncer with tags based on CallCounter
1 parent de609da commit 382d776

File tree

6 files changed

+162
-108
lines changed

6 files changed

+162
-108
lines changed

nodes/node_balancer/cmd/nodebalancer/balancer.go

Lines changed: 106 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Load balancer, based on https://github.com/kasvith/simplelb/
2+
Load balancer logic.
33
*/
44
package main
55

@@ -19,10 +19,9 @@ import (
1919

2020
// Main variable of pool of blockchains which contains pool of nodes
2121
// for each blockchain we work during session.
22-
var blockchainPool BlockchainPool
22+
var blockchainPools map[string]*NodePool
2323

2424
// Node structure with
25-
// StatusURL for status server at node endpoint
2625
// Endpoint for geth/bor/etc node http.server endpoint
2726
type Node struct {
2827
Endpoint *url.URL
@@ -36,16 +35,16 @@ type Node struct {
3635
GethReverseProxy *httputil.ReverseProxy
3736
}
3837

39-
type NodePool struct {
40-
Blockchain string
41-
Nodes []*Node
42-
43-
// Counter to observe all nodes
44-
Current uint64
38+
type TopNodeBlock struct {
39+
Block uint64
40+
Node *Node
4541
}
4642

47-
type BlockchainPool struct {
48-
Blockchains []*NodePool
43+
type NodePool struct {
44+
NodesMap map[string][]*Node
45+
NodesSet []*Node
46+
47+
TopNode TopNodeBlock
4948
}
5049

5150
// Node status response struct for HealthCheck
@@ -58,24 +57,25 @@ type NodeStatusResponse struct {
5857
}
5958

6059
// AddNode to the nodes pool
61-
func (bpool *BlockchainPool) AddNode(node *Node, blockchain string) {
62-
var nodePool *NodePool
63-
for _, b := range bpool.Blockchains {
64-
if b.Blockchain == blockchain {
65-
nodePool = b
66-
}
60+
func AddNode(blockchain string, tags []string, node *Node) {
61+
if blockchainPools == nil {
62+
blockchainPools = make(map[string]*NodePool)
6763
}
64+
if blockchainPools[blockchain] == nil {
65+
blockchainPools[blockchain] = &NodePool{}
66+
}
67+
if blockchainPools[blockchain].NodesMap == nil {
68+
blockchainPools[blockchain].NodesMap = make(map[string][]*Node)
69+
}
70+
blockchainPools[blockchain].NodesSet = append(blockchainPools[blockchain].NodesSet, node)
6871

69-
// Check if blockchain not yet in pool
70-
if nodePool == nil {
71-
nodePool = &NodePool{
72-
Blockchain: blockchain,
73-
}
74-
nodePool.Nodes = append(nodePool.Nodes, node)
75-
bpool.Blockchains = append(bpool.Blockchains, nodePool)
76-
} else {
77-
nodePool.Nodes = append(nodePool.Nodes, node)
72+
for _, tag := range tags {
73+
blockchainPools[blockchain].NodesMap[tag] = append(
74+
blockchainPools[blockchain].NodesMap[tag],
75+
node,
76+
)
7877
}
78+
7979
}
8080

8181
// SetAlive with mutex for exact node
@@ -117,59 +117,86 @@ func (node *Node) IncreaseCallCounter() {
117117
node.mux.Unlock()
118118
}
119119

120-
// GetNextNode returns next active peer to take a connection
121-
// Loop through entire nodes to find out an alive one
122-
func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node {
123-
highestBlock := uint64(0)
124-
125-
// Get NodePool with correct blockchain
126-
var np *NodePool
127-
for _, b := range bpool.Blockchains {
128-
if b.Blockchain == blockchain {
129-
np = b
130-
for _, n := range b.Nodes {
131-
if n.CurrentBlock > highestBlock {
132-
highestBlock = n.CurrentBlock
133-
}
134-
}
120+
func containsGeneric[T comparable](b []T, e T) bool {
121+
for _, v := range b {
122+
if v == e {
123+
return true
135124
}
136125
}
126+
return false
127+
}
137128

138-
// Increase Current value with 1
139-
currentInc := atomic.AddUint64(&np.Current, uint64(1))
129+
func (npool *NodePool) FilterTagsNodes(tags []string) ([]*Node, TopNodeBlock) {
130+
nodesMap := npool.NodesMap
131+
nodesSet := npool.NodesSet
140132

141-
// next is an Atomic incrementer, value always in range from 0 to slice length,
142-
// it returns an index of slice
143-
next := int(currentInc % uint64(len(np.Nodes)))
133+
tagSet := make(map[string]map[*Node]bool)
134+
135+
for tag, nodes := range nodesMap {
136+
if tagSet[tag] == nil {
137+
tagSet[tag] = make(map[*Node]bool)
138+
}
139+
for _, node := range nodes {
140+
tagSet[tag][node] = true
141+
}
142+
}
144143

145-
// Start from next one and move full cycle
146-
l := len(np.Nodes) + next
144+
topNode := TopNodeBlock{}
147145

148-
for i := next; i < l; i++ {
149-
// Take an index by modding with length
150-
idx := i % len(np.Nodes)
151-
// If we have an alive one, use it and store if its not the original one
152-
if np.Nodes[idx].IsAlive() {
153-
if i != next {
154-
// Mark the current one
155-
atomic.StoreUint64(&np.Current, uint64(idx))
146+
var filteredNodes []*Node
147+
for _, node := range nodesSet {
148+
accept := true
149+
for _, tag := range tags {
150+
if tagSet[tag][node] != true {
151+
accept = false
152+
break
156153
}
157-
// Pass nodes with low blocks
158-
// TODO(kompotkot): Re-write to not rotate through not highest blocks
159-
if np.Nodes[idx].CurrentBlock < highestBlock {
160-
continue
154+
}
155+
if accept {
156+
filteredNodes = append(filteredNodes, node)
157+
currentBlock := node.CurrentBlock
158+
if currentBlock >= npool.TopNode.Block {
159+
topNode.Block = currentBlock
160+
topNode.Node = node
161161
}
162+
}
163+
}
162164

163-
return np.Nodes[idx]
165+
return filteredNodes, topNode
166+
}
167+
168+
// GetNextNode returns next active peer to take a connection
169+
// Loop through entire nodes to find out an alive one and chose one with small CallCounter
170+
func GetNextNode(nodes []*Node, topNode TopNodeBlock) *Node {
171+
nextNode := topNode.Node
172+
173+
for _, node := range nodes {
174+
if node.IsAlive() {
175+
currentBlock := node.CurrentBlock
176+
if currentBlock < topNode.Block-NB_HIGHEST_BLOCK_SHIFT {
177+
// Bypass outdated nodes
178+
continue
179+
}
180+
if node.CallCounter < nextNode.CallCounter {
181+
nextNode = node
182+
}
164183
}
165184
}
166-
return nil
185+
186+
if nextNode == nil {
187+
return nil
188+
}
189+
190+
// Increase CallCounter value with 1
191+
atomic.AddUint64(&nextNode.CallCounter, uint64(1))
192+
193+
return nextNode
167194
}
168195

169196
// SetNodeStatus modify status of the node
170-
func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) {
171-
for _, b := range bpool.Blockchains {
172-
for _, n := range b.Nodes {
197+
func SetNodeStatus(url *url.URL, alive bool) {
198+
for _, nodes := range blockchainPools {
199+
for _, n := range nodes.NodesSet {
173200
if n.Endpoint.String() == url.String() {
174201
n.SetAlive(alive)
175202
break
@@ -180,21 +207,21 @@ func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) {
180207

181208
// StatusLog logs node status
182209
// TODO(kompotkot): Print list of alive and dead nodes
183-
func (bpool *BlockchainPool) StatusLog() {
184-
for _, b := range bpool.Blockchains {
185-
for _, n := range b.Nodes {
210+
func StatusLog() {
211+
for blockchain, nodes := range blockchainPools {
212+
for _, n := range nodes.NodesSet {
186213
log.Printf(
187-
"Blockchain %s node %s is alive %t. Blockchain called %d times",
188-
b.Blockchain, n.Endpoint.Host, n.Alive, b.Current,
214+
"Blockchain %s node %s is alive %t",
215+
blockchain, n.Endpoint.Host, n.Alive,
189216
)
190217
}
191218
}
192219
}
193220

194221
// HealthCheck fetch the node latest block
195-
func (bpool *BlockchainPool) HealthCheck() {
196-
for _, b := range bpool.Blockchains {
197-
for _, n := range b.Nodes {
222+
func HealthCheck() {
223+
for blockchain, nodes := range blockchainPools {
224+
for _, n := range nodes.NodesSet {
198225
alive := false
199226

200227
httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT}
@@ -239,8 +266,13 @@ func (bpool *BlockchainPool) HealthCheck() {
239266
}
240267
callCounter := n.UpdateNodeState(blockNumber, alive)
241268

269+
if blockNumber > nodes.TopNode.Block {
270+
nodes.TopNode.Block = blockNumber
271+
nodes.TopNode.Node = n
272+
}
273+
242274
log.Printf(
243-
"Node %s is alive: %t with current block: %d called: %d times", n.Endpoint.Host, alive, blockNumber, callCounter,
275+
"In blockchain %s node %s is alive: %t with current block: %d called: %d times", blockchain, n.Endpoint.Host, alive, blockNumber, callCounter,
244276
)
245277
}
246278
}

nodes/node_balancer/cmd/nodebalancer/configs.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ var (
3434
NB_MAX_COUNTER_NUMBER = uint64(10000000)
3535

3636
// Client configuration
37-
NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds
37+
NB_CLIENT_NODE_KEEP_ALIVE = int64(1) // How long to store node in hot list for client in seconds
38+
NB_HIGHEST_BLOCK_SHIFT = uint64(50) // Allowed shift to prefer node with most highest block
3839

3940
NB_ACCESS_ID_HEADER = os.Getenv("NB_ACCESS_ID_HEADER")
4041
NB_DATA_SOURCE_HEADER = os.Getenv("NB_DATA_SOURCE_HEADER")

nodes/node_balancer/cmd/nodebalancer/middleware.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,13 @@ func (ac *AccessCache) Cleanup() (int64, int64) {
9898
return removedAccessIds, totalAccessIds
9999
}
100100

101-
func initCacheCleaning(debug bool) {
101+
func initCacheCleaning() {
102102
t := time.NewTicker(NB_CACHE_CLEANING_INTERVAL)
103103
for {
104104
select {
105105
case <-t.C:
106106
removedAccessIds, totalAccessIds := accessIdCache.Cleanup()
107-
if debug {
107+
if stateCLI.enableDebugFlag {
108108
log.Printf("Removed %d elements from access id cache", removedAccessIds)
109109
}
110110
log.Printf("Elements in access id cache: %d", totalAccessIds)

nodes/node_balancer/cmd/nodebalancer/routes.go

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,33 +53,20 @@ func lbHandler(w http.ResponseWriter, r *http.Request) {
5353
return
5454
}
5555

56-
// Chose one node
57-
var node *Node
58-
cpool := GetClientPool(blockchain)
59-
node = cpool.GetClientNode(currentClientAccess.AccessID)
60-
if node == nil {
61-
node = blockchainPool.GetNextNode(blockchain)
62-
if node == nil {
63-
http.Error(w, "There are no nodes available", http.StatusServiceUnavailable)
64-
return
65-
}
66-
cpool.AddClientNode(currentClientAccess.AccessID, node)
67-
}
68-
6956
// Save origin path, to use in proxyErrorHandler if node will not response
7057
r.Header.Add("X-Origin-Path", r.URL.Path)
7158

7259
switch {
7360
case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/jsonrpc", blockchain)):
74-
lbJSONRPCHandler(w, r, blockchain, node, currentClientAccess)
61+
lbJSONRPCHandler(w, r, blockchain, currentClientAccess)
7562
return
7663
default:
7764
http.Error(w, fmt.Sprintf("Unacceptable path for %s blockchain %s", blockchain, r.URL.Path), http.StatusBadRequest)
7865
return
7966
}
8067
}
8168

82-
func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, node *Node, currentClientAccess ClientResourceData) {
69+
func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, currentClientAccess ClientResourceData) {
8370
body, err := ioutil.ReadAll(r.Body)
8471
if err != nil {
8572
http.Error(w, "Unable to read body", http.StatusBadRequest)
@@ -94,6 +81,39 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string,
9481
return
9582
}
9683

84+
// Get tags from request params, sort and generate from it identifier
85+
var tags []string
86+
queries := r.URL.Query()
87+
for k, v := range queries {
88+
if k == "tag" {
89+
for _, tag := range v {
90+
tags = append(tags, tag)
91+
}
92+
}
93+
}
94+
95+
// Chose one node
96+
var node *Node
97+
cpool := GetClientPool(blockchain)
98+
node = cpool.GetClientNode(currentClientAccess.AccessID)
99+
if node == nil {
100+
npool := blockchainPools[blockchain]
101+
var nodes []*Node
102+
var topNode TopNodeBlock
103+
if len(tags) != 0 {
104+
nodes, topNode = npool.FilterTagsNodes(tags)
105+
} else {
106+
topNode = npool.TopNode
107+
nodes = npool.NodesSet
108+
}
109+
node = GetNextNode(nodes, topNode)
110+
if node == nil {
111+
http.Error(w, "There are no nodes available", http.StatusServiceUnavailable)
112+
return
113+
}
114+
cpool.AddClientNode(currentClientAccess.AccessID, node)
115+
}
116+
97117
switch {
98118
case currentClientAccess.dataSource == "blockchain":
99119
if currentClientAccess.BlockchainAccess == false {

0 commit comments

Comments
 (0)