-
Notifications
You must be signed in to change notification settings - Fork 53
Expand file tree
/
Copy pathconfig.py
More file actions
311 lines (258 loc) · 14.1 KB
/
config.py
File metadata and controls
311 lines (258 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
from configparser import ConfigParser, ExtendedInterpolation
from pathlib import Path
import sys
from snet.cli.utils.config import encrypt_secret
default_snet_folder = Path("~").expanduser().joinpath(".snet")
DEFAULT_NETWORK = "sepolia"
class Config(ConfigParser):
def __init__(self, _snet_folder=default_snet_folder, sdk_config=None):
super(Config, self).__init__(interpolation=ExtendedInterpolation(), delimiters=("=",))
self._config_file = _snet_folder.joinpath("config")
self.sdk_config = sdk_config
self.is_sdk = True if sdk_config else False
if self._config_file.exists():
with open(self._config_file) as f:
self.read_file(f)
else:
self.create_default_config()
def get_session_network_name(self):
session_network = self["session"]["network"]
self._check_section("network.%s" % session_network)
return session_network
def safe_get_session_identity_network_names(self):
if "identity" not in self["session"]:
first_identity_message_and_exit(is_sdk=self.is_sdk)
session_identity = self["session"]["identity"]
self._check_section("identity.%s" % session_identity)
session_network = self.get_session_network_name()
network = self._get_identity_section(session_identity).get("network")
if network and network != session_network:
raise Exception("Your session identity '%s' is bind to network '%s', which is different from your"
" session network '%s', please switch identity or network" % (
session_identity, network, session_network))
return session_identity, session_network
def set_session_network(self, network, out_f):
self._set_session_network(network, out_f)
if "identity" in self["session"]:
session_identity = self["session"]["identity"]
identity_network = self._get_identity_section(session_identity).get("network")
if identity_network and identity_network != network:
print("Your new session network '%s' is incompatible with your current session identity '%s' "
"(which is bind to network '%s'), please switch your identity" % (
network, session_identity, identity_network), file=out_f)
def _set_session_network(self, network, out_f):
if network not in self.get_all_networks_names():
raise Exception("Network %s is not in config" % network)
print("Switch to network: %s" % network, file=out_f)
self["session"]["network"] = network
self._persist()
def set_session_identity(self, identity, out_f):
if identity not in self.get_all_identities_names():
raise Exception('Identity "%s" is not in config' % identity)
network = self._get_identity_section(identity).get("network")
if network:
print('Identity "%s" is bind to network "%s"' % (identity, network), file=out_f)
self._set_session_network(network, out_f)
else:
print(
'Identity "%s" is not bind to any network. You should switch network manually if you need.' % identity,
file=out_f)
print("Switch to identity: %s" % (identity), file=out_f)
self["session"]["identity"] = identity
self._persist()
# session is the union of session.identity + session.network + default_ipfs_endpoint
# if value is presented in both session.identity and session.network we get it from session.identity (can happen only for default_eth_rpc_endpoint)
def get_session_field(self, key, exception_if_not_found=True):
session_identity, session_network = self.safe_get_session_identity_network_names()
rez_identity = self._get_identity_section(session_identity).get(key)
rez_network = self._get_network_section(session_network).get(key)
rez_ipfs = None
if key == "default_ipfs_endpoint":
rez_ipfs = self.get_ipfs_endpoint()
rez = rez_identity or rez_network or rez_ipfs
if not rez and exception_if_not_found:
raise Exception("Cannot find %s in the session.identity and in the session.network" % key)
return rez
def set_session_field(self, key, value, out_f):
if key == "default_ipfs_endpoint":
self.set_ipfs_endpoint(value)
print("set default_ipfs_endpoint=%s" % value, file=out_f)
elif key == "filecoin_api_key":
self.set_filecoin_key(value)
print("set filecoin_api_key=%s" % value, file=out_f)
elif key in get_session_network_keys():
session_network = self.get_session_network_name()
self.set_network_field(session_network, key, value)
print("set {}={} for network={}".format(key, value, session_network), file=out_f)
elif key in get_session_identity_keys():
session_identity, _ = self.safe_get_session_identity_network_names()
self.set_identity_field(session_identity, key, value)
print("set {}={} for identity={}".format(key, value, session_identity), file=out_f)
else:
all_keys = get_session_network_keys() + get_session_identity_keys() + ["default_ipfs_endpoint"]
raise Exception("key {} not in {}".format(key, all_keys))
def unset_session_field(self, key, out_f):
if key in get_session_network_keys_removable():
print("unset %s from network %s" % (key, self["session"]["network"]), file=out_f)
if key == "filecoin_api_key":
self.unset_filecoin_key()
else:
del self._get_network_section(self["session"]["network"])[key]
self._persist()
def session_to_dict(self):
session_identity, session_network = self.safe_get_session_identity_network_names()
show = {"session", "network.%s" % session_network, "identity.%s" % session_identity, "ipfs", "filecoin"}
response = {f: dict(self[f]) for f in show}
return response
def add_network(self, network, rpc_endpoint, default_gas_price):
network_section = "network.%s" % network
if network_section in self:
raise Exception("Network section %s already exists in config" % network)
self[network_section] = {}
self[network_section]["default_eth_rpc_endpoint"] = str(rpc_endpoint)
# TODO: find solution with default gas price
self[network_section]["default_gas_price"] = str(default_gas_price)
self._persist()
def set_network_field(self, network, key, value):
self._get_network_section(network)[key] = str(value)
self._persist()
def add_identity(self, identity_name, identity, out_f=sys.stdout, password=None):
identity_section = "identity.%s" % identity_name
if identity_section in self:
raise Exception("Identity section %s already exists in config" % identity_section)
if "network" in identity and identity["network"] not in self.get_all_networks_names():
raise Exception("Network %s is not in config" % identity["network"])
if password:
if "mnemonic" in identity:
identity["mnemonic"] = encrypt_secret(identity["mnemonic"], password)
elif "private_key" in identity:
identity["private_key"] = encrypt_secret(identity["private_key"], password)
self[identity_section] = identity
self._persist()
# switch to it, if it was the first identity
if len(self.get_all_identities_names()) == 1:
print("You've just added your first identity %s. We will automatically switch to it!" % identity_name)
self.set_session_identity(identity_name, out_f)
def set_identity_field(self, identity, key, value):
self._get_identity_section(identity)[key] = str(value)
self._persist()
def _get_network_section(self, network):
""" return section for network or identity """
return self["network.%s" % network]
def _get_identity_section(self, identity):
""" return section for the specific identity """
return self["identity.%s" % identity]
def get_ipfs_endpoint(self):
return self["ipfs"]["default_ipfs_endpoint"]
def set_ipfs_endpoint(self, ipfs_endpoint):
self["ipfs"]["default_ipfs_endpoint"] = ipfs_endpoint
self._persist()
def get_filecoin_key(self):
if "filecoin" not in self or not self["filecoin"].get("filecoin_api_key"):
raise Exception("Use [snet set filecoin_api_key <YOUR_LIGHTHOUSE_API_KEY>] to set filecoin key")
return self["filecoin"]["filecoin_api_key"]
def set_filecoin_key(self, filecoin_key: str):
if "filecoin" not in self:
self["filecoin"] = {"filecoin_api_key": ""}
self["filecoin"]["filecoin_api_key"] = filecoin_key
self._persist()
def unset_filecoin_key(self):
del self["filecoin"]["filecoin_api_key"]
self._persist()
def get_all_identities_names(self):
return [x[len("identity."):] for x in self.sections() if x.startswith("identity.")]
def get_all_networks_names(self):
return [x[len("network."):] for x in self.sections() if x.startswith("network.")]
def delete_identity(self, identity_name):
if identity_name not in self.get_all_identities_names():
raise Exception("identity_name {} does not exist".format(identity_name))
session_identity, _ = self.safe_get_session_identity_network_names()
if identity_name == session_identity:
raise Exception("identity_name {} is in use".format(identity_name))
self.remove_section("identity.{}".format(identity_name))
self._persist()
def create_default_config(self):
""" Create default configuration if config file does not exist """
# make config directory with the minimal possible permission
self._config_file.parent.mkdir(mode=0o700, exist_ok=True)
self["network.mainnet"] = {
"default_eth_rpc_endpoint": "https://mainnet.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5"
}
self["network.sepolia"] = {
"default_eth_rpc_endpoint": "https://sepolia.infura.io/v3/09027f4a13e841d48dbfefc67e7685d5",
}
self["ipfs"] = {"default_ipfs_endpoint": "/dns/ipfs.singularitynet.io/tcp/80/"}
self["filecoin"] = {"filecoin_api_key": ""}
network = self.get_param_from_sdk_config("network")
if network:
if network not in self.get_all_networks_names():
raise Exception("Network '%s' is not in config" % network)
self["session"] = {"network": network}
else:
self["session"] = {"network": DEFAULT_NETWORK}
identity_name = self.get_param_from_sdk_config("identity_name")
identity_type = self.get_param_from_sdk_config("identity_type")
if identity_name and identity_type:
identity = self.setup_identity()
self.add_identity(identity_name, identity)
self._persist()
print("We've created configuration file with default values in: %s\n" % str(self._config_file))
def _check_section(self, s):
if s not in self:
raise Exception("Config error, section %s is absent" % s)
def _persist(self):
with open(self._config_file, "w") as f:
self.write(f)
self._config_file.chmod(0o600)
def get_param_from_sdk_config(self, param: str, alternative=None):
if self.sdk_config:
return self.sdk_config.get(param, alternative)
return None
def setup_identity(self):
identity_type = self.get_param_from_sdk_config("identity_type")
private_key = self.get_param_from_sdk_config("private_key")
default_wallet_index = self.get_param_from_sdk_config("wallet_index", 0)
if not identity_type:
raise Exception("identity_type not passed")
if identity_type == "key":
identity = {
"identity_type": "key",
"private_key": private_key,
"default_wallet_index": default_wallet_index
}
# TODO: logic for other identity_type
else:
print("\nThe identity_type parameter value you passed is not supported "
"by the sdk at this time.\n")
print("The available identity types are:\n"
" - 'key' (uses a required hex-encoded private key for signing)\n\n")
exit(1)
return identity
def first_identity_message_and_exit(is_sdk=False):
if is_sdk:
print("\nPlease create your first identity by passing the 'identity_name' "
"and 'identity_type' parameters in SDK config.\n")
print("The available identity types are:\n"
" - 'key' (uses a required hex-encoded private key for signing)\n\n")
else:
print("\nPlease create your first identity by running 'snet identity create'.\n\n")
print("The available identity types are:\n"
" - 'rpc' (yields to a required ethereum json-rpc endpoint for signing using a given wallet\n"
" index)\n"
" - 'mnemonic' (uses a required bip39 mnemonic for HDWallet/account derivation and signing\n"
" using a given wallet index)\n"
" - 'key' (uses a required hex-encoded private key for signing)\n"
" - 'ledger' (yields to a required ledger nano s device for signing using a given wallet\n"
" index)\n"
"\n")
# " - 'trezor' (yields to a required trezor device for signing using a given wallet index)\n"
exit(1)
def get_session_identity_keys():
return ["default_wallet_index"]
def get_session_network_keys():
return ["current_registry_at", "current_multipartyescrow_at", "current_singularitynettoken_at",
"default_eth_rpc_endpoint"]
def get_session_network_keys_removable():
return ["current_registry_at", "current_multipartyescrow_at", "current_singularitynettoken_at", "filecoin_api_key"]
def get_session_keys():
return get_session_network_keys() + get_session_identity_keys() + ["default_ipfs_endpoint"] + ["filecoin_api_key"]