-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmempool_api_async.py
More file actions
157 lines (135 loc) · 6.26 KB
/
mempool_api_async.py
File metadata and controls
157 lines (135 loc) · 6.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from json import loads
from asyncio import run
from pandas import DataFrame, concat, merge, to_datetime, json_normalize
from async_api_client import API
"""
# nest_asyncio is needed for async code in some environments like Jupyter
# nest_asyncio requires full asyncio package, your mileage may vary
import asyncio
from nest_asyncio import apply as nest
nest()
"""
class MempoolAPI:
"""
A client for making calls to the Mempool API using httpx.
Uses asynchronous code which may result in rate limiting
(or blacklisting) on the public Mempool server:
https://mempool.space/api
Includes a get_blocks method for fetching data for a range
of blocks, returned in a pandas DataFrame object.
"""
def __init__(self, url:str='https://mempool.space/api'):
"""
Requires Mempool API server base url to initialize.
Requires general async_api_client.py API object.
param url: string, base url of Mempool API server
"""
self.url = url
self.api = API(self.url)
def _compile_blocks(
self,
blocks:list,
blocks_v1:list,
cols:dict = {
'height': 'blockheight',
'id': 'blockhash',
'previousblockhash': 'prev_blockhash',
'timestamp': 'timestamp',
'mediantime': 'blocktime_median',
'difficulty': 'difficulty',
'version': 'version',
'coinbaseRaw': 'coinbase_raw',
'merkle_root': 'merkle_root',
'nonce': 'nonce',
'bits': 'bits',
'size': 'size',
'weight': 'weight',
'tx_count': 'tx_count',
'reward': 'block_reward',
'totalFees': 'fee_sum',
'avgFee': 'fee_mean',
'avgFeeRate': 'feerate_mean',
'feerate_min': 'feerate_min',
'feerate_median': 'feerate_median',
'feerate_max': 'feerate_max',
'feerate_pctl_10': 'feerate_pctl_10',
'feerate_pctl_25': 'feerate_pctl_25',
'feerate_pctl_75': 'feerate_pctl_75',
'feerate_pctl_90': 'feerate_pctl_90',
'pool.id': 'pool_Mempool_id',
'pool.name': 'pool_Mempool_name',
'pool.slug': 'pool_Mempool_slug',
'datasource': 'datasource'
}
) -> DataFrame:
"""
Hidden method for parsing and compiling a list of blocks
fetched asynchronously from the Mempool API.
Meant only to be used within the fetch_blocks method.
Requires blocks and blocks_v1 list objects returned from
the /block/ and /v1/block/ endpoints.
Requires a cols dictionary object for reordering and renaming attributes.
param blocks: list, list of block JSON strings from /blocks/ endpoint
param blocks_v1: list, list of block_v1 JSON strings from /v1/blocks/ endpoint
param cols: dict, dictionary used to reorder and rename attributes.
"""
try:
if not len(blocks) == len(blocks_v1):
print('Error compiling blocks, attribute lists of differenth lengths')
print(f'blocks length: {len(blocks)}, blocks_v1 length: {len(block_v1)}')
return None
else:
# Merge blocks objects; mediantime is the only attribute missing from blocks_v1
blocks, blocks_v1 = [loads(b) for b in blocks], [loads(b) for b in blocks_v1]
df, df_mediantime = DataFrame(blocks_v1), DataFrame(blocks)[['id', 'mediantime']]
df = merge(df, df_mediantime, on='id')
del blocks, blocks_v1, df_mediantime
# Parse out nested objects
df_extras = json_normalize(df['extras'])
feerate_cols = [f'feerate_{p}' for p in [
'min', 'pctl_10', 'pctl_25', 'median', 'pctl_75', 'pctl_90', 'max']]
df_feerates = DataFrame(df_extras['feeRange'].to_list(), columns=feerate_cols)
df = concat([df, df_extras, df_feerates], axis=1)
del df_extras, df_feerates
# Convert datetime, add datasource, reindex by blockheight
df['timestamp'] = to_datetime(df['timestamp'], unit='s')
df['datasource'] = self.url
df.sort_values('height', ignore_index=True, inplace=True)
# Drop, reorder, rename columns
df = df[list(cols)]
df.rename(columns=cols, inplace=True)
print(f'Dataframe shape: {df.shape}')
return df
except:
raise Exception('Error compiling blocks')
async def get_blocks(self, blockheight_start:int, blockheight_end:int) -> DataFrame:
"""
Method for fetching a set of block attributes for
a range of blocks from the Mempool API, in a pandas DataFrame.
Requires blockheight_start and blockheight_end integers,
indicating the range of blocks to fetch.
param blockheight_start: int, first in sequence of blockheights to fetch
param blockheight_end: int, last in sequence of blockheights to fetch
"""
try:
current_blockheight = int(self.api.call('/blocks/tip/height'))
if not 0 <= blockheight_start <= blockheight_end <= current_blockheight:
print('Invalid blockheight! Must be in range:' + '\n' +\
f'0 <= blockheight_start <= blockheight_end <= {current_blockheight}')
return None
blockheights = range(blockheight_start, blockheight_end + 1)
blockhashes = run(self.api.async_calls([f'/block-height/{b}' for b in blockheights]))
# There are two versions of the block endpoint, neither has complete list of attributes
blocks = run(self.api.async_calls([f'/block/{b}' for b in blockhashes]))
blocks_v1 = run(self.api.async_calls([f'/v1/block/{b}' for b in blockhashes]))
df = self._compile_blocks(blocks, blocks_v1)
return df
except:
raise Exception('Error fetching block data')
"""
# Example usage:
from asyncio import run
url = 'https://mempool.space/api'
api = MempoolAPI(url)
df = run(api.get_blocks(777777, 777888))
"""