diff --git a/.gitignore b/.gitignore index 8c500ab..eb382b6 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ venv/* env/* .vscode/* +.claude/ # ignore http server log atests/http_server/http_server.log diff --git a/atests/http_server/core.py b/atests/http_server/core.py index f979378..1fdad51 100644 --- a/atests/http_server/core.py +++ b/atests/http_server/core.py @@ -2,6 +2,7 @@ # See AUTHORS and LICENSE for more information from flask import Flask, Response, jsonify as flask_jsonify, request +from flask_httpauth import HTTPBasicAuth, HTTPDigestAuth from .structures import CaseInsensitiveDict from .helpers import get_dict, status_code @@ -9,6 +10,11 @@ app = Flask(__name__) +app.config['SECRET_KEY'] = 'test-secret-key-for-digest-auth' + +# Initialize authentication handlers +basic_auth = HTTPBasicAuth() +digest_auth = HTTPDigestAuth() def jsonify(*args, **kwargs): @@ -191,3 +197,92 @@ def redirect_to(): response.headers["Location"] = args["url"] return response + + +# Basic auth verification callback +@basic_auth.verify_password +def verify_basic_password(username, password): + # Get expected credentials from the request path + path_parts = request.path.split('/') + if len(path_parts) >= 4 and path_parts[1] == 'basic-auth': + expected_user = path_parts[2] + expected_pass = path_parts[3] + return username == expected_user and password == expected_pass + return False + + +@app.route("/basic-auth//") +@basic_auth.login_required +def basic_auth_endpoint(user, passwd): + """Prompts the user for authorization using HTTP Basic Auth. + --- + tags: + - Auth + parameters: + - in: path + name: user + type: string + required: true + - in: path + name: passwd + type: string + required: true + produces: + - application/json + responses: + 200: + description: Successful authentication. + 401: + description: Unsuccessful authentication. + """ + return jsonify(authenticated=True, user=basic_auth.current_user()) + + +# Digest auth password callback +@digest_auth.get_password +def get_digest_password(username): + # Get expected credentials from the request path + path_parts = request.path.split('/') + if len(path_parts) >= 5 and path_parts[1] == 'digest-auth': + expected_user = path_parts[3] + expected_pass = path_parts[4] + if username == expected_user: + return expected_pass + return None + + +@app.route("/digest-auth///") +@app.route("/digest-auth////") +@digest_auth.login_required +def digest_auth_endpoint(qop, user, passwd, algorithm='MD5'): + """Prompts the user for authorization using HTTP Digest Auth. + --- + tags: + - Auth + parameters: + - in: path + name: qop + type: string + required: true + - in: path + name: user + type: string + required: true + - in: path + name: passwd + type: string + required: true + - in: path + name: algorithm + type: string + required: false + default: MD5 + produces: + - application/json + responses: + 200: + description: Successful authentication. + 401: + description: Unsuccessful authentication. + """ + return jsonify(authenticated=True, user=digest_auth.current_user()) diff --git a/atests/http_server/helpers.py b/atests/http_server/helpers.py index ce5e82a..b13c88b 100644 --- a/atests/http_server/helpers.py +++ b/atests/http_server/helpers.py @@ -256,126 +256,6 @@ def status_code(code): return r -def check_basic_auth(user, passwd): - """Checks user authentication using HTTP Basic Auth.""" - - auth = request.authorization - return auth and auth.username == user and auth.password == passwd - - -# Digest auth helpers -# qop is a quality of protection - - -def H(data, algorithm): - if algorithm == 'SHA-256': - return sha256(data).hexdigest() - elif algorithm == 'SHA-512': - return sha512(data).hexdigest() - else: - return md5(data).hexdigest() - - -def HA1(realm, username, password, algorithm): - """Create HA1 hash by realm, username, password - - HA1 = md5(A1) = MD5(username:realm:password) - """ - if not realm: - realm = u'' - return H(b":".join([username.encode('utf-8'), - realm.encode('utf-8'), - password.encode('utf-8')]), algorithm) - - -def HA2(credentials, request, algorithm): - """Create HA2 md5 hash - - If the qop directive's value is "auth" or is unspecified, then HA2: - HA2 = md5(A2) = MD5(method:digestURI) - If the qop directive's value is "auth-int" , then HA2 is - HA2 = md5(A2) = MD5(method:digestURI:MD5(entityBody)) - """ - if credentials.get("qop") == "auth" or credentials.get('qop') is None: - return H(b":".join([request['method'].encode('utf-8'), request['uri'].encode('utf-8')]), algorithm) - elif credentials.get("qop") == "auth-int": - for k in 'method', 'uri', 'body': - if k not in request: - raise ValueError("%s required" % k) - A2 = b":".join([request['method'].encode('utf-8'), - request['uri'].encode('utf-8'), - H(request['body'], algorithm).encode('utf-8')]) - return H(A2, algorithm) - raise ValueError - - -def response(credentials, password, request): - """Compile digest auth response - - If the qop directive's value is "auth" or "auth-int" , then compute the response as follows: - RESPONSE = MD5(HA1:nonce:nonceCount:clienNonce:qop:HA2) - Else if the qop directive is unspecified, then compute the response as follows: - RESPONSE = MD5(HA1:nonce:HA2) - - Arguments: - - `credentials`: credentials dict - - `password`: request user password - - `request`: request dict - """ - response = None - algorithm = credentials.get('algorithm') - HA1_value = HA1( - credentials.get('realm'), - credentials.get('username'), - password, - algorithm - ) - HA2_value = HA2(credentials, request, algorithm) - if credentials.get('qop') is None: - response = H(b":".join([ - HA1_value.encode('utf-8'), - credentials.get('nonce', '').encode('utf-8'), - HA2_value.encode('utf-8') - ]), algorithm) - elif credentials.get('qop') == 'auth' or credentials.get('qop') == 'auth-int': - for k in 'nonce', 'nc', 'cnonce', 'qop': - if k not in credentials: - raise ValueError("%s required for response H" % k) - response = H(b":".join([HA1_value.encode('utf-8'), - credentials.get('nonce').encode('utf-8'), - credentials.get('nc').encode('utf-8'), - credentials.get('cnonce').encode('utf-8'), - credentials.get('qop').encode('utf-8'), - HA2_value.encode('utf-8')]), algorithm) - else: - raise ValueError("qop value are wrong") - - return response - - -def check_digest_auth(user, passwd): - """Check user authentication using HTTP Digest auth""" - - if request.headers.get('Authorization'): - credentials = Authorization.from_header(request.headers.get('Authorization')) - if not credentials: - return - request_uri = request.script_root + request.path - if request.query_string: - request_uri += '?' + request.query_string - response_hash = response(credentials, passwd, dict(uri=request_uri, - body=request.data, - method=request.method)) - if credentials.get('response') == response_hash: - return True - return False - - -def secure_cookie(): - """Return true if cookie should have secure attribute""" - return request.environ['wsgi.url_scheme'] == 'https' - - def __parse_request_range(range_header_text): """ Return a tuple describing the byte range requested in a GET request If the range is open ended on the left or right side, then a value of None @@ -453,28 +333,3 @@ def next_stale_after_value(stale_after): return str(stal_after_count) except ValueError: return 'never' - - -def digest_challenge_response(app, qop, algorithm, stale=False): - response = app.make_response('') - response.status_code = 401 - - # RFC2616 Section4.2: HTTP headers are ASCII. That means - # request.remote_addr was originally ASCII, so I should be able to - # encode it back to ascii. Also, RFC2617 says about nonces: "The - # contents of the nonce are implementation dependent" - nonce = H(b''.join([ - getattr(request, 'remote_addr', u'').encode('ascii'), - b':', - str(time.time()).encode('ascii'), - b':', - os.urandom(10) - ]), algorithm) - opaque = H(os.urandom(10), algorithm) - - auth = WWWAuthenticate("digest") - auth.set_digest('me@kennethreitz.com', nonce, opaque=opaque, - qop=('auth', 'auth-int') if qop is None else (qop,), algorithm=algorithm) - auth.stale = stale - response.headers['WWW-Authenticate'] = auth.to_header() - return response diff --git a/atests/test_authentication.robot b/atests/test_authentication.robot index 8bc7e4e..96430c9 100644 --- a/atests/test_authentication.robot +++ b/atests/test_authentication.robot @@ -1,22 +1,23 @@ *** Settings *** Library RequestsLibrary Library customAuthenticator.py +Resource res_setup.robot *** Test Cases *** Get With Auth [Tags] get get-cert ${auth}= Create List user passwd - Create Session httpbin https://httpbin.org auth=${auth} verify=${CURDIR}${/}cacert.pem - ${resp}= GET On Session httpbin /basic-auth/user/passwd + Create Session authsession ${HTTP_LOCAL_SERVER} auth=${auth} + ${resp}= GET On Session authsession /basic-auth/user/passwd Should Be Equal As Strings ${resp.status_code} 200 Should Be Equal As Strings ${resp.json()['authenticated']} True Get With Custom Auth [Tags] get ${auth}= Get Custom Auth user passwd - Create Custom Session httpbin https://httpbin.org auth=${auth} verify=${CURDIR}${/}cacert.pem - ${resp}= GET On Session httpbin /basic-auth/user/passwd + Create Custom Session authsession ${HTTP_LOCAL_SERVER} auth=${auth} + ${resp}= GET On Session authsession /basic-auth/user/passwd Should Be Equal As Strings ${resp.status_code} 200 Should Be Equal As Strings ${resp.json()['authenticated']} True @@ -24,11 +25,10 @@ Get With Digest Auth [Tags] get get-cert ${auth}= Create List user pass Create Digest Session - ... httpbin - ... https://httpbin.org + ... authsession + ... ${HTTP_LOCAL_SERVER} ... auth=${auth} ... debug=3 - ... verify=${CURDIR}${/}cacert.pem - ${resp}= GET On Session httpbin /digest-auth/auth/user/pass + ${resp}= GET On Session authsession /digest-auth/auth/user/pass Should Be Equal As Strings ${resp.status_code} 200 Should Be Equal As Strings ${resp.json()['authenticated']} True diff --git a/atests/test_ssl_certs.robot b/atests/test_ssl_certs.robot index 7f1078a..2fadbcd 100644 --- a/atests/test_ssl_certs.robot +++ b/atests/test_ssl_certs.robot @@ -5,19 +5,19 @@ Library RequestsLibrary *** Test Cases *** Get HTTPS & Verify Cert [Tags] get get-cert - Create Session httpbin https://httpbin.org verify=True - ${resp}= GET On Session httpbin /get + Create Session sslsession https://github.com verify=True + ${resp}= GET On Session sslsession / Should Be Equal As Strings ${resp.status_code} 200 Get HTTPS & Verify Cert with a CA bundle [Tags] get get-cert - Create Session httpbin https://httpbin.org verify=${CURDIR}${/}cacert.pem - ${resp}= GET On Session httpbin /get + Create Session sslsession https://github.com verify=${CURDIR}${/}cacert.pem + ${resp}= GET On Session sslsession / Should Be Equal As Strings ${resp.status_code} 200 Get HTTPS with Client Side Certificates [Tags] get get-cert @{client_certs}= Create List ${CURDIR}${/}clientcert.pem ${CURDIR}${/}clientkey.pem - Create Client Cert Session crtsession https://server.cryptomix.com/secure client_certs=@{client_certs} - ${resp}= GET On Session crtsession / + Create Client Cert Session sslsession https://server.cryptomix.com/secure client_certs=@{client_certs} + ${resp}= GET On Session sslsession / Should Be Equal As Strings ${resp.status_code} 200 diff --git a/setup.py b/setup.py index 54f9dfb..46b870c 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ Topic :: Software Development :: Testing """[1:-1] -TEST_REQUIRE = ['robotframework>=3.2.1', 'pytest', 'flask', 'six', 'coverage', 'flake8'] +TEST_REQUIRE = ['robotframework>=3.2.1', 'pytest', 'flask', 'six', 'coverage', 'flake8', 'Flask-HTTPAuth==4.8.0'] VERSION = None version_file = join(dirname(abspath(__file__)), 'src', 'RequestsLibrary', 'version.py')