forked from domob1812/namecoin-core
-
Notifications
You must be signed in to change notification settings - Fork 151
Expand file tree
/
Copy pathinterface_rest.py
More file actions
executable file
·595 lines (471 loc) · 29.5 KB
/
interface_rest.py
File metadata and controls
executable file
·595 lines (471 loc) · 29.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
#!/usr/bin/env python3
# Copyright (c) 2014-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the REST API."""
import binascii
from decimal import Decimal
from enum import Enum
from io import BytesIO
import http.client
import json
import typing
import urllib.parse
from test_framework.messages import (
BLOCK_HEADER_SIZE,
COIN,
deser_block_spent_outputs,
)
from test_framework.script import CScript
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
assert_greater_than,
assert_greater_than_or_equal,
)
from test_framework.wallet import (
MiniWallet,
getnewdestination,
)
from test_framework.auxpow_testing import mineAuxpowBlock
from test_framework.messages import BLOCK_HEADER_SIZE
INVALID_PARAM = "abc"
UNKNOWN_PARAM = "0000000000000000000000000000000000000000000000000000000000000000"
class ReqType(Enum):
JSON = 1
BIN = 2
HEX = 3
class RetType(Enum):
OBJ = 1
BYTES = 2
JSON = 3
def filter_output_indices_by_value(vouts, value):
for vout in vouts:
if vout['value'] == value:
yield vout['n']
class RESTTest (BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.extra_args = [["-rest", "-blockfilterindex=1", "-valueencoding=hex"], []]
# whitelist peers to speed up tx relay / mempool sync
self.noban_tx_relay = True
self.supports_cli = False
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def test_rest_request(
self,
uri: str,
http_method: str = 'GET',
req_type: ReqType = ReqType.JSON,
body: str = '',
status: int = 200,
ret_type: RetType = RetType.JSON,
query_params: typing.Union[dict[str, typing.Any], str, None] = None,
) -> typing.Union[http.client.HTTPResponse, bytes, str, None]:
rest_uri = '/rest' + uri
if req_type in ReqType:
rest_uri += f'.{req_type.name.lower()}'
if query_params:
if isinstance(query_params, str):
rest_uri += f'?{query_params}'
else:
rest_uri += f'?{urllib.parse.urlencode(query_params)}'
conn = http.client.HTTPConnection(self.url.hostname, self.url.port)
self.log.debug(f'{http_method} {rest_uri} {body}')
if http_method == 'GET':
conn.request('GET', rest_uri)
elif http_method == 'POST':
conn.request('POST', rest_uri, body)
resp = conn.getresponse()
assert resp.status == status, f"Expected: {status}, Got: {resp.status} ({resp.reason}) - Response: {str(resp.read())}"
if ret_type == RetType.OBJ:
return resp
elif ret_type == RetType.BYTES:
return resp.read()
elif ret_type == RetType.JSON:
return json.loads(resp.read().decode('utf-8'), parse_float=Decimal)
return None
def run_test(self):
self.url = urllib.parse.urlparse(self.nodes[0].url)
self.wallet = MiniWallet(self.nodes[0])
self.log.info("Broadcast test transaction and sync nodes")
txid = self.wallet.send_to(from_node=self.nodes[0], scriptPubKey=getnewdestination()[1], amount=int(0.1 * COIN))["txid"]
self.sync_all()
self.log.info("Test the /tx URI")
json_obj = self.test_rest_request(f"/tx/{txid}")
assert_equal(json_obj['txid'], txid)
# Check hex format response
hex_response = self.test_rest_request(f"/tx/{txid}", req_type=ReqType.HEX, ret_type=RetType.OBJ)
assert_greater_than_or_equal(int(hex_response.getheader('content-length')),
json_obj['size']*2)
spent = (json_obj['vin'][0]['txid'], json_obj['vin'][0]['vout']) # get the vin to later check for utxo (should be spent by then)
# get n of 0.1 outpoint
n, = filter_output_indices_by_value(json_obj['vout'], Decimal('0.1'))
spending = (txid, n)
# Test /tx with an invalid and an unknown txid
resp = self.test_rest_request(uri=f"/tx/{INVALID_PARAM}", ret_type=RetType.OBJ, status=400)
assert_equal(resp.read().decode('utf-8').rstrip(), f"Invalid hash: {INVALID_PARAM}")
resp = self.test_rest_request(uri=f"/tx/{UNKNOWN_PARAM}", ret_type=RetType.OBJ, status=404)
assert_equal(resp.read().decode('utf-8').rstrip(), f"{UNKNOWN_PARAM} not found")
self.log.info("Query an unspent TXO using the /getutxos URI")
self.generate(self.wallet, 1)
bb_hash = self.nodes[0].getbestblockhash()
# Check chainTip response
json_obj = self.test_rest_request(f"/getutxos/{spending[0]}-{spending[1]}")
assert_equal(json_obj['chaintipHash'], bb_hash)
# Make sure there is one utxo
assert_equal(len(json_obj['utxos']), 1)
assert_equal(json_obj['utxos'][0]['value'], Decimal('0.1'))
self.log.info("Query a spent TXO using the /getutxos URI")
json_obj = self.test_rest_request(f"/getutxos/{spent[0]}-{spent[1]}")
# Check chainTip response
assert_equal(json_obj['chaintipHash'], bb_hash)
# Make sure there is no utxo in the response because this outpoint has been spent
assert_equal(len(json_obj['utxos']), 0)
# Check bitmap
assert_equal(json_obj['bitmap'], "0")
self.log.info("Query two TXOs using the /getutxos URI")
json_obj = self.test_rest_request(f"/getutxos/{spending[0]}-{spending[1]}/{spent[0]}-{spent[1]}")
assert_equal(len(json_obj['utxos']), 1)
assert_equal(json_obj['bitmap'], "10")
self.log.info("Query the TXOs using the /getutxos URI with a binary response")
bin_request = b'\x01\x02'
for txid, n in [spending, spent]:
bin_request += bytes.fromhex(txid)
bin_request += n.to_bytes(4, 'little')
bin_response = self.test_rest_request("/getutxos", http_method='POST', req_type=ReqType.BIN, body=bin_request, ret_type=RetType.BYTES)
chain_height = int.from_bytes(bin_response[0:4], 'little')
response_hash = bin_response[4:36][::-1].hex()
assert_equal(bb_hash, response_hash) # check if getutxo's chaintip during calculation was fine
assert_equal(chain_height, 201) # chain height must be 201 (pre-mined chain [200] + generated block [1])
self.log.info("Test the /getutxos URI with and without /checkmempool")
# Create a transaction, check that it's found with /checkmempool, but
# not found without. Then confirm the transaction and check that it's
# found with or without /checkmempool.
# do a tx and don't sync
txid = self.wallet.send_to(from_node=self.nodes[0], scriptPubKey=getnewdestination()[1], amount=int(0.1 * COIN))["txid"]
json_obj = self.test_rest_request(f"/tx/{txid}")
# get the spent output to later check for utxo (should be spent by then)
spent = (json_obj['vin'][0]['txid'], json_obj['vin'][0]['vout'])
# get n of 0.1 outpoint
n, = filter_output_indices_by_value(json_obj['vout'], Decimal('0.1'))
spending = (txid, n)
json_obj = self.test_rest_request(f"/getutxos/{spending[0]}-{spending[1]}")
assert_equal(len(json_obj['utxos']), 0)
json_obj = self.test_rest_request(f"/getutxos/checkmempool/{spending[0]}-{spending[1]}")
assert_equal(len(json_obj['utxos']), 1)
json_obj = self.test_rest_request(f"/getutxos/{spent[0]}-{spent[1]}")
assert_equal(len(json_obj['utxos']), 1)
json_obj = self.test_rest_request(f"/getutxos/checkmempool/{spent[0]}-{spent[1]}")
assert_equal(len(json_obj['utxos']), 0)
self.generate(self.nodes[0], 1)
json_obj = self.test_rest_request(f"/getutxos/{spending[0]}-{spending[1]}")
assert_equal(len(json_obj['utxos']), 1)
json_obj = self.test_rest_request(f"/getutxos/checkmempool/{spending[0]}-{spending[1]}")
assert_equal(len(json_obj['utxos']), 1)
self.log.info("Check some invalid requests")
self.test_rest_request("/getutxos", http_method='POST', req_type=ReqType.JSON, body='{"checkmempool', status=400, ret_type=RetType.OBJ)
self.test_rest_request("/getutxos", http_method='POST', req_type=ReqType.BIN, body='{"checkmempool', status=400, ret_type=RetType.OBJ)
self.test_rest_request("/getutxos/checkmempool", http_method='POST', req_type=ReqType.JSON, status=400, ret_type=RetType.OBJ)
self.test_rest_request(f"/getutxos/{spending[0]}_+1", ret_type=RetType.OBJ, status=400)
self.test_rest_request(f"/getutxos/{spending[0]}-+1", ret_type=RetType.OBJ, status=400)
self.test_rest_request(f"/getutxos/{spending[0]}--1", ret_type=RetType.OBJ, status=400)
self.test_rest_request(f"/getutxos/{spending[0]}aa-1234", ret_type=RetType.OBJ, status=400)
self.test_rest_request("/getutxos/aa-1234", ret_type=RetType.OBJ, status=400)
# Test limits
long_uri = '/'.join([f"{txid}-{n_}" for n_ in range(20)])
self.test_rest_request(f"/getutxos/checkmempool/{long_uri}", http_method='POST', status=400, ret_type=RetType.OBJ)
long_uri = '/'.join([f'{txid}-{n_}' for n_ in range(15)])
self.test_rest_request(f"/getutxos/checkmempool/{long_uri}", http_method='POST', status=200)
mineAuxpowBlock(self.nodes[0], getnewdestination()[2]) # generate block to not affect upcoming tests
self.sync_all()
bb_hash = self.nodes[0].getbestblockhash()
self.log.info("Test the /block, /blockhashbyheight, /headers, and /blockfilterheaders URIs")
bb_hash = self.nodes[0].getbestblockhash()
# Check result if block does not exists
assert_equal(self.test_rest_request(f"/headers/{UNKNOWN_PARAM}", query_params={"count": 1}), [])
self.test_rest_request(f"/block/{UNKNOWN_PARAM}", status=404, ret_type=RetType.OBJ)
# Check result if block is not in the active chain
self.nodes[0].invalidateblock(bb_hash)
assert_equal(self.test_rest_request(f'/headers/{bb_hash}', query_params={'count': 1}), [])
self.test_rest_request(f'/block/{bb_hash}')
self.nodes[0].reconsiderblock(bb_hash)
# Check binary format
response = self.test_rest_request(f"/block/{bb_hash}", req_type=ReqType.BIN, ret_type=RetType.OBJ)
assert_greater_than(int(response.getheader('content-length')), BLOCK_HEADER_SIZE)
response_bytes = response.read()
# Compare with block header
response_header = self.test_rest_request(f"/headers/{bb_hash}", req_type=ReqType.BIN, ret_type=RetType.OBJ, query_params={"count": 1})
headerLen = int(response_header.getheader('content-length'))
assert_greater_than(headerLen, BLOCK_HEADER_SIZE)
response_header_bytes = response_header.read()
assert_equal(response_bytes[:headerLen], response_header_bytes)
# Check block hex format
response_hex = self.test_rest_request(f"/block/{bb_hash}", req_type=ReqType.HEX, ret_type=RetType.OBJ)
assert_greater_than(int(response_hex.getheader('content-length')), BLOCK_HEADER_SIZE*2)
response_hex_bytes = response_hex.read().strip(b'\n')
assert_equal(response_bytes.hex().encode(), response_hex_bytes)
# Compare with hex block header
response_header_hex = self.test_rest_request(f"/headers/{bb_hash}", req_type=ReqType.HEX, ret_type=RetType.OBJ, query_params={"count": 1})
assert_greater_than(int(response_header_hex.getheader('content-length')), BLOCK_HEADER_SIZE*2)
response_header_hex_bytes = response_header_hex.read().strip()
headerLen = len (response_header_hex_bytes) // 2
assert_equal(response_bytes[:headerLen].hex().encode(), response_header_hex_bytes)
# Check json format
block_json_obj = self.test_rest_request(f"/block/{bb_hash}")
assert_equal(block_json_obj['hash'], bb_hash)
assert_equal(self.test_rest_request(f"/blockhashbyheight/{block_json_obj['height']}")['blockhash'], bb_hash)
# Check hex/bin format
resp_hex = self.test_rest_request(f"/blockhashbyheight/{block_json_obj['height']}", req_type=ReqType.HEX, ret_type=RetType.OBJ)
assert_equal(resp_hex.read().decode('utf-8').rstrip(), bb_hash)
resp_bytes = self.test_rest_request(f"/blockhashbyheight/{block_json_obj['height']}", req_type=ReqType.BIN, ret_type=RetType.BYTES)
blockhash = resp_bytes[::-1].hex()
assert_equal(blockhash, bb_hash)
# Check invalid blockhashbyheight requests
resp = self.test_rest_request(f"/blockhashbyheight/{INVALID_PARAM}", ret_type=RetType.OBJ, status=400)
assert_equal(resp.read().decode('utf-8').rstrip(), f"Invalid height: {INVALID_PARAM}")
resp = self.test_rest_request("/blockhashbyheight/+1", ret_type=RetType.OBJ, status=400)
assert_equal(resp.read().decode('utf-8').rstrip(), "Invalid height: +1")
resp = self.test_rest_request("/blockhashbyheight/1000000", ret_type=RetType.OBJ, status=404)
assert_equal(resp.read().decode('utf-8').rstrip(), "Block height out of range")
resp = self.test_rest_request("/blockhashbyheight/-1", ret_type=RetType.OBJ, status=400)
assert_equal(resp.read().decode('utf-8').rstrip(), "Invalid height: -1")
self.test_rest_request("/blockhashbyheight/", ret_type=RetType.OBJ, status=400)
# Compare with json block header
json_obj = self.test_rest_request(f"/headers/{bb_hash}", query_params={"count": 1})
assert_equal(len(json_obj), 1) # ensure that there is one header in the json response
assert_equal(json_obj[0]['hash'], bb_hash) # request/response hash should be the same
# Check invalid uri (% symbol at the end of the request)
for invalid_uri in [f"/headers/{bb_hash}%", f"/blockfilterheaders/basic/{bb_hash}%", "/mempool/contents.json?%"]:
resp = self.test_rest_request(invalid_uri, ret_type=RetType.OBJ, status=400)
assert_equal(resp.read().decode('utf-8').rstrip(), "URI parsing failed, it likely contained RFC 3986 invalid characters")
# Compare with normal RPC block response
rpc_block_json = self.nodes[0].getblock(bb_hash)
for key in ['hash', 'confirmations', 'height', 'version', 'merkleroot', 'time', 'nonce', 'bits', 'target', 'difficulty', 'chainwork', 'previousblockhash']:
assert_equal(json_obj[0][key], rpc_block_json[key])
# See if we can get 5 headers in one response
self.generate(self.nodes[1], 5)
expected_filter = {
'basic block filter index': {'synced': True, 'best_block_height': 208},
}
self.wait_until(lambda: self.nodes[0].getindexinfo() == expected_filter)
json_obj = self.test_rest_request(f"/headers/{bb_hash}", query_params={"count": 5})
assert_equal(len(json_obj), 5) # now we should have 5 header objects
json_obj = self.test_rest_request(f"/blockfilterheaders/basic/{bb_hash}", query_params={"count": 5})
first_filter_header = json_obj[0]
assert_equal(len(json_obj), 5) # now we should have 5 filter header objects
json_obj = self.test_rest_request(f"/blockfilter/basic/{bb_hash}")
# Compare with normal RPC blockfilter response
rpc_blockfilter = self.nodes[0].getblockfilter(bb_hash)
assert_equal(first_filter_header, rpc_blockfilter['header'])
assert_equal(json_obj['filter'], rpc_blockfilter['filter'])
# Test blockfilterheaders with an invalid hash and filtertype
resp = self.test_rest_request(f"/blockfilterheaders/{INVALID_PARAM}/{bb_hash}", ret_type=RetType.OBJ, status=400)
assert_equal(resp.read().decode('utf-8').rstrip(), f"Unknown filtertype {INVALID_PARAM}")
resp = self.test_rest_request(f"/blockfilterheaders/basic/{INVALID_PARAM}", ret_type=RetType.OBJ, status=400)
assert_equal(resp.read().decode('utf-8').rstrip(), f"Invalid hash: {INVALID_PARAM}")
# Test number parsing
for num in ['5a', '-5', '0', '2001', '99999999999999999999999999999999999']:
assert_equal(
bytes(f'Header count is invalid or out of acceptable range (1-2000): {num}\r\n', 'ascii'),
self.test_rest_request(f"/headers/{bb_hash}", ret_type=RetType.BYTES, status=400, query_params={"count": num}),
)
self.log.info("Test tx inclusion in the /mempool and /block URIs")
# Make 3 chained txs and mine them on node 1
txs = []
input_txid = txid
for _ in range(3):
utxo_to_spend = self.wallet.get_utxo(txid=input_txid)
txs.append(self.wallet.send_self_transfer(from_node=self.nodes[0], utxo_to_spend=utxo_to_spend)['txid'])
input_txid = txs[-1]
self.sync_all()
# Check that there are exactly 3 transactions in the TX memory pool before generating the block
json_obj = self.test_rest_request("/mempool/info")
assert_equal(json_obj['size'], 3)
# the size of the memory pool should be greater than 3x ~100 bytes
assert_greater_than(json_obj['bytes'], 300)
mempool_info = self.nodes[0].getmempoolinfo()
# pop unstable unbroadcastcount before check
for obj in [json_obj, mempool_info]:
obj.pop("unbroadcastcount")
assert_equal(json_obj, mempool_info)
# Check that there are our submitted transactions in the TX memory pool
json_obj = self.test_rest_request("/mempool/contents")
raw_mempool_verbose = self.nodes[0].getrawmempool(verbose=True)
assert_equal(json_obj, raw_mempool_verbose)
for i, tx in enumerate(txs):
assert tx in json_obj
assert_equal(json_obj[tx]['spentby'], txs[i + 1:i + 2])
assert_equal(json_obj[tx]['depends'], txs[i - 1:i])
# Check the mempool response for explicit parameters
json_obj = self.test_rest_request("/mempool/contents", query_params={"verbose": "true", "mempool_sequence": "false"})
assert_equal(json_obj, raw_mempool_verbose)
# Check the mempool response for not verbose
json_obj = self.test_rest_request("/mempool/contents", query_params={"verbose": "false"})
raw_mempool = self.nodes[0].getrawmempool(verbose=False)
assert_equal(json_obj, raw_mempool)
# Check the mempool response for sequence
json_obj = self.test_rest_request("/mempool/contents", query_params={"verbose": "false", "mempool_sequence": "true"})
raw_mempool = self.nodes[0].getrawmempool(verbose=False, mempool_sequence=True)
assert_equal(json_obj, raw_mempool)
# Check for error response if verbose=true and mempool_sequence=true
resp = self.test_rest_request("/mempool/contents", ret_type=RetType.OBJ, status=400, query_params={"verbose": "true", "mempool_sequence": "true"})
assert_equal(resp.read().decode('utf-8').strip(), 'Verbose results cannot contain mempool sequence values. (hint: set "verbose=false")')
# Check for error response if verbose is not "true" or "false"
resp = self.test_rest_request("/mempool/contents", ret_type=RetType.OBJ, status=400, query_params={"verbose": "TRUE"})
assert_equal(resp.read().decode('utf-8').strip(), 'The "verbose" query parameter must be either "true" or "false".')
# Check for error response if mempool_sequence is not "true" or "false"
resp = self.test_rest_request("/mempool/contents", ret_type=RetType.OBJ, status=400, query_params={"verbose": "false", "mempool_sequence": "TRUE"})
assert_equal(resp.read().decode('utf-8').strip(), 'The "mempool_sequence" query parameter must be either "true" or "false".')
# Now mine the transactions
newblockhash = self.generate(self.nodes[1], 1)
# Check if the 3 tx show up in the new block
json_obj = self.test_rest_request(f"/block/{newblockhash[0]}")
non_coinbase_txs = {tx['txid'] for tx in json_obj['tx']
if 'coinbase' not in tx['vin'][0]}
assert_equal(non_coinbase_txs, set(txs))
# Verify that the non-coinbase tx has "prevout" key set
for tx_obj in json_obj["tx"]:
for vin in tx_obj["vin"]:
if "coinbase" not in vin:
assert "prevout" in vin
assert_equal(vin["prevout"]["generated"], False)
else:
assert "prevout" not in vin
# Check the same but without tx details
json_obj = self.test_rest_request(f"/block/notxdetails/{newblockhash[0]}")
for tx in txs:
assert tx in json_obj['tx']
self.log.info("Test the /chaininfo URI")
bb_hash = self.nodes[0].getbestblockhash()
json_obj = self.test_rest_request("/chaininfo")
assert_equal(json_obj['bestblockhash'], bb_hash)
# Compare with normal RPC getblockchaininfo response
blockchain_info = self.nodes[0].getblockchaininfo()
assert_equal(blockchain_info, json_obj)
# Test compatibility of deprecated and newer endpoints
self.log.info("Test compatibility of deprecated and newer endpoints")
assert_equal(self.test_rest_request(f"/headers/{bb_hash}", query_params={"count": 1}), self.test_rest_request(f"/headers/1/{bb_hash}"))
assert_equal(self.test_rest_request(f"/blockfilterheaders/basic/{bb_hash}", query_params={"count": 1}), self.test_rest_request(f"/blockfilterheaders/basic/5/{bb_hash}"))
self.log.info("Test the /spenttxouts URI")
block_count = self.nodes[0].getblockcount()
for height in range(0, block_count + 1):
blockhash = self.nodes[0].getblockhash(height)
spent_bin = self.test_rest_request(f"/spenttxouts/{blockhash}", req_type=ReqType.BIN, ret_type=RetType.BYTES)
spent_hex = self.test_rest_request(f"/spenttxouts/{blockhash}", req_type=ReqType.HEX, ret_type=RetType.BYTES)
spent_json = self.test_rest_request(f"/spenttxouts/{blockhash}", req_type=ReqType.JSON, ret_type=RetType.JSON)
assert_equal(bytes.fromhex(spent_hex.decode()), spent_bin)
spent = deser_block_spent_outputs(BytesIO(spent_bin))
block = self.nodes[0].getblock(blockhash, 3) # return prevout for each input
assert_equal(len(spent), len(block["tx"]))
assert_equal(len(spent_json), len(block["tx"]))
for i, tx in enumerate(block["tx"]):
prevouts = [txin["prevout"] for txin in tx["vin"] if "coinbase" not in txin]
# compare with `getblock` JSON output (coinbase tx has no prevouts)
actual = [(txout.scriptPubKey.hex(), Decimal(txout.nValue) / COIN) for txout in spent[i]]
expected = [(p["scriptPubKey"]["hex"], p["value"]) for p in prevouts]
assert_equal(expected, actual)
# also compare JSON format
actual = [(prevout["scriptPubKey"], prevout["value"]) for prevout in spent_json[i]]
expected = [(p["scriptPubKey"], p["value"]) for p in prevouts]
assert_equal(expected, actual)
self.log.info("Test the /blockpart URI")
blockhash = self.nodes[0].getbestblockhash()
block_bin = self.test_rest_request(f"/block/{blockhash}", req_type=ReqType.BIN, ret_type=RetType.BYTES)
for req_type in (ReqType.BIN, ReqType.HEX):
def get_block_part(status: int = 200, **kwargs):
resp = self.test_rest_request(f"/blockpart/{blockhash}", status=status,
req_type=req_type, ret_type=RetType.BYTES, **kwargs)
assert isinstance(resp, bytes)
if req_type is ReqType.HEX and status == 200:
resp = bytes.fromhex(resp.decode().strip())
return resp
assert_equal(block_bin, get_block_part(query_params={"offset": 0, "size": len(block_bin)}))
assert len(block_bin) >= 500
assert_equal(block_bin[20:320], get_block_part(query_params={"offset": 20, "size": 300}))
assert_equal(block_bin[-5:], get_block_part(query_params={"offset": len(block_bin) - 5, "size": 5}))
get_block_part(status=400, query_params={"offset": 10})
get_block_part(status=400, query_params={"size": 100})
get_block_part(status=400, query_params={"offset": "x"})
get_block_part(status=400, query_params={"size": "y"})
get_block_part(status=400, query_params={"offset": "x", "size": "y"})
assert get_block_part(status=400, query_params="%XY").decode("utf-8").startswith("URI parsing failed")
get_block_part(status=400, query_params={"offset": 0, "size": 0})
get_block_part(status=400, query_params={"offset": len(block_bin), "size": 0})
get_block_part(status=400, query_params={"offset": len(block_bin), "size": 1})
get_block_part(status=400, query_params={"offset": len(block_bin) + 1, "size": 1})
get_block_part(status=400, query_params={"offset": 0, "size": len(block_bin) + 1})
res = self.test_rest_request(f"/blockpart/{blockhash}", status=400, req_type=ReqType.BIN, ret_type=RetType.OBJ)
assert res.read().decode().startswith("Block part offset missing or invalid")
res = self.test_rest_request(f"/blockpart/{blockhash}", query_params={"offset":0, "size":1}, status=400, req_type=ReqType.JSON, ret_type=RetType.OBJ)
assert res.read().decode().startswith("JSON output is not supported for this request type")
self.log.info("Missing block data should cause REST API to fail")
self.test_rest_request(f"/block/{blockhash}", status=200, req_type=ReqType.BIN, ret_type=RetType.OBJ)
self.test_rest_request(f"/blockpart/{blockhash}", query_params={"offset": 0, "size": 1}, status=200, req_type=ReqType.BIN, ret_type=RetType.OBJ)
blk_files = list(self.nodes[0].blocks_path.glob("blk*.dat"))
for blk_file in blk_files:
blk_file.rename(blk_file.with_suffix('.bkp'))
self.test_rest_request(f"/block/{blockhash}", status=500, req_type=ReqType.BIN, ret_type=RetType.OBJ)
self.test_rest_request(f"/blockpart/{blockhash}", query_params={"offset": 0, "size": 1}, status=500, req_type=ReqType.BIN, ret_type=RetType.OBJ)
for blk_file in blk_files:
blk_file.with_suffix('.bkp').rename(blk_file)
self.log.info("Test the /deploymentinfo URI")
deployment_info = self.nodes[0].getdeploymentinfo()
assert_equal(deployment_info, self.test_rest_request('/deploymentinfo'))
previous_bb_hash = self.nodes[0].getblockhash(self.nodes[0].getblockcount() - 1)
deployment_info = self.nodes[0].getdeploymentinfo(previous_bb_hash)
assert_equal(deployment_info, self.test_rest_request(f"/deploymentinfo/{previous_bb_hash}"))
non_existing_blockhash = '42759cde25462784395a337460bde75f58e73d3f08bd31fdc3507cbac856a2c4'
resp = self.test_rest_request(f'/deploymentinfo/{non_existing_blockhash}', ret_type=RetType.OBJ, status=400)
assert_equal(resp.read().decode('utf-8').rstrip(), "Block not found")
resp = self.test_rest_request(f"/deploymentinfo/{INVALID_PARAM}", ret_type=RetType.OBJ, status=400)
assert_equal(resp.read().decode('utf-8').rstrip(), f"Invalid hash: {INVALID_PARAM}")
# Test name handling.
self.log.info("Test the /name URI")
self.name_tests()
def name_tests(self):
"""
Run REST tests specific to names.
"""
# Start by registering a test name.
name = "d/some weird.name++"
binData = bytearray ([0, 1]).decode ("ascii")
value = "correct value\nwith newlines\nand binary: " + binData
hexValue = value.encode ('ascii').hex ()
newData = self.nodes[0].name_new(name)
self.generate (self.nodes[0], 10)
self.nodes[0].name_firstupdate(name, newData[1], newData[0], hexValue)
self.generate (self.nodes[0], 5)
nameData = self.nodes[0].name_show(name)
assert_equal(nameData['name_encoding'], 'ascii')
assert_equal(nameData['name'], name)
assert_equal(nameData['value_encoding'], 'hex')
assert_equal(nameData['value'], hexValue)
# The REST interface explicitly does not include the 'ismine' field
# that the RPC interface has. Thus remove the field for the comparison
# below.
del nameData['ismine']
# Different variants of the encoded name that should all work.
variants = [urllib.parse.quote_plus(name), "d/some+weird.name%2b%2B"]
for encName in variants:
query = '/name/' + encName
# Query JSON data of the name.
data = self.test_rest_request (query, req_type=ReqType.JSON)
assert_equal(data, nameData)
# Query plain value.
res = self.test_rest_request (query, req_type=ReqType.BIN,
ret_type=RetType.BYTES)
assert_equal(res.decode("ascii"), value)
# Query hex value.
res = self.test_rest_request (query, req_type=ReqType.HEX,
ret_type=RetType.BYTES)
assert_equal(res.decode ('ascii'), hexValue + "\n")
# Check invalid encoded names.
invalid = ['%', '%2', '%2x', '%x2']
for encName in invalid:
query = '/name/' + encName
res = self.test_rest_request (query, status=http.client.BAD_REQUEST,
req_type=ReqType.BIN,
ret_type=RetType.OBJ)
if __name__ == '__main__':
RESTTest(__file__).main()