-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathtest_cli.py
More file actions
2753 lines (2524 loc) · 118 KB
/
test_cli.py
File metadata and controls
2753 lines (2524 loc) · 118 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Functional tests for :mod:`weaver.cli`.
"""
import base64
import contextlib
import copy
import json
import logging
import os
import shutil
import smtplib
import tempfile
import uuid
from typing import TYPE_CHECKING, cast
import mock
import pytest
import responses
import yaml
from owslib.ows import DEFAULT_OWS_NAMESPACE
from owslib.wps import WPSException
from parameterized import parameterized
from pyramid.httpexceptions import HTTPForbidden, HTTPOk, HTTPUnauthorized
from webtest import TestApp as WebTestApp
from tests import resources
from tests.functional.test_job_provenance import TestJobProvenanceBase
from tests.functional.utils import JobUtils, ResourcesUtil, WpsConfigBase
from tests.utils import (
get_weaver_url,
mocked_dismiss_process,
mocked_execute_celery,
mocked_file_server,
mocked_reference_test_file,
mocked_remote_server_requests_wps1,
mocked_sub_requests,
mocked_wps_output,
run_command,
setup_config_from_settings
)
from weaver.__meta__ import __version__
from weaver.base import classproperty
from weaver.cli import AuthHandler, BearerAuthHandler, WeaverClient, main as weaver_cli
from weaver.config import WeaverConfiguration
from weaver.datatype import DockerAuthentication, Service
from weaver.execute import ExecuteReturnPreference
from weaver.formats import (
ContentType,
OutputFormat,
clean_media_type_format,
get_cwl_file_format,
get_extension,
repr_json
)
from weaver.notify import decrypt_email
from weaver.processes.constants import CWL_REQUIREMENT_APP_DOCKER, ProcessSchema
from weaver.processes.types import ProcessType
from weaver.provenance import ProvenanceFormat, ProvenancePathType
from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory
from weaver.utils import fully_qualified_name, get_registry
from weaver.visibility import Visibility
from weaver.wps.utils import get_wps_output_url, map_wps_output_location
if TYPE_CHECKING:
from typing import Any, Callable, Dict, Optional, Union
from weaver.cli import OperationResult
from weaver.status import AnyStatusType
from weaver.typedefs import AnyRequestType, AnyResponseType, CWL, JSON
class FakeAuthHandler:
    """
    Authentication handler stub that accepts any call arguments and does nothing with them.
    """

    def __call__(self, *_args, **_kwargs):
        # every argument is ignored on purpose; calling the handler always yields 'None'
        return None
@pytest.mark.cli
@pytest.mark.functional
class TestWeaverClientBase(WpsConfigBase, ResourcesUtil, JobUtils):
    """
    Common fixtures for functional tests that drive :class:`WeaverClient` against the test application.
    """
    test_process_prefix = "test-client-"

    @classmethod
    def setUpClass(cls):
        """
        Prepare the test settings, initialize the parent test class, and create the client under test.
        """
        overrides = {
            # temporary directories are removed by 'tearDownClass'
            "weaver.vault_dir": tempfile.mkdtemp(prefix="weaver-test-"),
            "weaver.wps_output_dir": tempfile.mkdtemp(prefix="weaver-test-"),
            "weaver.wps_output_url": "http://random-file-server.com/wps-outputs",
            "weaver.wps_email_notify_smtp_host": "http://fake-email-server",
            "weaver.wps_email_notify_port": 1234,
            "weaver.wps_email_encrypt_salt": "123456",
        }
        # work on a copy to leave any settings inherited from the parent class untouched
        test_settings = copy.deepcopy(cls.settings or {})
        test_settings.update(overrides)
        cls.settings = test_settings
        super(TestWeaverClientBase, cls).setUpClass()

        cls.url = get_weaver_url(cls.app.app.registry)
        cls.client = WeaverClient(cls.url)
        logging.getLogger("weaver.cli").setLevel(logging.DEBUG)

    def setUp(self):
        """
        Clear jobs, services and previous test processes, then deploy the reference test processes.
        """
        self.job_store.clear_jobs()
        self.service_store.clear_services()
        stale_process_ids = [
            proc.id for proc in self.process_store.list_processes()
            if proc.id.startswith(self.test_process_prefix)
        ]
        for stale_id in stale_process_ids:
            self.process_store.delete_process(stale_id)

        # make one process available for testing features
        self.test_process = {}
        self.test_payload = {}
        for name in ("Echo", "CatFile", "FileInfo"):
            deployed_id = f"{self.test_process_prefix}{name}"
            self.test_process[name] = deployed_id
            # use the 'package' payload only if no 'deploy' payload was obtained
            self.test_payload[name] = (
                self.retrieve_payload(name, "deploy", local=True) or
                self.retrieve_payload(name, "package", local=True)
            )
            self.deploy_process(self.test_payload[name], process_id=deployed_id)

    @classmethod
    def tearDownClass(cls):
        """
        Tear down the parent test class, then remove the temporary directories referenced by the settings.
        """
        super(TestWeaverClientBase, cls).tearDownClass()
        for setting_name in ("weaver.vault_dir", "weaver.wps_output_dir"):
            tmp_path = cls.settings.get(setting_name, "")
            if not os.path.isdir(tmp_path):
                continue
            shutil.rmtree(tmp_path, ignore_errors=True)
class TestWeaverClient(TestWeaverClientBase):
test_tmp_dir = None # type: str
@classmethod
def setUpClass(cls):
    """
    Run the parent class setup, then create the temporary directory shared by the tests of this class.
    """
    super(TestWeaverClient, cls).setUpClass()
    tmp_dir = tempfile.mkdtemp()
    cls.test_tmp_dir = tmp_dir
@classmethod
def tearDownClass(cls):
    """
    Run the parent class teardown, then remove the temporary directory of this class.
    """
    super(TestWeaverClient, cls).tearDownClass()
    # errors are ignored during removal, the directory is only a best-effort cleanup
    shutil.rmtree(cls.test_tmp_dir, ignore_errors=True)
def setup_test_file(self, original_file, substitutions):
    # type: (str, Dict[str, str]) -> str
    """
    Copy a file into a new uniquely named subdirectory of the test directory, with text replaced.

    :param original_file: Path of the UTF-8 text file to read.
    :param substitutions: Mapping of text to find, to the text that replaces each occurrence.
    :returns: Path of the written copy, which keeps the file name of the original.
    """
    unique_dir = os.path.join(self.test_tmp_dir, str(uuid.uuid4()))
    os.makedirs(unique_dir, exist_ok=True)
    copy_path = os.path.join(unique_dir, os.path.basename(original_file))

    with open(original_file, mode="r", encoding="utf-8") as source:
        contents = source.read()
    # replacements are applied one after the other, in the iteration order of the mapping
    for old_text, new_text in substitutions.items():
        contents = contents.replace(old_text, new_text)
    with open(copy_path, mode="w", encoding="utf-8") as target:
        target.write(contents)
    return copy_path
def test_info(self):
    """
    Validate the title, configuration and parameters reported by the client ``info`` operation.
    """
    info = mocked_sub_requests(self.app, self.client.info)
    assert info.success
    assert info.body["title"] == "Weaver"
    assert info.body["configuration"] == WeaverConfiguration.HYBRID
    assert "parameters" in info.body
def test_version(self):
    """
    Validate that the client ``version`` operation reports the package version as the API version.
    """
    version = mocked_sub_requests(self.app, self.client.version)
    assert version.success
    assert "versions" in version.body
    expect_versions = [{"name": "weaver", "version": __version__, "type": "api"}]
    assert version.body["versions"] == expect_versions
def test_conformance(self):
    """
    Validate that the client ``conformance`` operation returns a list of conformance references.
    """
    conformance = mocked_sub_requests(self.app, self.client.conformance)
    assert conformance.success
    assert "conformsTo" in conformance.body
    assert isinstance(conformance.body["conformsTo"], list)
def process_listing_op(self, operation, **op_kwargs):
    # type: (Callable[[Any, ...], OperationResult], **Any) -> OperationResult
    """
    Call a process listing operation with ``only_local=True`` and validate the common result contents.

    :param operation: Client operation to call through the mocked requests.
    :param op_kwargs: Additional keyword arguments forwarded along with the operation.
    :returns: Validated result of the operation.
    """
    listing = mocked_sub_requests(self.app, operation, only_local=True, **op_kwargs)
    assert listing.success
    assert "processes" in listing.body
    assert "undefined" not in listing.message
    return listing
def test_capabilities(self):
    """
    Validate that the ``capabilities`` operation lists exactly the test and builtin processes.
    """
    result = self.process_listing_op(self.client.capabilities)
    expect_processes = {
        # test process
        self.test_process["CatFile"],
        self.test_process["Echo"],
        self.test_process["FileInfo"],
        # builtin
        *self.get_builtin_process_names(),
    }
    assert set(result.body["processes"]) == expect_processes
def test_processes(self):
    """
    Validate that the ``processes`` operation lists exactly the test and builtin processes.
    """
    result = self.process_listing_op(self.client.processes)
    expect_processes = {
        # test process
        self.test_process["CatFile"],
        self.test_process["Echo"],
        self.test_process["FileInfo"],
        # builtin
        *self.get_builtin_process_names(),
    }
    assert set(result.body["processes"]) == expect_processes
def test_processes_with_details(self):
    """
    Validate that the ``processes`` operation with ``detail=True`` lists processes as objects with their IDs.
    """
    result = self.process_listing_op(self.client.processes, detail=True)
    listed = result.body["processes"]
    assert all(isinstance(proc, dict) for proc in listed)
    listed_ids = {proc["id"] for proc in listed}
    assert listed_ids == {
        # test process
        self.test_process["CatFile"],
        self.test_process["Echo"],
        self.test_process["FileInfo"],
        # builtin
        *self.get_builtin_process_names(),
    }
@mocked_remote_server_requests_wps1([
    (resources.TEST_REMOTE_SERVER_URL, resources.TEST_REMOTE_SERVER_WPS1_GETCAP_XML, [
        resources.TEST_REMOTE_SERVER_WPS1_DESCRIBE_PROCESS_XML
    ]),
    (resources.TEST_HUMMINGBIRD_WPS1_URL, resources.TEST_HUMMINGBIRD_WPS1_GETCAP_XML, []),
    (resources.TEST_EMU_WPS1_GETCAP_URL, resources.TEST_EMU_WPS1_GETCAP_XML, []),
])
def test_processes_with_providers(self):
    """
    Validate that the ``processes`` operation with ``with_providers=True`` also lists provider processes.
    """
    # each saved service is paired with the processes expected to be listed for it
    expect_providers = [
        (
            Service(name="emu", url=resources.TEST_EMU_WPS1_GETCAP_URL, public=True),
            resources.TEST_EMU_WPS1_PROCESSES,
        ),
        (
            Service(name="hummingbird", url=resources.TEST_HUMMINGBIRD_WPS1_URL, public=True),
            resources.TEST_HUMMINGBIRD_WPS1_PROCESSES,
        ),
        (
            Service(name="test-service", url=resources.TEST_REMOTE_SERVER_URL, public=True),
            resources.TEST_REMOTE_SERVER_WPS1_PROCESSES,
        ),
    ]
    for service, _ in expect_providers:
        self.service_store.save_service(service)

    result = self.process_listing_op(self.client.processes, with_providers=True)
    assert len(result.body["processes"]) > 0, "Local processes should be reported as well along with providers."
    assert "providers" in result.body
    providers = result.body["providers"]
    # drop the schema metadata fields before comparing with the expected contents
    for provider in providers:
        for meta_field in ("$schema", "$id"):
            provider.pop(meta_field, None)
    assert providers == [
        {"id": service.name, "processes": processes}
        for service, processes in expect_providers
    ]
@mocked_remote_server_requests_wps1([
    resources.TEST_REMOTE_SERVER_URL,
    resources.TEST_REMOTE_SERVER_WPS1_GETCAP_XML,
    [resources.TEST_REMOTE_SERVER_WPS1_DESCRIBE_PROCESS_XML]
])
def test_register_provider(self):
    """
    Validate the provider description and links returned by the ``register`` operation.
    """
    prov_id = "test-server"
    prov_url = resources.TEST_REMOTE_SERVER_URL
    result = mocked_sub_requests(self.app, self.client.register, prov_id, prov_url, only_local=True)
    assert result.success
    assert result.body["id"] == "test-server"
    assert result.body["title"] == "Mock Remote Server"
    assert result.body["description"] == "Testing"
    assert result.body["type"] == ProcessType.WPS_REMOTE
    assert "links" in result.body

    def find_link(relation, missing_message):
        # only the first link with the requested relation is considered
        found = next((_link for _link in result.body["links"] if _link["rel"] == relation), None)
        if found is None:
            self.fail(missing_message)
        return found

    link = find_link("service-desc", "Could not find expected remote WPS link reference.")
    assert "request=GetCapabilities" in link["href"]
    assert link["type"] == ContentType.APP_XML

    link = find_link("service", "Could not find expected provider JSON link reference.")
    assert link["href"] == f"{self.url}/providers/{prov_id}"
    assert link["type"] == ContentType.APP_JSON

    link = find_link(
        "http://www.opengis.net/def/rel/ogc/1.0/processes",
        "Could not find expected provider sub-processes link reference.",
    )
    assert link["href"] == f"{self.url}/providers/{prov_id}/processes"
    assert link["type"] == ContentType.APP_JSON
def test_unregister_provider(self):
    """
    Validate the message, code and empty body returned by the ``unregister`` operation of a saved provider.
    """
    provider = Service(name="test-service", url=resources.TEST_REMOTE_SERVER_URL, public=True)
    self.service_store.save_service(provider)

    result = mocked_sub_requests(self.app, self.client.unregister, provider.name, only_local=True)
    assert result.success
    assert result.message == "Successfully unregistered provider."
    assert result.code == 204
    assert result.body is None
def test_custom_auth_handler(self):
    """
    Validate that a custom authentication handler is applied to the request.

    Which operation is called does not matter.
    """
    token = str(uuid.uuid4())

    class CustomAuthHandler(AuthHandler):
        def __call__(self, request):
            request.headers["Custom-Authorization"] = f"token={token}&user={self.identity}"
            return request

    def return_response(r, *_, **__):
        # hand back the obtained response as is, instead of a parsed result
        return r

    auth = CustomAuthHandler(identity="test")  # insert an auth property that should be used by prepared request
    echo_id = self.test_process["Echo"]

    # skip result parsing to return obtained response directly, which contains a reference to the prepared request
    with mock.patch.object(WeaverClient, "_parse_result", side_effect=return_response):
        resp = mocked_sub_requests(self.app, self.client.describe, echo_id, auth=auth)

    assert resp.status_code == 200, "Operation should have been called successfully"
    assert resp.json["id"] == self.test_process["Echo"], "Operation should have been called successfully"
    sent_headers = resp.request.headers
    assert "Custom-Authorization" in sent_headers
    assert sent_headers["Custom-Authorization"] == f"token={token}&user=test"
def test_deploy_payload_body_cwl_embedded(self):
    """
    Deploy with a payload body that embeds the package as ``unit`` of its first execution unit.
    """
    test_id = f"{self.test_process_prefix}deploy-body-no-cwl"
    payload = self.retrieve_payload("Echo", "deploy", local=True)
    package = self.retrieve_payload("Echo", "package", local=True)
    payload["executionUnit"][0] = {"unit": package}

    deployed = mocked_sub_requests(self.app, self.client.deploy, test_id, payload)
    assert deployed.success
    assert "processSummary" in deployed.body
    assert deployed.body["processSummary"]["id"] == test_id
    assert "deploymentDone" in deployed.body
    assert deployed.body["deploymentDone"] is True
    assert "undefined" not in deployed.message
def test_deploy_payload_file_cwl_embedded(self):
    """
    Deploy with a payload provided as a file path, where the package is the ``href`` of the execution unit.
    """
    test_id = f"{self.test_process_prefix}deploy-file-no-cwl"
    payload = self.retrieve_payload("Echo", "deploy", local=True)
    package = self.retrieve_payload("Echo", "package", local=True, ref_found=True)
    payload["executionUnit"][0] = {"href": package}

    # the temporary file is removed when closed, the operation must therefore run while it is open
    with tempfile.NamedTemporaryFile(mode="w", suffix=".cwl") as body_file:
        json.dump(payload, body_file)
        body_file.flush()
        body_file.seek(0)
        deployed = mocked_sub_requests(self.app, self.client.deploy, test_id, body_file.name)

    assert deployed.success
    assert "processSummary" in deployed.body
    assert deployed.body["processSummary"]["id"] == test_id
    assert "deploymentDone" in deployed.body
    assert deployed.body["deploymentDone"] is True
    assert "undefined" not in deployed.message
def test_deploy_payload_inject_cwl_body(self):
    """
    Deploy with a payload without execution unit, combined with the package contents passed separately.
    """
    test_id = f"{self.test_process_prefix}deploy-body-with-cwl-body"
    payload = self.retrieve_payload("Echo", "deploy", local=True)
    package = self.retrieve_payload("Echo", "package", local=True)
    payload.pop("executionUnit", None)

    deployed = mocked_sub_requests(self.app, self.client.deploy, test_id, payload, package)
    assert deployed.success
    assert "processSummary" in deployed.body
    assert deployed.body["processSummary"]["id"] == test_id
    assert "deploymentDone" in deployed.body
    assert deployed.body["deploymentDone"] is True
    assert "undefined" not in deployed.message
def test_deploy_payload_inject_cwl_file(self):
    """
    Deploy with a payload without execution unit, combined with the package reference passed separately.
    """
    test_id = f"{self.test_process_prefix}deploy-body-with-cwl-file"
    payload = self.retrieve_payload("Echo", "deploy", local=True)
    package = self.retrieve_payload("Echo", "package", local=True, ref_found=True)
    payload.pop("executionUnit", None)

    deployed = mocked_sub_requests(self.app, self.client.deploy, test_id, payload, package)
    assert deployed.success
    assert "processSummary" in deployed.body
    assert deployed.body["processSummary"]["id"] == test_id
    assert "deploymentDone" in deployed.body
    assert deployed.body["deploymentDone"] is True
    assert "undefined" not in deployed.message
def test_deploy_with_undeploy(self):
    """
    Deploy the same process twice, where the second deployment is called with ``undeploy=True``.
    """
    test_id = f"{self.test_process_prefix}deploy-undeploy-flag"
    deploy = self.test_payload["Echo"]

    first = mocked_sub_requests(self.app, self.client.deploy, test_id, deploy)
    assert first.success

    second = mocked_sub_requests(self.app, self.client.deploy, test_id, deploy, undeploy=True)
    assert second.success
    assert "undefined" not in second.message
def test_deploy_private_process_description(self):
    """
    Deploy a process with private visibility set directly under ``processDescription``.

    The following ``describe`` operation is expected to be refused with code 403.
    """
    test_id = f"{self.test_process_prefix}private-process-description"
    payload = self.retrieve_payload("Echo", "deploy", local=True)
    package = self.retrieve_payload("Echo", "package", local=True)
    payload.pop("executionUnit", None)
    # move the fields of the nested 'process' definition directly under 'processDescription'
    description = payload["processDescription"]
    description.update(description.pop("process"))
    description["visibility"] = Visibility.PRIVATE

    deployed = mocked_sub_requests(self.app, self.client.deploy, test_id, payload, package)
    assert deployed.success
    assert "processSummary" in deployed.body
    assert deployed.body["processSummary"]["id"] == test_id

    described = mocked_sub_requests(self.app, self.client.describe, test_id)
    assert not described.success
    assert described.code == 403
def test_deploy_private_process_nested(self):
    """
    Deploy a process with private visibility set under the nested ``process`` definition.

    The following ``describe`` operation is expected to be refused with code 403.
    """
    test_id = f"{self.test_process_prefix}private-process-nested"
    payload = self.retrieve_payload("Echo", "deploy", local=True)
    package = self.retrieve_payload("Echo", "package", local=True)
    payload.pop("executionUnit", None)
    nested_process = payload["processDescription"]["process"]
    nested_process["visibility"] = Visibility.PRIVATE

    deployed = mocked_sub_requests(self.app, self.client.deploy, test_id, payload, package)
    assert deployed.success
    assert "processSummary" in deployed.body
    assert deployed.body["processSummary"]["id"] == test_id

    described = mocked_sub_requests(self.app, self.client.describe, test_id)
    assert not described.success
    assert described.code == 403
def test_deploy_workflow(self):
    """
    Validate that the :term:`CLI` resolves "remote" process references of a Workflow while checking locally.
    """
    # deploy the process that the workflow steps refer to
    step_id = f"{self.test_process_prefix}echo"
    step_package = self.retrieve_payload("Echo", "package", local=True)
    result = mocked_sub_requests(self.app, self.client.deploy, step_id, cwl=step_package)
    assert result.success

    test_id = f"{self.test_process_prefix}workflow-echo"
    package = self.retrieve_payload("WorkflowEcho", "package", local=True)
    workflow_steps = package["steps"].values()
    assert all(step["run"] == "Echo.cwl" for step in workflow_steps), "Requirement for test not met"
    for step in workflow_steps:
        step["run"] = f"{step_id}.cwl"  # replace by test id used for cleanup

    def get_registry_no_auto_pyramid(_container):
        """
        Fail when the registry is requested without a container, since none should be available in this context.

        .. note::
            The test suite injects a pseudo test registry as the global *current* registry.
            This makes the situation hard to validate by other means, because that registry remains
            needed by the test web application to receive the valid requests for lookup.
        """
        if _container is None:
            raise ValueError("Test missing registry for settings reference!")
        return get_registry(_container)

    with mock.patch("weaver.utils.get_registry", side_effect=get_registry_no_auto_pyramid):
        result = mocked_sub_requests(self.app, self.client.deploy, test_id, cwl=package)
    assert result.success
def test_undeploy(self):
    """
    Validate that an undeployed process is reported as such and cannot be found afterwards.
    """
    # deploy a new process to leave the test one available
    other_process = f"{self.test_process['Echo']}-other"
    other_payload = copy.deepcopy(self.test_payload["Echo"])
    self.deploy_process(other_payload, process_id=other_process)

    result = mocked_sub_requests(self.app, self.client.undeploy, other_process)
    assert result.success
    assert result.body.get("undeploymentDone", None) is True
    assert "undefined" not in result.message

    # the process description must not be available anymore
    resp = mocked_sub_requests(self.app, "get", f"/processes/{other_process}", expect_errors=True)
    assert resp.status_code == 404
def test_describe(self):
    """
    Validate the process description contents returned by the ``describe`` operation.
    """
    result = mocked_sub_requests(self.app, self.client.describe, self.test_process["Echo"])
    submitted_version = self.test_payload["Echo"]["processDescription"]["process"]["version"]
    assert submitted_version == "1.0", (
        "Original version submitted should be partial."
    )
    assert result.success

    # see deployment file for details that are expected here
    assert result.body["id"] == self.test_process["Echo"]
    assert result.body["version"] == "1.0"
    assert result.body["keywords"] == ["test", "application"]  # app is added by Weaver since not CWL Workflow

    assert "message" in result.body["inputs"]
    message_input = result.body["inputs"]["message"]
    assert message_input["title"] == "message"
    assert message_input["description"] == "Message to echo."
    assert message_input["minOccurs"] == 1
    assert message_input["maxOccurs"] == 1
    assert message_input["literalDataDomains"][0]["dataType"]["name"] == "string"

    assert "output" in result.body["outputs"]
    file_output = result.body["outputs"]["output"]
    assert file_output["title"] == "output"
    assert file_output["description"] == "Output file with echo message."
    output_formats = file_output["formats"]
    # drop the schema metadata fields before comparing with the expected formats
    for out_fmt in output_formats:
        for meta_field in ("$schema", "$id"):
            out_fmt.pop(meta_field, None)
    assert output_formats == [
        {"default": True, "mediaType": ContentType.TEXT_PLAIN},
        {"mediaType": ContentType.TEXT_HTML},
        {"mediaType": ContentType.APP_PDF}
    ]

    assert "undefined" not in result.message, "CLI should not have confused process description as response detail."
    assert result.body["description"] == (
        "Dummy process that simply echo's back the input message for testing purposes."
    ), "CLI should not have overridden the process description field."
def run_execute_inputs_schema_variant(
    self,
    inputs_param,           # type: Union[JSON, str]
    process="Echo",         # type: str
    preload=False,          # type: bool
    location=False,         # type: bool
    expect_success=True,    # type: bool
    expect_status=None,     # type: Optional[AnyStatusType]
    mock_exec=True,         # type: bool
    **exec_kwargs,          # type: Any
):                          # type: (...) -> OperationResult
    """
    Submit a job execution through the client and validate the common parts of the operation result.

    :param inputs_param:
        Inputs forwarded to :meth:`WeaverClient.execute`.
        A string is first resolved with ``retrieve_payload`` and replaced by what that lookup returns.
    :param process: Key of ``self.test_process`` that selects the deployed test process to execute.
    :param preload: Currently without effect on the lookup (see the review note in the body).
    :param location:
        When enabled, a string ``inputs_param`` is looked up as ``location`` with ``ref_found=True``,
        instead of as ``ref_name``.
    :param expect_success: Whether the operation result must be reported as successful or unsuccessful.
    :param expect_status:
        Status expected in the body of a successful result.
        When not provided, :attr:`Status.ACCEPTED` is expected.
    :param mock_exec:
        When enabled, a function that does nothing is given to ``mocked_execute_celery`` as execution task.
        Otherwise, ``None`` is given.
    :param exec_kwargs: Additional keyword arguments forwarded to :meth:`WeaverClient.execute`.
    :returns: Result obtained from the execute operation.
    """
    if isinstance(inputs_param, str):
        ref = {"location": inputs_param, "ref_found": True} if location else {"ref_name": inputs_param}
        # NOTE(review): the original 'if preload / else' branches performed this exact same call,
        #   therefore 'preload' never changed what gets submitted. Presumably the non-preloaded case
        #   was meant to submit a file reference rather than loaded contents - TODO confirm and restore.
        inputs_param = self.retrieve_payload(process=process, local=True, **ref)

    def skip_execution(*_, **__):
        # use pass-through function because don't care about execution result here, only the parsing of I/O
        return None

    mock_exec_func = skip_execution if mock_exec else None
    with contextlib.ExitStack() as stack_exec:
        for mock_exec_proc in mocked_execute_celery(func_execute_task=mock_exec_func):
            stack_exec.enter_context(mock_exec_proc)
        result = cast(
            "OperationResult",
            mocked_sub_requests(self.app, self.client.execute, self.test_process[process],
                                inputs=inputs_param, **exec_kwargs)
        )

    if not expect_success:
        assert not result.success, result.text
        return result

    assert result.success, result.message + (result.text if result.text else "")
    assert "jobID" in result.body
    assert "processID" in result.body
    assert "status" in result.body
    assert "location" in result.body
    assert result.body["processID"] == self.test_process[process]
    # parentheses are required: 'a == b or c' evaluates as '(a == b) or c', which made
    # the previous assertion always pass regardless of the status that was returned
    assert result.body["status"] == (expect_status or Status.ACCEPTED)
    assert result.body["location"] == result.headers["Location"]
    assert "undefined" not in result.message
    return result
def test_execute_inputs_cwl_file_schema(self):
    """
    Execute with the inputs of ``Execute_Echo_cwl_schema.yml``, with ``preload`` disabled.
    """
    inputs_name = "Execute_Echo_cwl_schema.yml"
    self.run_execute_inputs_schema_variant(inputs_name, preload=False)
def test_execute_inputs_ogc_value_file_schema(self):
    """
    Execute with the inputs of ``Execute_Echo_ogc_value_schema.yml``, with ``preload`` disabled.
    """
    inputs_name = "Execute_Echo_ogc_value_schema.yml"
    self.run_execute_inputs_schema_variant(inputs_name, preload=False)
def test_execute_inputs_ogc_mapping_file_schema(self):
    """
    Execute with the inputs of ``Execute_Echo_ogc_mapping_schema.yml``, with ``preload`` disabled.
    """
    inputs_name = "Execute_Echo_ogc_mapping_schema.yml"
    self.run_execute_inputs_schema_variant(inputs_name, preload=False)
def test_execute_inputs_old_listing_file_schema(self):
    """
    Execute with the inputs of ``Execute_Echo_old_listing_schema.yml``, with ``preload`` disabled.
    """
    inputs_name = "Execute_Echo_old_listing_schema.yml"
    self.run_execute_inputs_schema_variant(inputs_name, preload=False)
def test_execute_inputs_cwl_literal_schema(self):
    """
    Execute with the inputs of ``Execute_Echo_cwl_schema.yml``, with ``preload`` enabled.
    """
    inputs_name = "Execute_Echo_cwl_schema.yml"
    self.run_execute_inputs_schema_variant(inputs_name, preload=True)
def test_execute_inputs_ogc_value_literal_schema(self):
    """
    Execute with the inputs of ``Execute_Echo_ogc_value_schema.yml``, with ``preload`` enabled.
    """
    inputs_name = "Execute_Echo_ogc_value_schema.yml"
    self.run_execute_inputs_schema_variant(inputs_name, preload=True)
def test_execute_inputs_ogc_mapping_literal_schema(self):
    """
    Execute with the inputs of ``Execute_Echo_ogc_mapping_schema.yml``, with ``preload`` enabled.
    """
    inputs_name = "Execute_Echo_ogc_mapping_schema.yml"
    self.run_execute_inputs_schema_variant(inputs_name, preload=True)
def test_execute_inputs_old_listing_literal_schema(self):
    """
    Execute with the inputs of ``Execute_Echo_old_listing_schema.yml``, with ``preload`` enabled.
    """
    inputs_name = "Execute_Echo_old_listing_schema.yml"
    self.run_execute_inputs_schema_variant(inputs_name, preload=True)
def test_execute_inputs_representation_literal_schema(self):
    """
    Execute with inputs provided as a list of ``name=value`` string representations.
    """
    literal_inputs = ["message='hello world'"]
    self.run_execute_inputs_schema_variant(literal_inputs, preload=True)
def test_execute_inputs_invalid(self):
    """
    Mostly check that invalid inputs do not raise in the client, but gracefully return an unsuccessful result.
    """
    invalid_inputs_variants = (
        [1, 2, 3, 4],           # missing the ID
        [{"id": "message"}],    # missing the value
        {},                     # valid schema, but missing inputs of process
    )
    for invalid_inputs in invalid_inputs_variants:
        self.run_execute_inputs_schema_variant(invalid_inputs, expect_success=False)
@pytest.mark.flaky(retries=2, delay=1)
def test_execute_manual_monitor_status_and_download_results(self):
    """
    Test a typical :term:`Job` execution, results retrieval and download, with manual monitoring.

    Manual monitoring is relevant when a *very* long :term:`Job` must be executed and the user does not
    intend to wait for it. This avoids keeping a shell/notebook/etc. open for a long time with a massive
    ``timeout`` value. Instead, the user can call :meth:`WeaverClient.monitor` again at a later time to
    resume monitoring. Another situation is a dropped connection or a crashed script runner, after which
    the user wants to pick up monitoring again.

    .. note::
        The :meth:`WeaverClient.execute` is accomplished synchronously during this test because of the mock.
        The :meth:`WeaverClient.monitor` step can therefore only return ``success``/``failed`` directly,
        without any intermediate and asynchronous polling of the ``running`` status.
        The first status result from :meth:`WeaverClient.execute` is ``accept`` because this is the
        default status that is generated by the HTTP response from the :term:`Job` creation.
        Any following GET status will directly return the final :term:`Job` result.

    .. fixme:
    .. todo::
        In some circumstances when running the complete test suite, this test fails sporadically when
        looking for the expected job by ID. Re-running this test by itself validates if this case happened.
        Find a way to make it work seamlessly. Retries sometimes work, but it is not guaranteed.
    """
    result = self.run_execute_inputs_schema_variant("Execute_Echo_cwl_schema.yml", mock_exec=False)
    job_id = result.body["jobID"]

    result = mocked_sub_requests(self.app, self.client.monitor, job_id, timeout=5, interval=1)
    assert result.success, result.text
    assert "undefined" not in result.message
    assert result.body.get("status") == Status.SUCCESSFUL
    links = result.body.get("links")
    assert isinstance(links, list)
    results_links_count = sum(1 for _link in links if _link["rel"].endswith("results"))
    assert results_links_count == 1

    # first test to get job results details, but not downloading yet
    result = mocked_sub_requests(self.app, self.client.results, job_id)
    assert result.success, result.text
    assert "undefined" not in result.message
    outputs_body = result.body
    assert isinstance(outputs_body, dict) and len(outputs_body) == 1
    output = outputs_body.get("output")  # single of this process
    assert isinstance(output, dict) and "href" in output, "Output named 'output' should be a 'File' reference."
    output_href = output.get("href")
    assert isinstance(output_href, str) and output_href.startswith(self.settings["weaver.wps_output_url"])

    # test download feature
    with contextlib.ExitStack() as stack:
        server_mock = stack.enter_context(mocked_wps_output(self.settings))
        target_dir = stack.enter_context(tempfile.TemporaryDirectory())
        result = mocked_sub_requests(
            self.app, self.client.results,
            job_id, download=True, out_dir=target_dir,  # 'client.results' parameters
            only_local=True,  # mock parameter (avoid download HTTP redirect to TestApp)
        )
        assert result.success, result.text
        assert "undefined" not in result.message
        assert result.body != outputs_body, "Download operation should modify the original outputs body."
        output = result.body.get("output", {})
        assert output.get("href") == output_href
        output_path = output.get("path")  # inserted by download
        assert isinstance(output_path, str) and output_path.startswith(target_dir)
        # everything after jobID, and without the first '/'
        output_name = output_href.rsplit(job_id, 1)[-1].lstrip("/")
        output_file = os.path.join(target_dir, output_name)
        assert output_path == output_file
        assert os.path.isfile(output_file) and not os.path.islink(output_file)
        server_calls = server_mock.calls  # list of (PreparedRequest, Response)
        assert len(server_calls) == 1
        assert server_calls[0][0].url == output_href
@pytest.mark.xfail(reason="not implemented")
def test_execute_with_auto_monitor(self):
    """
    Test the case where monitoring runs automatically, inline with the execution, before results download.
    """
    # FIXME: Properly test execute+monitor.
    #   An actual (longer) async call is needed, because 'mocked_execute_celery' blocks until complete.
    #   No polling actually occurs during monitoring because of this (only a single get status request
    #   that returns the final result).
    #   The test should wrap 'get_job' in the 'get_job_status' view (or a similar wrapping approach),
    #   to validate that the status was polled periodically and returned 'running', until the final
    #   'succeeded' status resumes to the download.
    raise NotImplementedError
def test_execute_subscribers(self):
    """
    Verify that the requested subscribers are notified for the relevant :term:`Job` status milestones.

    Both callback requests and emails are intercepted by mocks, and only the in-progress and success
    notifications are expected to be emitted (never the failure ones).

    .. versionadded:: 4.34
    """
    subs = {
        "inProgressUri": "https://server.com/started",
        "failedUri": "https://server.com/failure",
        "successUri": "https://server.com/success",
        "inProgressEmail": "working@email.com",
        "failedEmail": "failed@email.com",
        "successEmail": "success@email.com",
    }

    # patch as near as possible to the actual 'send' operation of each subscriber type
    with mock.patch("weaver.notify.request_extra") as sent_requests, mock.patch(
        "smtplib.SMTP_SSL", autospec=smtplib.SMTP_SSL
    ) as smtp_class:
        sent_emails = smtp_class.return_value.sendmail  # shortcut to the recorded email calls
        sent_emails.return_value = None  # emulate that sending worked

        result = self.run_execute_inputs_schema_variant(
            {"message": "test-subscribers"},
            subscribers=subs,
            mock_exec=False,  # real execution is needed to obtain the subscriber calls
        )

    # NOTE:
    #   Job status JSON is pushed using the OGC schema definitions,
    #   therefore the status found in the body is mapped to its standard equivalent.
    #   For example, "started" is reported as "running" in the callback request body,
    #   although both statuses are used internally at distinct execution steps.
    running_statuses = JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]
    job_id = result.body["jobID"]
    expect_outputs = {
        "output": {
            "href": f"{get_wps_output_url(self.settings)}/{job_id}/output/stdout.log",
            "type": ContentType.TEXT_PLAIN,
            "format": {"mediaType": ContentType.TEXT_PLAIN},
        }
    }

    # order matters: 'started' (in-progress) must be notified before 'successful'
    # the 'failed' notification must never happen since success is expected (validated by the execution helper)
    assert sent_requests.call_count == 2, "Should not have called both failed/success callback requests"
    started_request, success_request = sent_requests.call_args_list
    assert started_request.args == ("POST", subs["inProgressUri"])
    assert started_request.kwargs["json"]["status"] in running_statuses  # status JSON
    assert success_request.args == ("POST", subs["successUri"])
    assert success_request.kwargs["json"] == expect_outputs  # results JSON

    # leading 'None' argument is the unconfigured 'from_addr', which is allowed if the 'From' email header is set
    test_proc_byte = self.test_process["Echo"]
    assert sent_emails.call_count == 2, "Should not have sent both failed/success email notifications"
    started_email, success_email = sent_emails.call_args_list
    assert started_email.args[:2] == (None, subs["inProgressEmail"])
    assert f"Job {test_proc_byte} Started".encode() in started_email.args[-1]
    assert success_email.args[:2] == (None, subs["successEmail"])
    assert f"Job {test_proc_byte} Successful".encode() in success_email.args[-1]
# NOTE:
# For all below '<>_auto_resolve_vault' test cases, the local file referenced in the Execute request body
# should be automatically handled by uploading to the Vault and forwarding the relevant X-Auth-Vault header.
def run_execute_inputs_with_vault_file(self, test_input_file, process="CatFile", preload=False, embed=False):
    """
    Execute a process using a temporary local file as input and check that the job output holds the same data.

    :param test_input_file:
        Reference name of the execution payload to retrieve, or, when ``embed`` is enabled,
        an input definition template in which ``{test_file}`` is replaced by the temporary file path.
    :param process: Identifier of the process to execute.
    :param preload: Forwarded as is to :meth:`run_execute_inputs_schema_variant`.
    :param embed: Whether ``test_input_file`` is an inline input template instead of a payload reference.
    """
    expect_data = "DUMMY DATA"
    with tempfile.NamedTemporaryFile(mode="w", suffix=".txt") as local_file:
        local_file.write(expect_data)
        local_file.flush()
        local_file.seek(0)
        if not embed:
            # payload file contains a placeholder that must point at the temporary file
            payload = self.retrieve_payload(process=process, ref_name=test_input_file, local=True, ref_found=True)
            exec_inputs = self.setup_test_file(payload, {"<TEST_FILE>": local_file.name})
        else:
            exec_inputs = [test_input_file.format(test_file=local_file.name)]
        # run while the temporary file still exists
        result = self.run_execute_inputs_schema_variant(
            exec_inputs, process=process, preload=preload, location=True, mock_exec=False
        )

    job_results = mocked_sub_requests(self.app, self.client.results, result.body["jobID"])
    assert job_results.success, job_results.message
    output_href = job_results.body["output"]["href"]
    output_path = map_wps_output_location(output_href, self.settings, exists=True)
    assert os.path.isfile(output_path)
    with open(output_path, mode="r", encoding="utf-8") as out_file:
        produced_data = out_file.read()
    assert produced_data == expect_data
@pytest.mark.vault
def test_execute_inputs_cwl_file_schema_auto_resolve_vault(self):
    """
    Run ``CatFile`` with a local file referenced by the CWL schema payload, without preloading it.
    """
    self.run_execute_inputs_with_vault_file(
        "Execute_CatFile_cwl_schema.yml",
        process="CatFile",
        preload=False,
    )
@pytest.mark.vault
def test_execute_inputs_ogc_mapping_file_schema_auto_resolve_vault(self):
    """
    Run ``CatFile`` with a local file referenced by the OGC mapping schema payload, without preloading it.
    """
    self.run_execute_inputs_with_vault_file(
        "Execute_CatFile_ogc_mapping_schema.yml",
        process="CatFile",
        preload=False,
    )
@pytest.mark.vault
def test_execute_inputs_old_listing_file_schema_auto_resolve_vault(self):
    """
    Run ``CatFile`` with a local file referenced by the old listing schema payload, without preloading it.
    """
    self.run_execute_inputs_with_vault_file(
        "Execute_CatFile_old_listing_schema.yml",
        process="CatFile",
        preload=False,
    )
@pytest.mark.vault
def test_execute_inputs_cwl_literal_schema_auto_resolve_vault(self):
    """
    Run ``CatFile`` with a local file referenced by the CWL schema payload, with preloading enabled.
    """
    self.run_execute_inputs_with_vault_file(
        "Execute_CatFile_cwl_schema.yml",
        process="CatFile",
        preload=True,
    )
@pytest.mark.vault
def test_execute_inputs_ogc_mapping_literal_schema_auto_resolve_vault(self):
    """
    Run ``CatFile`` with a local file referenced by the OGC mapping schema payload, with preloading enabled.
    """
    self.run_execute_inputs_with_vault_file(
        "Execute_CatFile_ogc_mapping_schema.yml",
        process="CatFile",
        preload=True,
    )
@pytest.mark.vault
def test_execute_inputs_old_listing_literal_schema_auto_resolve_vault(self):
    """
    Run ``CatFile`` with a local file referenced by the old listing schema payload, with preloading enabled.
    """
    self.run_execute_inputs_with_vault_file(
        "Execute_CatFile_old_listing_schema.yml",
        process="CatFile",
        preload=True,
    )
@pytest.mark.format
@pytest.mark.vault
def test_execute_inputs_cwi_file_format_forward_media_type_vault(self):
    """
    Upload a local file through the vault feature and check that its media-type is resolved as expected.

    The ``FileInfo`` process output is inspected to validate the path and format of the file it received.
    """
    media_type = ContentType.APP_JSON
    file_ext = get_extension(media_type)
    cwl_format = get_cwl_file_format(media_type, make_reference=True)

    with tempfile.NamedTemporaryFile(mode="w", suffix=file_ext) as input_file:
        input_path = input_file.name
        input_file.write("test")
        input_file.flush()
        input_file.seek(0)

        # job order document that points at the local input file
        job_order = {
            "file": {
                "class": "File",
                "path": input_path,
                "format": cwl_format,
            }
        }
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json") as job_file:
            job_file.write(json.dumps(job_order))
            job_file.flush()
            job_file.seek(0)

            result = self.run_execute_inputs_schema_variant(
                job_file.name,  # should be uploaded to vault for resolution
                location=True,  # above is the file path
                preload=False,  # pass the file path as is to the CLI
                process="FileInfo",
                mock_exec=False,
            )

    job_results = mocked_sub_requests(self.app, self.client.results, result.body["jobID"])
    assert job_results.success, job_results.message
    output_href = job_results.body["output"]["href"]
    output_path = map_wps_output_location(output_href, self.settings, exists=True)
    assert os.path.isfile(output_path)
    with open(output_path, mode="r", encoding="utf-8") as out_file:
        file_info = json.load(out_file)

    # 'FileInfo' simply returns the JSON path/format of the input file
    # validate that they match expectation (but path can be a random CWL directory)
    staged_path = file_info["path"]
    assert staged_path != input_path  # make sure CWL was involved, not just directly the input file
    assert not staged_path.endswith(os.path.basename(input_path))
    assert staged_path.startswith("/var/lib/cwl/")  # default, ensures vault->docker file handling occurred
    assert staged_path.endswith(".json")
    assert file_info["format"] == cwl_format
@pytest.mark.vault
def test_execute_inputs_representation_literal_schema_auto_resolve_vault(self):
    """
    Run ``CatFile`` with the file input given as an inline literal, unquoted and with both quote styles.
    """
    # template parts:
    #   - leading 'file' is the name of the process input
    #   - following 'File' is the (CWL) type, which ensures proper detection/conversion to an href URL
    #   - 'test_file' gets substituted by the path of the temporary file created with dummy data
    input_templates = (
        "file:File={test_file}",
        "file:File='{test_file}'",
        "file:File=\"{test_file}\"",
    )
    for template in input_templates:
        self.run_execute_inputs_with_vault_file(template, process="CatFile", preload=False, embed=True)
def test_execute_trigger(self):
    """
    Submit a job in pending state, trigger it, then check that monitoring and results succeed afterwards.
    """
    created = self.run_execute_inputs_schema_variant(
        "Execute_Echo_cwl_schema.yml",
        preload=True,
        pending=True,  # this is the parameter of interest for this test
        expect_status=Status.CREATED,
    )
    assert created.success
    assert created.message == (
        "Job successfully submitted for creation. "
        "Waiting on trigger request to being execution."
    )
    job_id = created.body["jobID"]

    # Strictly speaking, the trigger only has to submit the job to the execution queue.
    # Since no actual celery worker queue is configured in tests, the execution is mocked inline instead.
    # The response looks as if the submission was only "accepted", but the job is completed for the next steps.
    with contextlib.ExitStack() as exec_mocks:
        for exec_patch in mocked_execute_celery():
            exec_mocks.enter_context(exec_patch)
        triggered = mocked_sub_requests(self.app, self.client.trigger_job, job_id)
    assert triggered.success
    assert triggered.code == 202

    monitored = mocked_sub_requests(self.app, self.client.monitor, job_id, timeout=5, interval=1)
    assert monitored.success

    job_results = mocked_sub_requests(self.app, self.client.results, job_id)
    assert job_results.success
    output_href = job_results.body["output"]["href"]
    output_path = map_wps_output_location(output_href, self.settings, exists=True)
    assert os.path.isfile(output_path)
    with open(output_path, mode="r", encoding="utf-8") as out_file:
        echoed = out_file.read().strip()
    assert echoed == "Test message"
def test_update_job(self):
    """
    Update a job that is still pending and check that the new title, inputs, outputs and headers are reported.
    """
    created = self.run_execute_inputs_schema_variant(
        "Execute_Echo_cwl_schema.yml",
        preload=True,
        pending=True,  # pre-requirement for updating job is that it must not already be queued/running
        expect_status=Status.CREATED,
    )
    assert created.success
    assert created.message == (
        "Job successfully submitted for creation. "
        "Waiting on trigger request to being execution."
    )
    job_id = created.body["jobID"]

    # no title is defined before the update
    status_before = mocked_sub_requests(self.app, self.client.status, job_id)
    assert status_before.success
    assert "title" not in status_before.body

    prefer_return = f"return={ExecuteReturnPreference.REPRESENTATION}"
    new_title = "Random Title"
    new_inputs = {"message": "new message"}
    new_outputs = {"output": {}}
    updated = mocked_sub_requests(
        self.app,
        self.client.update_job,
        job_id,
        title=new_title,
        headers={"Prefer": prefer_return},
        inputs=new_inputs,
        output_filter=new_outputs,
        output_context="test",
        subscribers={"successUri": "https://example.com"},
    )
    assert updated.success
    assert updated.code == 204
    assert updated.body is None

    status_after = mocked_sub_requests(self.app, self.client.status, job_id)
    assert status_after.success
    assert isinstance(status_after.body, dict)
    assert status_after.body["title"] == "Random Title"

    job_inputs = mocked_sub_requests(self.app, self.client.inputs, job_id)
    assert job_inputs.success
    assert isinstance(job_inputs.body, dict)
    assert job_inputs.body["inputs"] == {"message": "new message"}
    assert job_inputs.body["outputs"] == {"output": {}}
    assert job_inputs.body["headers"]["Prefer"] == f"{prefer_return}; respond-async"
@mocked_dismiss_process()
def test_dismiss(self):
    """
    Check that the dismiss operation succeeds for jobs stored with each of the tested statuses.
    """
    process = self.test_process["Echo"]
    tested_statuses = (Status.ACCEPTED, Status.FAILED, Status.RUNNING, Status.SUCCESSFUL)
    for job_status in tested_statuses:
        # store a new job directly with the status of interest
        stored = self.job_store.save_job(task_id="12345678-1111-2222-3333-111122223333", process=process)
        stored.status = job_status
        stored = self.job_store.update_job(stored)

        dismissed = mocked_sub_requests(self.app, self.client.dismiss, str(stored.id))
        assert dismissed.success
        assert "undefined" not in dismissed.message
def test_jobs_search_multi_status(self):
    """
    Check the job listing filtered by one or many statuses and/or status categories, in various input forms.

    Filters are tested as single values, lists and comma-separated strings.
    """
    self.job_store.clear_jobs()
    process = self.test_process["Echo"]

    # create all jobs first, then assign and persist their respective status
    created = [
        self.job_store.save_job(task_id=uuid.uuid4(), process=process, access=Visibility.PUBLIC)
        for _ in range(3)
    ]
    for new_job, new_status in zip(created, (Status.SUCCESSFUL, Status.FAILED, Status.RUNNING)):
        new_job.status = new_status
    jobs = [self.job_store.update_job(new_job) for new_job in created]
    job_ok, job_failed, job_running = jobs

    finished = StatusCategory.FINISHED
    search_cases = [
        (Status.SUCCESSFUL, [job_ok]),
        ([Status.SUCCESSFUL], [job_ok]),
        ([Status.SUCCESSFUL, Status.RUNNING], [job_ok, job_running]),
        (f"{Status.SUCCESSFUL},{Status.RUNNING}", [job_ok, job_running]),
        (finished, [job_ok, job_failed]),
        (finished.value, [job_ok, job_failed]),
        ([finished], [job_ok, job_failed]),
        ([finished.value], [job_ok, job_failed]),
        # failed is already within finished, therefore nothing is added
        (f"{StatusCategory.FINISHED.value},{Status.FAILED}", [job_ok, job_failed]),
        ([finished.value, Status.RUNNING], [job_ok, job_failed, job_running]),
        ([finished, Status.RUNNING], [job_ok, job_failed, job_running]),
        (f"{StatusCategory.FINISHED.value},{Status.RUNNING}", [job_ok, job_failed, job_running]),
    ]
    for status_filter, matched_jobs in search_cases:
        found = mocked_sub_requests(self.app, self.client.jobs, status=status_filter, detail=False)
        expect_ids = [matched.id for matched in matched_jobs]
        assert found.success
        self.assert_equal_with_jobs_diffs(found.body["jobs"], expect_ids, status_filter, jobs=jobs)
class TestWeaverCLI(TestWeaverClientBase):
def setUp(self):
    """
    Run the parent setup, then store a public job marked as successful under ``self.test_job``.
    """
    super().setUp()
    stored = self.job_store.save_job(
        task_id="12345678-1111-2222-3333-111122223333",
        process="fake-process",
        access=Visibility.PUBLIC,
    )
    stored.status = Status.SUCCESSFUL
    self.test_job = self.job_store.update_job(stored)
def test_help_operations(self):
    """
    Check that the top-level help output mentions every expected operation name.
    """
    help_lines = run_command(["weaver", "--help"], trim=False)
    expected_operations = (
        "deploy",
        "undeploy",
        "capabilities",
        "processes",
        "describe",
        "execute",
        "monitor",
        "dismiss",
        "results",
        "status",
    )
    # an operation is considered listed if its name appears anywhere in at least one output line
    missing = [
        operation for operation in expected_operations
        if not any(operation in help_line for help_line in help_lines)
    ]
    assert not missing
def test_auth_handler_unresolved(self):
    """
    Check that an ``-aC`` argument error naming the handler is reported for a non-existing handler reference.

    Exercises one of the custom argument parser actions with special handling.
    """
    handler_ref = "random.HandlerDoesNotExist"
    output = run_command(
        ["processes", "-u", self.url, "-aC", handler_ref],
        entrypoint=weaver_cli,
        trim=False,
        expect_error=True,
    )
    assert output
    # the error is expected on the last reported line, along with the offending handler reference
    last_line = output[-1]
    assert "error: argument -aC" in last_line
    assert handler_ref in last_line
def test_auth_handler_bad_type(self):