Skip to content

Commit e40744c

Browse files
committed
Small fixes
1 parent d3e6070 commit e40744c

File tree

4 files changed

+32
-45
lines changed

4 files changed

+32
-45
lines changed

nodes/node_balancer/cmd/nodebalancer/balancer.go

Lines changed: 26 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -105,27 +105,7 @@ func (node *Node) UpdateNodeState(currentBlock uint64, alive bool) (callCounter
105105
return callCounter
106106
}
107107

108-
// IncreaseCallCounter increased to 1 each time node called
109-
func (node *Node) IncreaseCallCounter() {
110-
node.mux.Lock()
111-
if node.CallCounter >= NB_MAX_COUNTER_NUMBER {
112-
log.Printf("Number of calls for node %s reached %d limit, reset the counter.", node.Endpoint, NB_MAX_COUNTER_NUMBER)
113-
node.CallCounter = uint64(0)
114-
} else {
115-
node.CallCounter++
116-
}
117-
node.mux.Unlock()
118-
}
119-
120-
func containsGeneric[T comparable](b []T, e T) bool {
121-
for _, v := range b {
122-
if v == e {
123-
return true
124-
}
125-
}
126-
return false
127-
}
128-
108+
// FilterTagsNodes returns nodes with provided tags
129109
func (npool *NodePool) FilterTagsNodes(tags []string) ([]*Node, TopNodeBlock) {
130110
nodesMap := npool.NodesMap
131111
nodesSet := npool.NodesSet
@@ -174,7 +154,7 @@ func GetNextNode(nodes []*Node, topNode TopNodeBlock) *Node {
174154
if node.IsAlive() {
175155
currentBlock := node.CurrentBlock
176156
if currentBlock < topNode.Block-NB_HIGHEST_BLOCK_SHIFT {
177-
// Bypass outdated nodes
157+
// Bypass too outdated nodes
178158
continue
179159
}
180160
if node.CallCounter < nextNode.CallCounter {
@@ -183,13 +163,11 @@ func GetNextNode(nodes []*Node, topNode TopNodeBlock) *Node {
183163
}
184164
}
185165

186-
if nextNode == nil {
187-
return nil
166+
if nextNode != nil {
167+
// Increase CallCounter value with 1
168+
atomic.AddUint64(&nextNode.CallCounter, uint64(1))
188169
}
189170

190-
// Increase CallCounter value with 1
191-
atomic.AddUint64(&nextNode.CallCounter, uint64(1))
192-
193171
return nextNode
194172
}
195173

@@ -221,41 +199,41 @@ func StatusLog() {
221199
// HealthCheck fetch the node latest block
222200
func HealthCheck() {
223201
for blockchain, nodes := range blockchainPools {
224-
for _, n := range nodes.NodesSet {
202+
for _, node := range nodes.NodesSet {
225203
alive := false
226204

227205
httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT}
228206
resp, err := httpClient.Post(
229-
n.Endpoint.String(),
207+
node.Endpoint.String(),
230208
"application/json",
231209
bytes.NewBuffer([]byte(`{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", false],"id":1}`)),
232210
)
233211
if err != nil {
234-
n.UpdateNodeState(0, alive)
235-
log.Printf("Unable to reach node: %s", n.Endpoint.Host)
212+
node.UpdateNodeState(0, alive)
213+
log.Printf("Unable to reach node: %s", node.Endpoint.Host)
236214
continue
237215
}
238216
defer resp.Body.Close()
239217

240218
body, err := ioutil.ReadAll(resp.Body)
241219
if err != nil {
242-
n.UpdateNodeState(0, alive)
243-
log.Printf("Unable to parse response from %s node, err %v", n.Endpoint.Host, err)
220+
node.UpdateNodeState(0, alive)
221+
log.Printf("Unable to parse response from %s node, err %v", node.Endpoint.Host, err)
244222
continue
245223
}
246224

247225
var statusResponse NodeStatusResponse
248226
err = json.Unmarshal(body, &statusResponse)
249227
if err != nil {
250-
n.UpdateNodeState(0, alive)
251-
log.Printf("Unable to read json response from %s node, err: %v", n.Endpoint.Host, err)
228+
node.UpdateNodeState(0, alive)
229+
log.Printf("Unable to read json response from %s node, err: %v", node.Endpoint.Host, err)
252230
continue
253231
}
254232

255233
blockNumberHex := strings.Replace(statusResponse.Result.Number, "0x", "", -1)
256234
blockNumber, err := strconv.ParseUint(blockNumberHex, 16, 64)
257235
if err != nil {
258-
n.UpdateNodeState(0, alive)
236+
node.UpdateNodeState(0, alive)
259237
log.Printf("Unable to parse block number from hex to string, err: %v", err)
260238
continue
261239
}
@@ -264,15 +242,24 @@ func HealthCheck() {
264242
if blockNumber != 0 {
265243
alive = true
266244
}
267-
callCounter := n.UpdateNodeState(blockNumber, alive)
245+
callCounter := node.UpdateNodeState(blockNumber, alive)
268246

269247
if blockNumber > nodes.TopNode.Block {
270248
nodes.TopNode.Block = blockNumber
271-
nodes.TopNode.Node = n
249+
nodes.TopNode.Node = node
250+
}
251+
252+
if node.CallCounter >= NB_MAX_COUNTER_NUMBER {
253+
log.Printf(
254+
"Number of CallCounter for node %s reached %d limit, reset the counter.",
255+
node.Endpoint, NB_MAX_COUNTER_NUMBER,
256+
)
257+
atomic.StoreUint64(&node.CallCounter, uint64(0))
272258
}
273259

274260
log.Printf(
275-
"In blockchain %s node %s is alive: %t with current block: %d called: %d times", blockchain, n.Endpoint.Host, alive, blockNumber, callCounter,
261+
"Blockchain %s node %s is alive: %t with current block: %d called: %d times",
262+
blockchain, node.Endpoint.Host, alive, blockNumber, callCounter,
276263
)
277264
}
278265
}

nodes/node_balancer/cmd/nodebalancer/routes.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,6 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string,
130130
}
131131
}
132132

133-
node.IncreaseCallCounter()
134-
135133
// Overwrite Path so response will be returned to correct place
136134
r.URL.Path = "/"
137135
node.GethReverseProxy.ServeHTTP(w, r)

nodes/node_balancer/cmd/nodebalancer/server.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,11 @@ func Server() {
128128
fmt.Printf("Unable to get user with provided access identifier, err: %v\n", err)
129129
os.Exit(1)
130130
}
131+
resourcesLog := "Access with resources established."
131132
if len(resources.Resources) != 1 {
132-
log.Println("There are no access IDs for users in resources")
133+
log.Printf("%s There are no access IDs for users in resources", resourcesLog)
133134
} else {
134-
log.Println("Found user access IDs in resources")
135+
log.Printf("%s Found user access IDs in resources", resourcesLog)
135136
}
136137

137138
// Set internal crawlers access to bypass requests from internal services
@@ -145,7 +146,6 @@ func Server() {
145146
BlockchainAccess: true,
146147
ExtendedMethods: true,
147148
}
148-
log.Printf("Internal crawlers access set with user ID: %s", internalCrawlersUserID)
149149

150150
err = InitDatabaseClient()
151151
if err != nil {
@@ -185,6 +185,8 @@ func Server() {
185185
r.Header.Del(strings.Title(NB_DATA_SOURCE_HEADER))
186186
// Change r.Host from nodebalancer's to end host so TLS check will be passed
187187
r.Host = r.URL.Host
188+
// Explicit set of r.URL requires, because by default it adds trailing slash and brake some urls
189+
r.URL = endpoint
188190
}
189191
proxyErrorHandler(proxyToEndpoint, endpoint)
190192

nodes/node_balancer/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/bugout-dev/moonstream/nodes/node_balancer
22

3-
go 1.18
3+
go 1.17
44

55
require (
66
github.com/bugout-dev/bugout-go v0.3.4

0 commit comments

Comments
 (0)