-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcli.py
More file actions
executable file
·339 lines (295 loc) · 14.2 KB
/
cli.py
File metadata and controls
executable file
·339 lines (295 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
#!/usr/bin/env python3
# Copyright (c) 2023 - for information on the respective copyright owner
# see the NOTICE file and/or the repository
# https://github.com/boschresearch/py-cx-ids
#
# SPDX-License-Identifier: Apache-2.0
from uuid import uuid4
import click
import os
import sys
import json
from datetime import datetime
import requests
from pycxids.cli.cli_settings import *
from pycxids.core.auth.auth_factory import IatpAuthFactory
from pycxids.core.http_binding import dsp_client_consumer_api
from pycxids.core.http_binding.models import ContractAgreementMessage, ContractNegotiation, DataAddress, TransferProcess, TransferStartMessage
from pycxids.cx.services import BdrsDirectory
from pycxids.edc.settings import CALLBACK_SERVICE_BASE_URL
from pycxids.iatp.iatp import CredentialService
from pycxids.portal.api import Portal
from pycxids.portal.settings import PORTAL_BASE_URL, PORTAL_OAUTH_TOKEN_ENDPOINT
from pycxids.utils.storage import FileStorageEngine
# The description is passed as ``help`` so that it is shown by ``--help``.
# Passed positionally, click would interpret it as the *name* of the group.
@click.group(help='A cli to interact with IDS / EDC data providers')
def cli():
    """Root click group; every sub-command and sub-group in this file attaches to it."""
@cli.group('config', help='Configure the CLI')
def config():
    """Container group for the ``config`` sub-commands; it performs no work itself."""
    return None
@config.command('use', help='Select config to use')
@click.argument('config_name', default='')
def cli_config_select(config_name: str):
    """
    Select the active configuration, or show the currently selected one.

    With a non-empty ``config_name`` the name is stored under the ``use`` key
    of ``config_storage``. Without it, the stored name is read and printed.
    """
    if not config_name:
        # No name given: just report what is currently selected.
        print(config_storage.get('use'))
        return
    config_storage.put('use', config_name)
@config.command('list', help='List available configurations')
@click.argument('config_name', default='')
def cli_config_list(config_name: str):
    """
    Print the names of all stored configurations, one per line.

    When ``config_name`` is given, the content of that single configuration is
    printed as indented JSON instead (``null`` if there is no such entry).
    """
    known = config_storage.get('configs', {})
    if not config_name:
        for name in known:
            print(name)
        return
    selected = known.get(config_name)
    print(json.dumps(selected, indent=4))
@config.command('add', help='Add configuration')
@click.argument('config_name')
def cli_config_add(config_name: str):
    """
    Interactively create or update the configuration ``config_name``.

    Every setting is prompted for in a fixed order. The value already stored
    for that setting is offered as the default, otherwise a built-in fallback.
    The result is written back to ``config_storage`` and the configuration is
    selected as the active one.
    """
    # (setting name, fallback default) in the order the prompts are shown
    prompts = (
        ('STS_CLIENT_ID', "BPNLconsumer"),
        ('STS_CLIENT_SECRET_FN', "xxx"),
        ('STS_TOKEN_ENDPOINT', "http://dev:13000/dummy/token"),
        ('STS_BASE_URL', "http://dev:13000/sts"),
        ('CS_BASE_URL', "http://dev:13000/cs"),
        ('OUR_BPN', "BPNLconsumer"),
        ('OUR_DID', "did:web:dev%3A13000:BPNLconsumer"),
        ('BDRS_BASE_URL', "http://dev:13000/bdrs"),
        # Portal
        ('PORTAL_BASE_URL', "http://dev:13000/portal"),
        ('PORTAL_TOKEN_ENDPOINT', "http://dev:13000/dummy/token"),
        ('PORTAL_CLIENT_ID', "BPNLconsumer"),
        ('PORTAL_CLIENT_SECRET_FN', ""),
    )
    all_configs = config_storage.get('configs', {})
    entry = all_configs.get(config_name, {})
    for key, fallback in prompts:
        entry[key] = click.prompt(f"{key}:", default=entry.get(key, fallback))

    all_configs[config_name] = entry
    config_storage.put('configs', all_configs)
    config_storage.put('use', config_name)

    for line in ("", "Configuration done. This config will be used by default now.", ""):
        click.echo(line)
def get_my_config():
    """
    Return the settings of the currently selected configuration.

    The selected name is read from the ``use`` key of ``config_storage`` and
    looked up in the mapping stored under ``configs``.
    """
    selected = config_storage.get('use')
    available = config_storage.get('configs', {})
    return available.get(selected)
def get_DspClient(provider_base_url:str, bearer_scopes: list = None, provider_did: str = None):
    """
    Build a DSP consumer client api that authenticates via IATP.

    The STS settings (base url, client id, token endpoint, our DID) are taken
    from the currently selected configuration. The client secret is read from
    the file named by ``STS_CLIENT_SECRET_FN``; if that setting is empty or the
    file does not exist, an empty secret is used.

    Args:
        provider_base_url: base url of the provider, passed through to the client.
        bearer_scopes: passed through to the client unchanged.
        provider_did: passed through to the client unchanged.

    Returns:
        A ``DspClientConsumerApi`` instance using an ``IatpAuthFactory``.

    Raises:
        click.ClickException: if no configuration is available.
    """
    myconfig = get_my_config()
    if not myconfig:
        # Fail with a readable message instead of an AttributeError on None.
        raise click.ClickException("No configuration found. Please add one first with: config add <name>")

    secret_fn = myconfig.get('STS_CLIENT_SECRET_FN')
    secret = ''
    # os.path.exists() raises TypeError for None, so test the name itself first.
    if secret_fn and os.path.exists(secret_fn):
        with open(secret_fn, 'rt') as f:
            secret = f.read()
    # else: TODO: use logging to not print to stdout because cli uses pipes, e.g. with ./cli.py assets
    #    print(f"secret_fn does not exist: {secret_fn} Using dummy value.")

    auth_factory = IatpAuthFactory(
        base_url=myconfig.get('STS_BASE_URL'),
        client_id=myconfig.get('STS_CLIENT_ID'),
        client_secret=secret,
        token_url=myconfig.get('STS_TOKEN_ENDPOINT'),
        our_did=myconfig.get('OUR_DID'),
    )
    return dsp_client_consumer_api.DspClientConsumerApi(
        provider_base_url=provider_base_url,
        auth=auth_factory,
        bearer_scopes=bearer_scopes,
        provider_did=provider_did,
    )
@cli.command('catalog')
@click.option('-o', '--out-fn', default='')
@click.option('--overwrite-edc-endpoint', default='')
@click.argument('bpn', default=None)
def fetch_catalog_cli(bpn: str, out_fn, overwrite_edc_endpoint: str):
    """
    For simplicity, only tractusx-edc 0.7.x and higher supported.
    \f
    Developer notes (everything after the form feed is hidden from ``--help``,
    because this docstring doubles as the command's help text):

    Looks up the participant settings for ``bpn`` in the participants cache,
    picks the first known EDC endpoint (or ``--overwrite-edc-endpoint`` if
    given), fetches the catalog from it and prints it as JSON. With
    ``--out-fn`` the JSON is additionally written to that file.

    Raises:
        click.ClickException: if neither the cache nor the option provides an
            endpoint to talk to.
    """
    storage = FileStorageEngine(PARTICIPANTS_SETTINGS_CACHE)
    # An unknown BPN must not crash on None; the endpoint may still be given
    # explicitly. (Assumes storage.get() returns None for unknown keys - TODO confirm.)
    participant_settings = storage.get(bpn) or {}
    edc_endpoints = participant_settings.get('edc_endpoints') or []
    provider_did = participant_settings.get('did')

    # The explicit option always wins over the cached endpoint.
    provider_ids_endpoint = overwrite_edc_endpoint
    if not provider_ids_endpoint and edc_endpoints:
        # TODO: what if more than 1?
        provider_ids_endpoint = edc_endpoints[0]
    if not provider_ids_endpoint:
        raise click.ClickException(
            f"No EDC endpoint known for '{bpn}'. Run 'update' first or pass --overwrite-edc-endpoint."
        )

    api = get_DspClient(provider_base_url=provider_ids_endpoint, provider_did=provider_did)
    catalog = api.fetch_catalog(out_fn=out_fn)
    catalog_str = json.dumps(catalog, indent=True)
    if out_fn:
        with open(out_fn, 'wt') as f:
            f.write(catalog_str)
    print(catalog_str)
@cli.command('catalogs', help=f"Fetch all catalogs from {PARTICIPANTS_SETTINGS_CACHE}")
@click.option('-o', '--out-dir', default='./catalogs/')
def fetch_catalogs_cli(out_dir: str):
    """
    Fetch the catalogs of all endpoints of all BPNs into the given directory.

    For every participant in the participants cache and every EDC endpoint
    listed for it, the catalog is fetched and written to
    ``<out_dir>/<bpn>_<endpoint index>.json``. Endpoints that return an empty
    catalog are skipped.
    """
    # makedirs also creates missing parent directories and does not fail if
    # the directory already exists.
    os.makedirs(out_dir, exist_ok=True)

    storage = FileStorageEngine(PARTICIPANTS_SETTINGS_CACHE)
    participants = storage.get_all()
    for bpn, bpn_settings in participants.items():
        provider_did = bpn_settings.get('did')
        # A participant without a stored endpoint list is simply skipped.
        edc_endpoints = bpn_settings.get('edc_endpoints') or []
        for idx, endpoint in enumerate(edc_endpoints):
            # every BPN can have multiple EDC endpoints and thus, catalogs
            api = get_DspClient(provider_base_url=endpoint, provider_did=provider_did)
            catalog = api.fetch_catalog()
            if not catalog:
                continue
            out_fn = os.path.join(out_dir, f"{bpn}_{idx}.json")
            catalog_str = json.dumps(catalog, indent=True)
            with open(out_fn, 'wt') as f:
                f.write(catalog_str)
@cli.command('assets', help="List asset:prop:id list from a given catalog via filename or stdin")
@click.argument('catalog_filename', default='')
def list_assets_from_catalog(catalog_filename: str):
    """
    Print the asset ids contained in a DSP catalog, one per line.

    The catalog JSON is read from ``catalog_filename`` if given, otherwise from
    stdin.

    Raises:
        click.ClickException: if the file does not exist, the input is not
            valid JSON, or the catalog contains no ``dcat:dataset`` entry.
    """
    if catalog_filename:
        # read catalog content from file
        if not os.path.isfile(catalog_filename):
            raise click.ClickException(f"Catalog file not found: {catalog_filename}")
        with open(catalog_filename, 'r') as f:
            catalog_str = f.read()
    else:
        # stdin case
        catalog_str = click.get_text_stream('stdin').read().strip()

    try:
        catalog = json.loads(catalog_str)
    except json.JSONDecodeError as ex:
        raise click.ClickException(f"Catalog is not valid JSON: {ex}") from ex

    # check if DSP catalog result:
    datasets = catalog.get('dcat:dataset', None) if isinstance(catalog, dict) else None
    if not datasets:
        # The original `assert "<message>"` could never fail (a non-empty
        # string is truthy), so this case ended in a TypeError on join(None).
        raise click.ClickException("No datasets found. Old multipart catalog? Or just empty?")

    # DSP case
    asset_ids = dsp_client_consumer_api.DspClientConsumerApi.get_asset_ids_from_catalog(catalog=catalog)
    print('\n'.join(asset_ids))
@cli.command('fetch', help="Fetch a given asset id")
@click.option('-r', '--raw-data', default=False, is_flag=True)
@click.option('-o', '--out-fn', default='')
@click.option('--overwrite-edc-endpoint', default='')
@click.option('--agreement-id', default=None, help='Reuse existing agreement ID and save some negotiation time.')
@click.argument('bpn', default=None)
@click.argument('dataset_id', default='')
def fetch_asset_cli(bpn: str, out_fn, overwrite_edc_endpoint: str, dataset_id: str, raw_data:bool, agreement_id: str):
    """
    For simplicity, only tractusx-edc 0.7.x and higher supported.

    Steps performed:
      1. resolve the provider endpoint for ``bpn`` (cache or override option)
      2. get the offers for ``dataset_id`` and negotiate the first one
      3. wait for the agreement message via the callback service
      4. start a transfer and wait for the transfer start message
      5. GET the data from the endpoint in the received DataAddress, using the
         received authorization value as ``Authorization`` header

    The result goes to ``out_fn`` if given, otherwise to stdout. With
    ``--raw-data`` the response bytes are passed through untouched; otherwise
    the response is pretty-printed as JSON (falling back to the response text
    if it is not JSON). The request duration is reported on stderr.

    NOTE: ``agreement_id`` is accepted but not used yet; a fresh negotiation is
    always performed.

    The process is terminated with ``os._exit`` at the end (exit status 0) to
    also stop the webhook thread.
    """
    before = datetime.now().timestamp()
    config = get_my_config()
    assert config, "Please add config first"

    storage = FileStorageEngine(PARTICIPANTS_SETTINGS_CACHE)
    # An unknown BPN must not crash on None; the endpoint may still be given
    # explicitly. (Assumes storage.get() returns None for unknown keys - TODO confirm.)
    participant_settings = storage.get(bpn) or {}
    edc_endpoints = participant_settings.get('edc_endpoints') or []
    provider_did = participant_settings.get('did')

    # The explicit option always wins over the cached endpoint.
    provider_ids_endpoint = overwrite_edc_endpoint
    if not provider_ids_endpoint and edc_endpoints:
        # TODO: what if more than 1?
        provider_ids_endpoint = edc_endpoints[0]
    if not provider_ids_endpoint:
        raise click.ClickException(
            f"No EDC endpoint known for '{bpn}'. Run 'update' first or pass --overwrite-edc-endpoint."
        )

    api = get_DspClient(provider_base_url=provider_ids_endpoint, provider_did=provider_did)

    offers = api.get_offers_for_dataset(dataset_id=dataset_id)
    print(json.dumps(offers))
    if not offers:
        # Without this check the negotiation below fails with a bare IndexError.
        raise click.ClickException(f"No offers found for dataset '{dataset_id}'.")

    #consumer_callback_base_url = config.get('CONSUMER_CONNECTOR_BASE_URL')
    callback_uuid_negotiaion = str(uuid4())
    consumer_callback_base_url_negotiation = f"{CALLBACK_SERVICE_BASE_URL}/{callback_uuid_negotiaion}"
    # TODO catalog_base_url should not be used here, but rather the endpoint from the catalog result!
    negotiation:ContractNegotiation = api.negotiation(dataset_id=dataset_id, offer=offers[0], consumer_callback_base_url=consumer_callback_base_url_negotiation)
    print(negotiation)

    # and now get the message from the receiver api (proprietary api)
    agreement_message:ContractAgreementMessage = api.negotiation_callback_result(id=negotiation.dspace_consumer_pid, consumer_callback_base_url=consumer_callback_base_url_negotiation)
    print(agreement_message)
    assert agreement_message.dspace_agreement.field_id, "No agreement ID."
    assert agreement_message.dspace_consumer_pid == negotiation.dspace_consumer_pid, "Agreement and Negoation consumePid not equal!"

    callback_uuid_transfer = str(uuid4())
    consumer_callback_base_url_transfer = f"{CALLBACK_SERVICE_BASE_URL}/{callback_uuid_transfer}"
    transfer:TransferProcess = api.transfer(agreement_id_received=agreement_message.dspace_agreement.field_id, consumer_pid=agreement_message.dspace_consumer_pid, consumer_callback_base_url=consumer_callback_base_url_transfer)
    print(transfer)
    transfer_start_message:TransferStartMessage = api.transfer_callback_result(id=transfer.dspace_consumer_pid, consumer_callback_base_url=consumer_callback_base_url_transfer)
    assert transfer_start_message
    print(transfer_start_message)

    data_address_received: DataAddress = transfer_start_message.dspace_data_address
    authorization = dsp_client_consumer_api.DspClientConsumerApi.get_data_address_authorization(data_address_received)
    endpoint = dsp_client_consumer_api.DspClientConsumerApi.get_data_address_endpoint(data_address_received)
    assert authorization, "Could not find authorization token in DataAddress properties."
    assert endpoint, "Could not find endpoint in DataAddress properties."

    # actual request of the data
    headers = {
        "Authorization": authorization
    }
    r = requests.get(endpoint, headers=headers)
    if raw_data:
        payload = r.content  # bytes, passed through unmodified
    else:
        try:
            payload = json.dumps(r.json(), indent=4)
        except ValueError:
            # Response body is not JSON: hand out the text as received.
            payload = r.text

    after = datetime.now().timestamp()
    duration = after - before

    if out_fn:
        # Raw data is bytes and needs a binary file handle.
        with open(out_fn, 'wb' if raw_data else 'w') as f:
            f.write(payload)
    elif raw_data:
        # Flush pending text output first so the ordering on stdout is kept,
        # then write the bytes as they are instead of their b'...' repr.
        sys.stdout.flush()
        sys.stdout.buffer.write(payload)
    else:
        print(payload)
    print(f"request duration in seconds: {duration}", file=sys.stderr)

    # os._exit() does not flush stdio buffers, so do it explicitly; otherwise
    # output can get lost when stdout is a pipe.
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(0) # this does also stop the webhook thread; 0 because we succeeded
@cli.command('update', help='Only from 0.7.x onwards')
@click.option('-o', '--out', help='Filename for output.', default=PARTICIPANTS_SETTINGS_CACHE)
@click.option('--client_id', default='', help='Portal technical user with discovery role')
@click.option('--client_secret_fn', default='.secrets/discovery.secret', help='Filename with corresponding client_secret')
@click.option('--token_endpoint', default=PORTAL_OAUTH_TOKEN_ENDPOINT, help='Portal OAuth token endpoint')
@click.option('--portal_base_url', default=PORTAL_BASE_URL, help='Portal base URL')
def update_participant_settings_cli(out, client_id, client_secret_fn, token_endpoint, portal_base_url):
    """
    Refresh the participants cache (BPN -> DID and EDC endpoints).

    Steps performed:
      1. get a token with our own IATP credentials and read the verifiable
         presentations from our credential service (``CS_BASE_URL``)
      2. use the first presentation to read the BPN/DID directory from BDRS
         (``BDRS_BASE_URL``)
      3. ask the portal for the EDC endpoints of every BPN
      4. store ``{"did": ..., "edc_endpoints": ...}`` per BPN in the file
         given by ``out``

    NOTE(review): the portal settings are taken from the selected
    configuration (``PORTAL_*`` keys). The options ``client_id``,
    ``client_secret_fn``, ``token_endpoint`` and ``portal_base_url`` are
    accepted but currently not used - confirm whether they should override
    the configuration.

    Raises:
        click.ClickException: if no configuration is available or the
            credential service returns no presentation.
    """
    config = get_my_config()
    if not config:
        raise click.ClickException("No configuration found. Please add one first with: config add <name>")

    c = get_DspClient("")
    token = c.auth.get_token(aud="")
    # use our own token to read our own CS content
    cs_base_url = config.get('CS_BASE_URL')
    assert cs_base_url, "CS_BASE_URL needs to be set in config."
    cs = CredentialService(credential_service_base_url=cs_base_url, access_token=token)
    vps = cs.get_vps()
    if not vps:
        # vps[0] below would otherwise fail with a bare IndexError.
        raise click.ClickException("Credential service returned no verifiable presentation.")

    # BDRS (BPN - DID Mapping)
    bdrs_base_url = config.get('BDRS_BASE_URL')
    bdrs = BdrsDirectory(bdrs_base_url=bdrs_base_url, membership_vp_jwt=vps[0])
    bpn_mappings = bdrs.get_directory()
    #print(json.dumps(bpn_mappings, indent=True))

    # Find all EDC endpoints
    portal_secret = ''
    portal_secret_fn = config.get('PORTAL_CLIENT_SECRET_FN')
    # Open the same file whose existence was checked (the original opened
    # `client_secret_fn` here). The name is tested first because
    # os.path.exists() raises TypeError for None.
    if portal_secret_fn and os.path.exists(portal_secret_fn):
        with open(portal_secret_fn, 'rt') as f:
            portal_secret = f.read()
    portal_base_url = config.get('PORTAL_BASE_URL')
    portal_token_endpoint = config.get('PORTAL_TOKEN_ENDPOINT')
    portal_client_id = config.get('PORTAL_CLIENT_ID')
    portal = Portal(portal_base_url=portal_base_url, token_url=portal_token_endpoint,
                    client_id=portal_client_id, client_secret=portal_secret)

    # Honour the -o/--out option; its default is the participants cache file.
    storage = FileStorageEngine(out)
    for bpn, did in bpn_mappings.items():
        edc_endpoints = portal.discover_edc_endpoint(bpn=bpn)
        x = {
            "did": did,
            "edc_endpoints": edc_endpoints
        }
        storage.put(bpn, x)
# Script entry point: hand control to the root click group.
if __name__ == "__main__":
    cli()