1+ from robot .api .deco import library , keyword
2+
3+ from robot .libraries .Process import Process
4+
5+ import json
6+
7+ from requests import get , post , Response
8+
9+ import os
10+
11+ from typing import Union , Tuple , List
12+
13+ @library
14+ class web_service_keywords (Process ):
15+
16+ _DEFAULT_SQLITE_DB_PATH : str = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." , ".." , "tmp" , "robot_cli_affirmation_store.db" ))
17+
18+ def _get_dsn (self ) -> str :
19+ return self ._DEFAULT_SQLITE_DB_PATH
20+
21+ def __init__ (self ):
22+ self ._affirmation_store_web_service = None
23+ self ._web_server_app : str = 'test/python/flask/oauth2/token_srv'
24+ super ().__init__ ()
25+
26+ @keyword
27+ def create_oauth2_client_credentials_web_service (
28+ self ,
29+ port : int
30+ ) -> None :
31+ """
32+ Sign the input.
33+ """
34+ return self .start_process (
35+ 'flask' ,
36+ f'--app={ self ._web_server_app } ' ,
37+ 'run' ,
38+ f'--port={ port } ' ,
39+ stdout = os .path .abspath (os .path .join (os .path .dirname (__file__ ), '..' , 'log' , f'token-client-credentials-{ port } -stdout.txt' )),
40+ stderr = os .path .abspath (os .path .join (os .path .dirname (__file__ ), '..' , 'log' , f'token-client-credentials-{ port } -stderr.txt' ))
41+ )
42+
43+ @keyword
44+ def send_get_request (
45+ self ,
46+ address : str
47+ ) -> Response :
48+ """
49+ Send a simple get request.
50+ """
51+ return get (address )
52+
53+ @keyword
54+ def send_json_post_request (
55+ self ,
56+ address : str ,
57+ input : dict
58+ ) -> Response :
59+ """
60+ Send a canonical json post request.
61+ """
62+ return post (address , json = input )
63+
64+ @keyword
65+ def send_to_affirmation_store (
66+ self ,
67+ frame_key_val : str ,
68+ json_input : Union [str , dict ],
69+ affirmation_store_url : str = 'http://127.0.0.1:5848' ,
70+ path_prefix : str = '/data/' ,
71+ frame_key : str = '_frame_hash'
72+ ) -> Response :
73+ """
74+ Send a canonical json post request.
75+ """
76+ if isinstance (json_input , str ):
77+ input = json .loads (json_input )
78+ return post (f'{ affirmation_store_url } { path_prefix } { frame_key_val } ' , json = input )
79+
80+ @keyword
81+ def extract_frame_key (
82+ self ,
83+ json_input : Union [str , dict ],
84+ frame_key : str = '_frame_hash'
85+ ) -> Response :
86+ """
87+ Extract frame hash key.
88+ """
89+ if isinstance (json_input , str ):
90+ json_input = json .loads (json_input )
91+ frame_key_val = json_input .get (frame_key )
92+ return frame_key_val
93+
94+ @keyword
95+ def retrieve_from_affirmation_store (
96+ self ,
97+ frame_key_val : str ,
98+ affirmation_store_url : str = 'http://127.0.0.1:5848' ,
99+ path_prefix : str = '/data/' ,
100+ frame_key : str = '_frame_hash'
101+ ) -> Response :
102+ """
103+ Retrieve from affirmation store.
104+ """
105+ return get (f'{ affirmation_store_url } { path_prefix } { frame_key_val } ' )
0 commit comments