Skip to content

Commit 8f68d4a

Browse files
authored
Refactor metric writing (#20)
* Refactor writing metrics for evm * Fix bug evm collector, record and process block height only if it is recorded * Refactor bitcoin collector to use metrics processor * Refactor cardano collector to use metrics processor * Refactor conflux to use metrics collector * Refactor dogecoin to use metrics collector * Refactor filecoinn to use metrics collector * Refactor Solana to use metrics collector * Refactor Starkware to use metrics collector * Refactor Starkware to use metrics collector
1 parent 2ae798f commit 8f68d4a

File tree

10 files changed

+219
-82
lines changed

10 files changed

+219
-82
lines changed

src/collectors/bitcoin.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from helpers import check_protocol, strip_url, generate_labels_from_metadata
44
from time import perf_counter
55
from settings import logger
6+
from metrics_processor import results
67

78

89
class bitcoin_collector():
@@ -15,20 +16,21 @@ def __init__(self, rpc_metadata):
1516
logger.error("Please provide https endpoint for {}".format(strip_url(self.url)))
1617
exit(1)
1718

18-
async def _probe(self, metrics):
19+
async def _probe(self) -> results:
20+
results.register(self.url, self.labels_values)
1921
try:
2022
async with BitcoinRPC(self.url, "admin", "admin") as rpc:
2123
start = perf_counter()
2224
chain_info = await rpc.getblockchaininfo()
2325
latency = (perf_counter() - start) * 1000
2426

25-
metrics['brpc_health'].add_metric(self.labels_values, True)
26-
metrics['brpc_latency'].add_metric(self.labels_values, latency)
27-
metrics['brpc_block_height'].add_metric(self.labels_values, chain_info['headers'])
28-
metrics['brpc_total_difficulty'].add_metric(self.labels_values, chain_info['difficulty'])
27+
results.record_health(self.url, True)
28+
results.record_latency(self.url, latency)
29+
results.record_block_height(self.url, chain_info['headers'])
30+
results.record_total_difficulty(self.url, chain_info['difficulty'])
2931
except Exception as exc:
3032
logger.error("Failed probing {} with error: {}".format(strip_url(self.url), exc))
31-
metrics['brpc_health'].add_metric(self.labels_values, False)
33+
results.record_health(self.url, False)
3234

33-
def probe(self, metrics):
34-
asyncio.run(self._probe(metrics))
35+
def probe(self):
36+
asyncio.run(self._probe())

src/collectors/cardano.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import json
33
from collectors.ws import websocket_collector
44
from helpers import check_protocol, strip_url, generate_labels_from_metadata
5+
from metrics_processor import results
56

67

78
class cardano_collector():
@@ -30,14 +31,16 @@ def _get_block_height(self):
3031
except KeyError as err:
3132
logger.error("Failed to fetch block height for {}, error: {}".format(strip_url(self.url), err))
3233

33-
def probe(self, metrics):
34+
def probe(self) -> results:
35+
results.register(self.url, self.labels_values)
3436
try:
3537
alive = self.ws_collector.get_liveliness()
3638
if alive:
37-
metrics['brpc_health'].add_metric(self.labels_values, True)
38-
metrics['brpc_latency'].add_metric(self.labels_values, self.ws_collector.get_latency())
39-
metrics['brpc_block_height'].add_metric(self.labels_values, self._get_block_height())
39+
results.record_health(self.url, True)
40+
results.record_latency(self.url, self.ws_collector.get_latency())
41+
results.record_block_height(self.url, self._get_block_height())
4042
else:
41-
metrics['brpc_health'].add_metric(self.labels_values, False)
43+
results.record_health(self.url, False)
4244
except Exception as exc:
4345
logger.error("Failed probing {} with error: {}".format(strip_url(self.url), exc))
46+
results.record_health(self.url, False)

src/collectors/conflux.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
from helpers import strip_url, check_protocol, generate_labels_from_metadata
55
from collectors.ws import websocket_collector
6+
from metrics_processor import results
67

78

89
class conflux_collector():
@@ -28,31 +29,30 @@ def __init__(self, rpc_metadata):
2829
logger.error("Please provide wss/ws endpoint for {}".format(strip_url(self.url)))
2930
exit(1)
3031

31-
def probe(self, metrics):
32+
def probe(self) -> results:
33+
results.register(self.url, self.labels_values)
3234
try:
3335
if self.client.isConnected():
34-
metrics['brpc_health'].add_metric(self.labels_values, True)
35-
metrics['brpc_head_count'].add_metric(self.labels_values, self.ws_collector.message_counter)
36-
metrics['brpc_disconnects'].add_metric(self.labels_values, self.ws_collector.disconnects_counter)
37-
metrics['brpc_latency'].add_metric(self.labels_values, self.ws_collector.get_latency())
38-
metrics['brpc_block_height'].add_metric(self.labels_values, self.client.cfx.epoch_number)
39-
36+
results.record_health(self.url, True)
37+
results.record_head_count(self.url, self.ws_collector.message_counter)
38+
results.record_disconnects(self.url, self.ws_collector.disconnects_counter)
39+
results.record_latency(self.url, self.ws_collector.get_latency())
40+
results.record_block_height(self.url, self.client.cfx.epoch_number)
4041
try:
4142
difficulty = self.client.cfx.get_block_by_hash(self.client.cfx.get_best_block_hash())['difficulty']
42-
metrics['brpc_difficulty'].add_metric(self.labels_values, difficulty)
43+
results.record_difficulty(self.url, difficulty)
4344
except TypeError:
4445
logger.error(
4546
"RPC Endpoint sent faulty response type when querying for difficulty. This is most likely issue with RPC endpoint."
4647
)
47-
48-
metrics['brpc_gas_price'].add_metric(self.labels_values, self.client.cfx.gas_price)
49-
metrics['brpc_client_version'].add_metric(self.labels_values,
50-
value={"client_version": self.client.clientVersion})
48+
results.record_gas_price(self.url, self.client.cfx.gas_price)
49+
results.record_client_version(self.url, self.client.clientVersion)
5150
else:
5251
logger.info("Client is not connected to {}".format(strip_url(self.url)))
53-
metrics['brpc_health'].add_metric(self.labels_values, False)
52+
results.record_health(self.url, False)
5453
except asyncio.exceptions.TimeoutError:
5554
logger.info("Client timed out for {}".format(strip_url(self.url)))
56-
metrics['brpc_health'].add_metric(self.labels_values, False)
55+
results.record_health(self.url, False)
5756
except Exception as exc:
5857
logger.error("Failed probing {} with error: {}".format(strip_url(self.url), exc))
58+
results.record_health(self.url, False)

src/collectors/dogecoin.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from settings import logger
33
from time import perf_counter
44
import requests
5-
5+
from metrics_processor import results
66

77
class doge_collector():
88

@@ -14,24 +14,25 @@ def __init__(self, rpc_metadata):
1414
logger.error("Please provide https endpoint for {}".format(strip_url(self.url)))
1515
exit(1)
1616

17-
def probe(self, metrics):
17+
def probe(self) -> results:
18+
results.register(self.url, self.labels_values)
1819
try:
1920
payload = {'version': '1.1', 'method': "getinfo", 'id': 1}
2021
start = perf_counter()
2122
response = requests.post(self.url, json=payload).json()
2223
latency = (perf_counter() - start) * 1000
2324

2425
if response:
25-
metrics['brpc_health'].add_metric(self.labels_values, True)
26-
metrics['brpc_latency'].add_metric(self.labels_values, latency)
27-
metrics['brpc_block_height'].add_metric(self.labels_values, response['result']['blocks'])
28-
metrics['brpc_total_difficulty'].add_metric(self.labels_values, response['result']['difficulty'])
26+
results.record_health(self.url, True)
27+
results.record_latency(self.url, latency)
28+
results.record_block_height(self.url, response['result']['blocks'])
29+
results.record_total_difficulty(self.url, response['result']['difficulty'])
2930
else:
3031
logger.error("Bad response from client {}: {}".format(strip_url(self.url), exc))
31-
metrics['brpc_health'].add_metric(self.labels_values, False)
32+
results.record_health(self.url, False)
3233
except requests.RequestException as exc:
3334
logger.error("Health check failed for {}: {}".format(strip_url(self.url), exc))
34-
metrics['brpc_health'].add_metric(self.labels_values, False)
35+
results.record_health(self.url, False)
3536
except Exception as exc:
3637
logger.error("Health check failed for {}: {}".format(strip_url(self.url), exc))
37-
metrics['brpc_health'].add_metric(self.labels_values, False)
38+
results.record_health(self.url, False)

src/collectors/evm.py

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from helpers import strip_url, check_protocol, generate_labels_from_metadata
55
from collectors.ws import websocket_collector
66
from websockets.exceptions import WebSocketException
7+
from metrics_processor import results
78

89

910
class evm_collector():
@@ -12,9 +13,7 @@ def __init__(self, rpc_metadata):
1213
self.url = rpc_metadata['url']
1314
if check_protocol(rpc_metadata['url'], "wss") or check_protocol(rpc_metadata['url'], 'ws'):
1415
self.client = Web3(Web3.WebsocketProvider(self.url, websocket_timeout=cfg.response_timeout))
15-
1616
self.labels, self.labels_values = generate_labels_from_metadata(rpc_metadata)
17-
1817
self.ws_collector = websocket_collector(self.url,
1918
sub_payload={
2019
"method": "eth_subscribe",
@@ -24,35 +23,34 @@ def __init__(self, rpc_metadata):
2423
})
2524
self.ws_collector.setDaemon(True)
2625
self.ws_collector.start()
26+
2727
else:
2828
logger.error("Please provide wss/ws endpoint for {}".format(strip_url(self.url)))
2929
exit(1)
3030

31-
def probe(self, metrics):
31+
def probe(self) -> results:
32+
results.register(self.url, self.labels_values)
3233
try:
3334
if self.client.isConnected():
34-
metrics['brpc_health'].add_metric(self.labels_values, True)
35-
metrics['brpc_head_count'].add_metric(self.labels_values, self.ws_collector.message_counter)
36-
metrics['brpc_disconnects'].add_metric(self.labels_values, self.ws_collector.disconnects_counter)
37-
metrics['brpc_latency'].add_metric(self.labels_values, self.ws_collector.get_latency())
38-
metrics['brpc_block_height'].add_metric(self.labels_values, self.client.eth.block_number)
39-
metrics['brpc_total_difficulty'].add_metric(self.labels_values,
40-
self.client.eth.get_block('latest')['totalDifficulty'])
41-
metrics['brpc_difficulty'].add_metric(self.labels_values,
42-
self.client.eth.get_block('latest')['difficulty'])
43-
metrics['brpc_gas_price'].add_metric(self.labels_values, self.client.eth.gas_price)
44-
metrics['brpc_max_priority_fee'].add_metric(self.labels_values, self.client.eth.max_priority_fee)
45-
metrics['brpc_client_version'].add_metric(self.labels_values,
46-
value={"client_version": self.client.clientVersion})
35+
results.record_health(self.url, True)
36+
results.record_head_count(self.url, self.ws_collector.message_counter)
37+
results.record_disconnects(self.url, self.ws_collector.disconnects_counter)
38+
results.record_latency(self.url, self.ws_collector.get_latency())
39+
results.record_block_height(self.url, self.client.eth.block_number)
40+
results.record_total_difficulty(self.url, self.client.eth.get_block('latest')['totalDifficulty'])
41+
results.record_difficulty(self.url, self.client.eth.get_block('latest')['difficulty'])
42+
results.record_gas_price(self.url, self.client.eth.gas_price)
43+
results.record_max_priority_fee(self.url, self.client.eth.max_priority_fee)
44+
results.record_client_version(self.url, self.client.clientVersion)
4745
else:
4846
logger.info("Client is not connected to {}".format(strip_url(self.url)))
49-
metrics['brpc_health'].add_metric(self.labels_values, False)
47+
results.record_health(self.url, False)
5048
except asyncio.exceptions.TimeoutError as exc:
5149
logger.info("Client timed out for {}: {}".format(strip_url(self.url), exc))
52-
metrics['brpc_health'].add_metric(self.labels_values, False)
50+
results.record_health(self.url, False)
5351
except WebSocketException as exc:
5452
logger.info("Websocket client exception {}: {}".format(strip_url(self.url), exc))
55-
metrics['brpc_health'].add_metric(self.labels_values, False)
53+
results.record_health(self.url, False)
5654
except Exception as exc:
5755
logger.error("Failed probing {} with error: {}".format(strip_url(self.url), exc))
58-
metrics['brpc_health'].add_metric(self.labels_values, False)
56+
results.record_health(self.url, False)

src/collectors/filecoin.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from settings import logger
33
from time import perf_counter
44
import requests
5+
from metrics_processor import results
56

67

78
class filecoin_collector():
@@ -14,24 +15,25 @@ def __init__(self, rpc_metadata):
1415
logger.error("Please provide https endpoint for {}".format(strip_url(self.url)))
1516
exit(1)
1617

17-
def probe(self, metrics):
18+
def probe(self) -> results:
19+
results.register(self.url, self.labels_values)
1820
try:
1921
payload = {'jsonrpc': '2.0', 'method': "Filecoin.ChainHead", 'id': 1}
2022
start = perf_counter()
2123
response = requests.post(self.url, json=payload)
2224
latency = (perf_counter() - start) * 1000
2325

2426
if response.ok:
25-
metrics['brpc_health'].add_metric(self.labels_values, True)
26-
metrics['brpc_latency'].add_metric(self.labels_values, latency)
27-
metrics['brpc_block_height'].add_metric(self.labels_values, response.json()['result']['Height'])
27+
results.record_health(self.url, True)
28+
results.record_latency(self.url, latency)
29+
results.record_block_height(self.url, response.json()['result']['Height'])
2830
else:
2931
logger.error("Bad response from client while fetching Filecoin.ChainHead method for {}: {}".format(
3032
strip_url(self.url), response))
31-
metrics['brpc_health'].add_metric(self.labels_values, False)
33+
results.record_health(self.url, False)
3234
except requests.RequestException as exc:
3335
logger.error("Health check failed for {}: {}".format(strip_url(self.url), exc))
34-
metrics['brpc_health'].add_metric(self.labels_values, False)
36+
results.record_health(self.url, False)
3537
except Exception as e:
3638
logger.error("Health check failed for {}: {}".format(strip_url(self.url), e))
37-
metrics['brpc_health'].add_metric(self.labels_values, False)
39+
results.record_health(self.url, False)

src/collectors/solana.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from helpers import strip_url, url_join, check_protocol, generate_labels_from_metadata
44
from collectors.ws import websocket_collector
55
import requests
6-
6+
from metrics_processor import results
77

88
class solana_collector():
99

@@ -48,14 +48,16 @@ def is_connected(self) -> bool:
4848
return False
4949
return response.ok
5050

51-
def probe(self, metrics):
51+
def probe(self) -> results:
52+
results.register(self.url, self.labels_values)
5253
try:
5354
if self.is_connected():
54-
metrics['brpc_health'].add_metric(self.labels_values, True)
55-
metrics['brpc_head_count'].add_metric(self.labels_values, self.ws_collector.message_counter)
56-
metrics['brpc_disconnects'].add_metric(self.labels_values, self.ws_collector.disconnects_counter)
57-
metrics['brpc_block_height'].add_metric(self.labels_values, self.client.get_block_height()['result'])
55+
results.record_health(self.url, True)
56+
results.record_head_count(self.url, self.ws_collector.message_counter)
57+
results.record_disconnects(self.url, self.ws_collector.disconnects_counter)
58+
results.record_block_height(self.url, self.client.get_block_height()['result'])
5859
else:
59-
metrics['brpc_health'].add_metric(self.labels_values, False)
60+
results.record_health(self.url, False)
6061
except Exception as exc:
6162
logger.error("Failed probing {} with error: {}".format(strip_url(self.url), exc))
63+
results.record_health(self.url, False)

src/collectors/starkware.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from helpers import strip_url, check_protocol, generate_labels_from_metadata
33
from time import perf_counter
44
import requests
5-
5+
from metrics_processor import results
66

77
class starkware_collector():
88

@@ -14,7 +14,8 @@ def __init__(self, rpc_metadata):
1414
logger.error("Please provide https endpoint for {}".format(strip_url(self.url)))
1515
exit(1)
1616

17-
def probe(self, metrics):
17+
def probe(self) -> results:
18+
results.register(self.url, self.labels_values)
1819
try:
1920
payload = {"method": "starknet_blockNumber", "jsonrpc": "2.0", "id": 1}
2021
start = perf_counter()
@@ -23,15 +24,15 @@ def probe(self, metrics):
2324
latency = (perf_counter() - start) * 1000
2425

2526
if response:
26-
metrics['brpc_health'].add_metric(self.labels_values, True)
27-
metrics['brpc_latency'].add_metric(self.labels_values, latency)
28-
metrics['brpc_block_height'].add_metric(self.labels_values, response['result'])
27+
results.record_health(self.url, True)
28+
results.record_latency(self.url, latency)
29+
results.record_block_height(self.url, response['result'])
2930
else:
3031
logger.error("Bad response from client {}: {}".format(strip_url(self.url), exc))
31-
metrics['brpc_health'].add_metric(self.labels_values, False)
32+
results.record_health(self.url, False)
3233
except requests.RequestException as exc:
3334
logger.error("Health check failed for {}: {}".format(strip_url(self.url), exc))
34-
metrics['brpc_health'].add_metric(self.labels_values, False)
35+
results.record_health(self.url, False)
3536
except Exception as exc:
3637
logger.error("Health check failed for {}: {}".format(strip_url(self.url), exc))
37-
metrics['brpc_health'].add_metric(self.labels_values, False)
38+
results.record_health(self.url, False)

0 commit comments

Comments
 (0)