Skip to content

Commit c6cb008

Browse files
committed
Refactor writing metrics for evm
1 parent 2ae798f commit c6cb008

File tree

3 files changed

+150
-24
lines changed

3 files changed

+150
-24
lines changed

src/collectors/evm.py

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from helpers import strip_url, check_protocol, generate_labels_from_metadata
55
from collectors.ws import websocket_collector
66
from websockets.exceptions import WebSocketException
7+
from metrics_processor import results
78

89

910
class evm_collector():
@@ -12,9 +13,7 @@ def __init__(self, rpc_metadata):
1213
self.url = rpc_metadata['url']
1314
if check_protocol(rpc_metadata['url'], "wss") or check_protocol(rpc_metadata['url'], 'ws'):
1415
self.client = Web3(Web3.WebsocketProvider(self.url, websocket_timeout=cfg.response_timeout))
15-
1616
self.labels, self.labels_values = generate_labels_from_metadata(rpc_metadata)
17-
1817
self.ws_collector = websocket_collector(self.url,
1918
sub_payload={
2019
"method": "eth_subscribe",
@@ -24,35 +23,34 @@ def __init__(self, rpc_metadata):
2423
})
2524
self.ws_collector.setDaemon(True)
2625
self.ws_collector.start()
26+
2727
else:
2828
logger.error("Please provide wss/ws endpoint for {}".format(strip_url(self.url)))
2929
exit(1)
3030

31-
def probe(self, metrics):
31+
def probe(self) -> results:
3232
try:
3333
if self.client.isConnected():
34-
metrics['brpc_health'].add_metric(self.labels_values, True)
35-
metrics['brpc_head_count'].add_metric(self.labels_values, self.ws_collector.message_counter)
36-
metrics['brpc_disconnects'].add_metric(self.labels_values, self.ws_collector.disconnects_counter)
37-
metrics['brpc_latency'].add_metric(self.labels_values, self.ws_collector.get_latency())
38-
metrics['brpc_block_height'].add_metric(self.labels_values, self.client.eth.block_number)
39-
metrics['brpc_total_difficulty'].add_metric(self.labels_values,
40-
self.client.eth.get_block('latest')['totalDifficulty'])
41-
metrics['brpc_difficulty'].add_metric(self.labels_values,
42-
self.client.eth.get_block('latest')['difficulty'])
43-
metrics['brpc_gas_price'].add_metric(self.labels_values, self.client.eth.gas_price)
44-
metrics['brpc_max_priority_fee'].add_metric(self.labels_values, self.client.eth.max_priority_fee)
45-
metrics['brpc_client_version'].add_metric(self.labels_values,
46-
value={"client_version": self.client.clientVersion})
34+
results.register(self.url, self.labels_values)
35+
results.record_health(self.url, True)
36+
results.record_head_count(self.url, self.ws_collector.message_counter)
37+
results.record_disconnects(self.url, self.ws_collector.disconnects_counter)
38+
results.record_latency(self.url, self.ws_collector.get_latency())
39+
results.record_block_height(self.url, self.client.eth.block_number)
40+
results.record_total_difficulty(self.url, self.client.eth.get_block('latest')['totalDifficulty'])
41+
results.record_difficulty(self.url, self.client.eth.get_block('latest')['difficulty'])
42+
results.record_gas_price(self.url, self.client.eth.gas_price)
43+
results.record_max_priority_fee(self.url, self.client.eth.max_priority_fee)
44+
results.record_client_version(self.url, self.client.clientVersion)
4745
else:
4846
logger.info("Client is not connected to {}".format(strip_url(self.url)))
49-
metrics['brpc_health'].add_metric(self.labels_values, False)
47+
results.record_health(self.url, False)
5048
except asyncio.exceptions.TimeoutError as exc:
5149
logger.info("Client timed out for {}: {}".format(strip_url(self.url), exc))
52-
metrics['brpc_health'].add_metric(self.labels_values, False)
50+
results.record_health(self.url, False)
5351
except WebSocketException as exc:
5452
logger.info("Websocket client exception {}: {}".format(strip_url(self.url), exc))
55-
metrics['brpc_health'].add_metric(self.labels_values, False)
53+
results.record_health(self.url, False)
5654
except Exception as exc:
5755
logger.error("Failed probing {} with error: {}".format(strip_url(self.url), exc))
58-
metrics['brpc_health'].add_metric(self.labels_values, False)
56+
results.record_health(self.url, False)

src/exporter.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from collectors.filecoin import filecoin_collector
1313
from collectors.starkware import starkware_collector
1414
from settings import logger, cfg
15+
from metrics_processor import results
1516

1617

1718
class prom_registry(object):
@@ -120,15 +121,25 @@ def collect(self):
120121
"brpc_client_version":
121122
InfoMetricFamily('brpc_client_version',
122123
'Client version for the particular RPC endpoint.',
123-
labels=self.labels)
124+
labels=self.labels),
125+
"brpc_block_height_behind_highest":
126+
GaugeMetricFamily('brpc_block_height_behind_highest',
127+
'Number of blocks behind highest in the pool.',
128+
labels=self.labels),
129+
"brpc_difficulty_behind_highest":
130+
GaugeMetricFamily('brpc_difficulty_behind_highest',
131+
'Delta compared between highest total difficulty of the latest block in the pool.',
132+
labels=self.labels),
124133
}
125134

126-
def write_metrics(prom_collector, metrics):
127-
prom_collector.probe(metrics)
135+
def collect_metrics(prom_collector):
136+
prom_collector.probe()
128137

129138
with ThreadPoolExecutor(max_workers=len(self.collectors)) as executor:
130139
for collector in self.collectors:
131-
executor.submit(write_metrics, collector, metrics)
140+
executor.submit(collect_metrics, collector)
141+
# Process the collected metrics
142+
results.write_metrics(metrics)
132143

133144
for _, metric in metrics.items():
134145
# Only yield metric if samples were provided by the probe

src/metrics_processor.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
class ProbeResults(object):
2+
3+
def __init__(self, metadata={}):
4+
self.metadata = metadata
5+
6+
def record_health(self, url, record: bool):
7+
self.metadata[url]['brpc_health'] = record
8+
9+
def record_head_count(self, url, record: int):
10+
self.metadata[url]['brpc_head_count'] = record
11+
12+
def record_disconnects(self, url, record: int):
13+
self.metadata[url]['brpc_disconnects'] = record
14+
15+
def record_latency(self, url, record: float):
16+
self.metadata[url]['brpc_latency'] = record
17+
18+
def record_block_height(self, url, record: int):
19+
self.metadata[url]['brpc_block_height'] = record
20+
21+
def record_total_difficulty(self, url, record: int):
22+
self.metadata[url]['brpc_total_difficulty'] = record
23+
24+
def record_difficulty(self, url, record: int):
25+
self.metadata[url]['brpc_difficulty'] = record
26+
27+
def record_gas_price(self, url, record: float):
28+
self.metadata[url]['brpc_gas_price'] = record
29+
30+
def record_max_priority_fee(self, url, record: float):
31+
self.metadata[url]['brpc_max_priority_fee'] = record
32+
33+
def record_client_version(self, url, record: str):
34+
self.metadata[url]['brpc_client_version'] = record
35+
36+
def register(self, url, label_values):
37+
self.metadata[url] = {
38+
'brpc_health': None,
39+
'brpc_head_count': None,
40+
'brpc_disconnects': None,
41+
'brpc_latency': None,
42+
'brpc_block_height': None,
43+
'brpc_difficulty': None,
44+
'brpc_total_difficulty': None,
45+
'brpc_gas_price': None,
46+
'brpc_max_priority_fee': None,
47+
'brpc_client_version': None,
48+
'label_values': label_values
49+
}
50+
51+
def get_highest_block(self):
52+
highest_block = 0
53+
for url in self.metadata:
54+
if 'brpc_block_height' in self.metadata[url]:
55+
if self.metadata[url]['brpc_block_height'] > highest_block:
56+
highest_block = self.metadata[url]['brpc_block_height']
57+
return highest_block
58+
59+
def get_highest_total_difficulty(self):
60+
highest_total_difficulty = 0
61+
for url in self.metadata:
62+
if 'brpc_total_difficulty' in self.metadata[url]:
63+
if self.metadata[url]['brpc_total_difficulty'] > highest_total_difficulty:
64+
highest_total_difficulty = self.metadata[url]['brpc_total_difficulty']
65+
return highest_total_difficulty
66+
67+
def write_metrics(self, metrics):
68+
highest_block = self.get_highest_block()
69+
highest_total_difficulty = self.get_highest_total_difficulty()
70+
try:
71+
for url in self.metadata:
72+
if self.metadata[url]['brpc_health'] != None:
73+
metrics['brpc_health'].add_metric(self.metadata[url]['label_values'],
74+
self.metadata[url]['brpc_health'])
75+
if self.metadata[url]['brpc_head_count'] != None:
76+
metrics['brpc_head_count'].add_metric(self.metadata[url]['label_values'],
77+
self.metadata[url]['brpc_head_count'])
78+
if self.metadata[url]['brpc_disconnects'] != None:
79+
metrics['brpc_disconnects'].add_metric(self.metadata[url]['label_values'],
80+
self.metadata[url]['brpc_disconnects'])
81+
if self.metadata[url]['brpc_latency'] != None:
82+
metrics['brpc_latency'].add_metric(self.metadata[url]['label_values'],
83+
self.metadata[url]['brpc_latency'])
84+
if self.metadata[url]['brpc_block_height'] != None:
85+
metrics['brpc_block_height'].add_metric(self.metadata[url]['label_values'],
86+
self.metadata[url]['brpc_block_height'])
87+
behind_highest = highest_block - self.metadata[url]['brpc_block_height']
88+
metrics['brpc_block_height_behind_highest'].add_metric(self.metadata[url]['label_values'],
89+
behind_highest)
90+
if self.metadata[url]['brpc_total_difficulty'] != None:
91+
metrics['brpc_total_difficulty'].add_metric(self.metadata[url]['label_values'],
92+
self.metadata[url]['brpc_total_difficulty'])
93+
behind_highest_total_difficulty = highest_total_difficulty - self.metadata[url][
94+
'brpc_total_difficulty']
95+
metrics['brpc_difficulty_behind_highest'].add_metric(self.metadata[url]['label_values'],
96+
behind_highest_total_difficulty)
97+
98+
if self.metadata[url]['brpc_difficulty'] != None:
99+
metrics['brpc_difficulty'].add_metric(self.metadata[url]['label_values'],
100+
self.metadata[url]['brpc_difficulty'])
101+
if self.metadata[url]['brpc_gas_price'] != None:
102+
metrics['brpc_gas_price'].add_metric(self.metadata[url]['label_values'],
103+
self.metadata[url]['brpc_gas_price'])
104+
if self.metadata[url]['brpc_gas_price'] != None:
105+
metrics['brpc_max_priority_fee'].add_metric(self.metadata[url]['label_values'],
106+
self.metadata[url]['brpc_max_priority_fee'])
107+
if self.metadata[url]['brpc_client_version'] != None:
108+
metrics['brpc_client_version'].add_metric(
109+
self.metadata[url]['label_values'],
110+
value={"client_version": self.metadata[url]['brpc_client_version']})
111+
self.metadata.clear()
112+
except Exception as e:
113+
print(e, type(e))
114+
self.metadata.clear()
115+
116+
117+
results = ProbeResults()

0 commit comments

Comments
 (0)