13
13
import http .client
14
14
import urllib .parse
15
15
16
+
16
17
def rpccall (node , user , method ):
17
18
url = urllib .parse .urlparse (node .url )
18
19
headers = {"Authorization" : "Basic " + str_to_b64str ('{}:{}' .format (user [0 ], user [3 ]))}
@@ -24,7 +25,12 @@ def rpccall(node, user, method):
24
25
return resp
25
26
26
27
28
+ def get_permissions (whitelist ):
29
+ return [perm for perm in whitelist .replace (" " , "" ).split ("," ) if perm ]
30
+
31
+
27
32
class RPCWhitelistTest (BitcoinTestFramework ):
33
+
28
34
def set_test_params (self ):
29
35
self .num_nodes = 1
30
36
@@ -48,7 +54,9 @@ def run_test(self):
48
54
["strangedude4" , "990c895760a70df83949e8278665e19a$8f0906f20431ff24cb9e7f5b5041e4943bdf2a5c02a19ef4960dcf45e72cde1c" , ":getblockcount, getbestblockhash" , "s7R4nG3R7H1nGZ" ],
49
55
["strangedude4" , "990c895760a70df83949e8278665e19a$8f0906f20431ff24cb9e7f5b5041e4943bdf2a5c02a19ef4960dcf45e72cde1c" , ":getblockcount" , "s7R4nG3R7H1nGZ" ],
50
56
# Testing the same permission twice
51
- ["strangedude5" , "d12c6e962d47a454f962eb41225e6ec8$2dd39635b155536d3c1a2e95d05feff87d5ba55f2d5ff975e6e997a836b717c9" , ":getblockcount,getblockcount" , "s7R4nG3R7H1nGZ" ]
57
+ ["strangedude5" , "d12c6e962d47a454f962eb41225e6ec8$2dd39635b155536d3c1a2e95d05feff87d5ba55f2d5ff975e6e997a836b717c9" , ":getblockcount,getblockcount" , "s7R4nG3R7H1nGZ" ],
58
+ # Test non-whitelisted user
59
+ ["strangedude6" , "ab02e4fb22ef4ab004cca217a49ee8d2$90dd09b08edd12d552d9d8a5ada838dcef2ac587789fa7e9c47f5990e80cdf93" , None , "password123" ]
52
60
]
53
61
# These commands shouldn't be allowed for any user to test failures
54
62
self .never_allowed = ["getnetworkinfo" ]
@@ -60,21 +68,11 @@ def run_test(self):
60
68
# Special cases
61
69
for strangedude in self .strange_users :
62
70
f .write ("rpcauth=" + strangedude [0 ] + ":" + strangedude [1 ] + "\n " )
63
- f .write ("rpcwhitelist=" + strangedude [0 ] + strangedude [2 ] + "\n " )
71
+ if strangedude [2 ] is not None :
72
+ f .write ("rpcwhitelist=" + strangedude [0 ] + strangedude [2 ] + "\n " )
64
73
self .restart_node (0 )
65
74
66
75
for user in self .users :
67
- permissions = user [2 ].replace (" " , "" ).split ("," )
68
- # Pop all empty items
69
- i = 0
70
- while i < len (permissions ):
71
- if permissions [i ] == '' :
72
- permissions .pop (i )
73
-
74
- i += 1
75
- for permission in permissions :
76
- self .log .info ("[" + user [0 ] + "]: Testing a permitted permission (" + permission + ")" )
77
- assert_equal (200 , rpccall (self .nodes [0 ], user , permission ).status )
78
76
for permission in self .never_allowed :
79
77
self .log .info ("[" + user [0 ] + "]: Testing a non permitted permission (" + permission + ")" )
80
78
assert_equal (403 , rpccall (self .nodes [0 ], user , permission ).status )
@@ -92,5 +90,56 @@ def run_test(self):
92
90
self .log .info ("Strange test 5" )
93
91
assert_equal (200 , rpccall (self .nodes [0 ], self .strange_users [4 ], "getblockcount" ).status )
94
92
93
+ self .test_users_permissions ()
94
+ self .test_rpcwhitelistdefault_0_no_permissions ()
95
+
96
+ # Replace file configurations
97
+ self .nodes [0 ].replace_in_config ([("rpcwhitelistdefault=0" , "rpcwhitelistdefault=1" )])
98
+ with open (self .nodes [0 ].datadir_path / "bitcoin.conf" , 'a' , encoding = 'utf8' ) as f :
99
+ f .write ("rpcwhitelist=__cookie__:getblockcount,getblockchaininfo,getmempoolinfo,stop\n " )
100
+ self .restart_node (0 )
101
+
102
+ # Test rpcwhitelistdefault=1
103
+ self .test_users_permissions ()
104
+ self .test_rpcwhitelistdefault_1_no_permissions ()
105
+
106
+ def test_users_permissions (self ):
107
+ """
108
+ * Permissions:
109
+ (user1): getbestblockhash,getblockcount
110
+ (user2): getblockcount
111
+ Expected result: * users can only access whitelisted methods
112
+ """
113
+ for user in self .users :
114
+ permissions = get_permissions (user [2 ])
115
+ for permission in permissions :
116
+ self .log .info ("[" + user [0 ] + "]: Testing whitelisted user permission (" + permission + ")" )
117
+ assert_equal (200 , rpccall (self .nodes [0 ], user , permission ).status )
118
+ self .log .info ("[" + user [0 ] + "]: Testing non-permitted permission: getblockchaininfo" )
119
+ assert_equal (403 , rpccall (self .nodes [0 ], user , "getblockchaininfo" ).status )
120
+
121
+ def test_rpcwhitelistdefault_0_no_permissions (self ):
122
+ """
123
+ * rpcwhitelistdefault=0
124
+ * No Permissions defined
125
+ Expected result: * strangedude6 (not whitelisted) can access any method
126
+ """
127
+ unrestricted_user = self .strange_users [6 ]
128
+ for permission in ["getbestblockhash" , "getblockchaininfo" ]:
129
+ self .log .info ("[" + unrestricted_user [0 ] + "]: Testing unrestricted user permission (" + permission + ")" )
130
+ assert_equal (200 , rpccall (self .nodes [0 ], unrestricted_user , permission ).status )
131
+
132
+ def test_rpcwhitelistdefault_1_no_permissions (self ):
133
+ """
134
+ * rpcwhitelistdefault=1
135
+ * No Permissions defined
136
+ Expected result: * strangedude6 (not whitelisted) can not access any method
137
+ """
138
+
139
+ for permission in ["getbestblockhash" , "getblockchaininfo" ]:
140
+ self .log .info ("[" + self .strange_users [6 ][0 ] + "]: Testing rpcwhitelistdefault=1 no specified permission (" + permission + ")" )
141
+ assert_equal (403 , rpccall (self .nodes [0 ], self .strange_users [6 ], permission ).status )
142
+
143
+
95
144
if __name__ == "__main__" :
96
145
RPCWhitelistTest (__file__ ).main ()
0 commit comments