Skip to content

Commit 1af28a2

Browse files
authored
v4.2.0
- Standardise on websockets and requests |- Implement structured logging
1 parent 048632c commit 1af28a2

File tree

14 files changed

+297
-403
lines changed

14 files changed

+297
-403
lines changed

requirements.txt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,5 @@ prometheus-client==0.13.1
33
pyyaml==6.0
44
schema==0.7.5
55
websockets==10.3
6-
web3==v6.0.0-beta.4
7-
solana==0.25.1
8-
conflux-web3==1.0.0b1
6+
structlog==22.1.0
7+
requests==2.28.1

src/collectors/bitcoin.py

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,46 @@
1-
from helpers import check_protocol, strip_url, generate_labels_from_metadata
2-
from time import perf_counter
1+
from helpers import strip_url, generate_labels_from_metadata
32
from settings import logger
43
from metrics_processor import results
54
import requests
5+
from collectors.https import https_connection
66

77

88
class bitcoin_collector():
99

1010
def __init__(self, rpc_metadata):
11-
self.url = rpc_metadata['url']
12-
if check_protocol(self.url, "https"):
13-
self.labels, self.labels_values = generate_labels_from_metadata(rpc_metadata)
14-
else:
15-
logger.error("Please provide https endpoint for {}".format(strip_url(self.url)))
16-
exit(1)
11+
self.url, self.stripped_url = rpc_metadata['url'], strip_url(rpc_metadata['url'])
12+
self.labels, self.labels_values = generate_labels_from_metadata(rpc_metadata)
13+
self.client = https_connection(self.url)
14+
15+
def _get_client_version(self):
16+
payload = {"jsonrpc": "1.0", "id": "exporter", "method": "getnetworkinfo", "params": []}
17+
response = requests.post(self.url, json=payload).json()['result']
18+
version, subversion, protocolversion = response['version'], response['subversion'], response['protocolversion']
19+
version = f"version:{version} subversion:{subversion} protocolversion:{protocolversion}"
20+
return version
21+
22+
def _get_height_and_difficulty(self):
23+
payload = {"jsonrpc": "1.0", "id": "exporter", "method": "getblockchaininfo", "params": []}
24+
response = requests.post(self.url, json=payload).json()
25+
block_height = response['result']['blocks']
26+
total_difficulty = response['result']['difficulty']
27+
return block_height, total_difficulty
1728

1829
def probe(self) -> results:
1930
results.register(self.url, self.labels_values)
31+
health_check_payload = {"jsonrpc": "1.0", "id": "exporter", "method": "getnetworkinfo"}
2032
try:
21-
payload = {"jsonrpc": "1.0", "id": "exporter", "method": "getblockchaininfo", "params": []}
22-
start = perf_counter()
23-
response = requests.post(self.url, json=payload).json()
24-
latency = (perf_counter() - start) * 1000
25-
26-
if response:
33+
if self.client.is_connected_post_check(health_check_payload):
34+
results.record_latency(self.url, self.client.get_latency(health_check_payload))
2735
results.record_health(self.url, True)
28-
results.record_latency(self.url, latency)
29-
results.record_block_height(self.url, response['result']['blocks'])
30-
results.record_total_difficulty(self.url, response['result']['difficulty'])
36+
37+
block_height, total_difficulty = self._get_height_and_difficulty()
38+
results.record_client_version(self.url, self._get_client_version())
39+
results.record_block_height(self.url, block_height)
40+
results.record_total_difficulty(self.url, total_difficulty)
3141
else:
32-
logger.error("Bad response from client {}: {}".format(strip_url(self.url), exc))
42+
logger.error("The url did not pass liveness check.", self.stripped_url)
3343
results.record_health(self.url, False)
34-
except requests.RequestException as exc:
35-
logger.error("Health check failed for {}: {}".format(strip_url(self.url), exc))
36-
results.record_health(self.url, False)
3744
except Exception as exc:
38-
logger.error("Health check failed for {}: {}".format(strip_url(self.url), exc))
45+
logger.error(f"{exc}", url=self.stripped_url)
3946
results.record_health(self.url, False)

src/collectors/cardano.py

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,20 @@
1-
from settings import logger
2-
import json
3-
from collectors.ws import websocket_collector
4-
from helpers import check_protocol, strip_url, generate_labels_from_metadata
1+
from settings import logger, cfg
2+
from helpers import strip_url, generate_labels_from_metadata, key_from_json_str
53
from metrics_processor import results
4+
import json
5+
import websockets
6+
import asyncio
7+
from collectors.ws import fetch_latency
68

79

810
class cardano_collector():
911

1012
def __init__(self, rpc_metadata):
11-
self.url = rpc_metadata['url']
12-
if check_protocol(self.url, "wss") or check_protocol(self.url, "ws"):
13-
self.labels, self.labels_values = generate_labels_from_metadata(rpc_metadata)
14-
self.ws_collector = websocket_collector(self.url)
15-
else:
16-
logger.error("Please provide https endpoint for {}".format(strip_url(self.url)))
17-
exit(1)
13+
self.url, self.stripped_url = rpc_metadata['url'], strip_url(rpc_metadata['url'])
14+
self.labels, self.labels_values = generate_labels_from_metadata(rpc_metadata)
1815

19-
def _get_block_height(self):
20-
blk_height_payload = {
16+
async def _blockHeight(self, websocket):
17+
payload = {
2118
"type": "jsonwsp/request",
2219
"version": "1.0",
2320
"servicename": "ogmios",
@@ -26,21 +23,24 @@ def _get_block_height(self):
2623
"query": "blockHeight"
2724
}
2825
}
29-
try:
30-
return json.loads(self.ws_collector.query(blk_height_payload))['result']
31-
except KeyError as err:
32-
logger.error("Failed to fetch block height for {}, error: {}".format(strip_url(self.url), err))
26+
await websocket.send(json.dumps(payload))
27+
result = await websocket.recv()
28+
return key_from_json_str(result, "result")
3329

34-
def probe(self) -> results:
30+
async def _probe(self) -> results:
3531
results.register(self.url, self.labels_values)
3632
try:
37-
alive = self.ws_collector.get_liveliness()
38-
if alive:
33+
async with websockets.connect(self.url,
34+
open_timeout=cfg.open_timeout,
35+
close_timeout=cfg.close_timeout,
36+
ping_interval=cfg.ping_interval,
37+
ping_timeout=cfg.ping_timeout) as websocket:
38+
results.record_latency(self.url, await fetch_latency(websocket))
3939
results.record_health(self.url, True)
40-
results.record_latency(self.url, self.ws_collector.get_latency())
41-
results.record_block_height(self.url, self._get_block_height())
42-
else:
43-
results.record_health(self.url, False)
40+
results.record_block_height(self.url, await self._blockHeight(websocket))
4441
except Exception as exc:
45-
logger.error("Failed probing {} with error: {}".format(strip_url(self.url), exc))
4642
results.record_health(self.url, False)
43+
logger.error(f"{exc}", url=self.stripped_url)
44+
45+
def probe(self):
46+
asyncio.run(self._probe())

src/collectors/conflux.py

Lines changed: 43 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,54 @@
1-
from settings import cfg, logger
2-
from conflux_web3 import Web3
3-
import asyncio
4-
from helpers import strip_url, check_protocol, generate_labels_from_metadata
5-
from collectors.ws import websocket_collector
1+
from settings import logger, cfg
2+
from helpers import strip_url, generate_labels_from_metadata, hex_to_int, key_from_json_str
63
from metrics_processor import results
74

5+
from collectors.ws import subscription, fetch_latency
6+
import websockets
7+
import asyncio
8+
import json
9+
810

911
class conflux_collector():
1012

1113
def __init__(self, rpc_metadata):
12-
self.url = rpc_metadata['url']
13-
if check_protocol(self.url, "wss") or check_protocol(self.url, 'ws'):
14-
self.labels, self.labels_values = generate_labels_from_metadata(rpc_metadata)
15-
self.labels, self.labels_values = generate_labels_from_metadata(rpc_metadata)
16-
self.client = Web3(Web3.WebsocketProvider(self.url, websocket_timeout=cfg.response_timeout))
17-
self.labels, self.labels_values = generate_labels_from_metadata(rpc_metadata)
18-
19-
self.ws_collector = websocket_collector(self.url,
20-
sub_payload={
21-
"method": "cfx_subscribe",
22-
"jsonrpc": "2.0",
23-
"id": 1,
24-
"params": ["newHeads"]
25-
})
26-
self.ws_collector.setDaemon(True)
27-
self.ws_collector.start()
28-
else:
29-
logger.error("Please provide wss/ws endpoint for {}".format(strip_url(self.url)))
30-
exit(1)
31-
32-
def probe(self) -> results:
14+
self.url, self.chain_id, self.stripped_url = rpc_metadata['url'], rpc_metadata['chain_id'], strip_url(
15+
rpc_metadata['url'])
16+
self.labels, self.labels_values = generate_labels_from_metadata(rpc_metadata)
17+
18+
sub_payload = {"method": "cfx_subscribe", "jsonrpc": "2.0", "id": 1, "params": ["newHeads"]}
19+
self.sub = subscription(self.url, sub_payload)
20+
self.sub.isDaemon()
21+
self.sub.start()
22+
23+
async def _cfx_clientVersion(self, websocket):
24+
payload = {"jsonrpc": "2.0", "method": "cfx_clientVersion", "params": [], "id": 1}
25+
await websocket.send(json.dumps(payload))
26+
result = await websocket.recv()
27+
return key_from_json_str(result, "result")
28+
29+
async def _cfx_epochNumber(self, websocket):
30+
payload = {"jsonrpc": "2.0", "method": "cfx_epochNumber", "params": [], "id": 1}
31+
await websocket.send(json.dumps(payload))
32+
result = await websocket.recv()
33+
return hex_to_int(key_from_json_str(result, "result"))
34+
35+
async def _probe(self) -> results:
3336
results.register(self.url, self.labels_values)
3437
try:
35-
if self.client.isConnected():
38+
async with websockets.connect(self.url,
39+
open_timeout=cfg.open_timeout,
40+
close_timeout=cfg.close_timeout,
41+
ping_interval=cfg.ping_interval,
42+
ping_timeout=cfg.ping_timeout) as websocket:
43+
results.record_latency(self.url, await fetch_latency(websocket))
3644
results.record_health(self.url, True)
37-
results.record_head_count(self.url, self.ws_collector.message_counter)
38-
results.record_disconnects(self.url, self.ws_collector.disconnects_counter)
39-
results.record_latency(self.url, self.ws_collector.get_latency())
40-
results.record_block_height(self.url, self.client.cfx.epoch_number)
41-
try:
42-
difficulty = self.client.cfx.get_block_by_hash(self.client.cfx.get_best_block_hash())['difficulty']
43-
results.record_difficulty(self.url, difficulty)
44-
except TypeError:
45-
logger.error(
46-
"RPC Endpoint sent faulty response type when querying for difficulty. This is most likely issue with RPC endpoint."
47-
)
48-
results.record_gas_price(self.url, self.client.cfx.gas_price)
49-
results.record_client_version(self.url, self.client.clientVersion)
50-
else:
51-
logger.info("Client is not connected to {}".format(strip_url(self.url)))
52-
results.record_health(self.url, False)
53-
except asyncio.exceptions.TimeoutError:
54-
logger.info("Client timed out for {}".format(strip_url(self.url)))
55-
results.record_health(self.url, False)
45+
results.record_block_height(self.url, await self._cfx_epochNumber(websocket))
46+
results.record_client_version(self.url, await self._cfx_clientVersion(websocket))
47+
results.record_head_count(self.url, self.sub.head_counter)
48+
results.record_disconnects(self.url, self.sub.disconnects)
5649
except Exception as exc:
57-
logger.error("Failed probing {} with error: {}".format(strip_url(self.url), exc))
5850
results.record_health(self.url, False)
51+
logger.error(f"{exc}", url=self.stripped_url)
52+
53+
def probe(self):
54+
asyncio.run(self._probe())

src/collectors/dogecoin.py

Lines changed: 0 additions & 38 deletions
This file was deleted.

0 commit comments

Comments
 (0)