diff --git a/src/aws_assume_role/aws_assume_role.py b/src/aws_assume_role/aws_assume_role.py index 8b84e67..46fd8d3 100644 --- a/src/aws_assume_role/aws_assume_role.py +++ b/src/aws_assume_role/aws_assume_role.py @@ -231,11 +231,47 @@ def check_device_exists(devices, device_id): return False +def verify(client, app_id, device_id, state_token, otp_token, username_or_email, password, onelogin_subdomain, ip, mfa_verify_info, devices, device_type): + saml_endpoint_response = client.get_saml_assertion_verifying(app_id, device_id, state_token, otp_token, do_not_notify=True) + + mfa_error = 0 + while client.error or saml_endpoint_response is None: + if client.error_description == "State token is invalid or expired": + # State token expired so the OTP Token was not able to be processed + # regenerate new SAMLResponse and get new state_token + return get_saml_response(client, username_or_email, password, app_id, onelogin_subdomain, ip, mfa_verify_info) + else: + if mfa_error > MFA_ATTEMPTS_FOR_WARNING and len(devices) > 1: + print("The OTP Token was not able to be processed after %s attempts, Do you want to select a new MFA method? (y/n)" % MFA_ATTEMPTS_FOR_WARNING) + answer = get_yes_or_not() + if answer == 'y': + # Let's regenerate the SAMLResponse and initialize again the count + print("\n") + return get_saml_response(client, username_or_email, password, app_id, onelogin_subdomain, ip, None) + else: + print("Ok, Try introduce a new OTP Token then: ") + else: + if device_type == "OneLogin SMS": + # Trigger SMS, before ask for OTP + saml_endpoint_response = client.get_saml_assertion_verifying(app_id, device_id, state_token, None, do_not_notify=True) + + if client.error_description == "Failed authentication with this factor": + print("The OTP Token was invalid or expired, please introduce a new one: ") + else: + print("The OTP Token was not able to be processed, please introduce a new one: ") + + otp_token = sys.stdin.readline().strip() + saml_endpoint_response = client.get_saml_assertion_verifying(app_id, device_id, state_token, otp_token, do_not_notify=True) + + mfa_error = mfa_error + 1 + mfa_verify_info['otp_token'] = otp_token + return saml_endpoint_response, mfa_verify_info + + def get_saml_response(client, username_or_email, password, app_id, onelogin_subdomain, ip=None, mfa_verify_info=None, cmd_otp=None): saml_endpoint_response = client.get_saml_assertion(username_or_email, password, app_id, onelogin_subdomain, ip) try_get_saml_response = 0 - verified_with_push = False while saml_endpoint_response is None or saml_endpoint_response.type == "pending": if saml_endpoint_response is None: if client.error in ['400', '401']: @@ -288,102 +324,67 @@ def get_saml_response(client, username_or_email, password, app_id, onelogin_subd device_id = mfa_verify_info['device_id'] device_type = mfa_verify_info['device_type'] - # Consider case 0 or MFA that requires a trigger - if mfa_verify_info is None or device_type in ["OneLogin SMS", "OneLogin Protect"]: - if mfa_verify_info is None: - print("-----------------------------------------------------------------------") - for index, device in enumerate(devices): - print(" " + str(index) + " | " + device.type) - - print("-----------------------------------------------------------------------") - - if len(devices) > 1: - print("\nSelect the desired MFA Device [0-%s]: " % (len(devices) - 1)) - device_selection = get_selection(len(devices)) - else: - device_selection = 0 - device = devices[device_selection] - device_id = device.id - device_type = device.type + if mfa_verify_info is None: + print("-----------------------------------------------------------------------") + for index, device in enumerate(devices): + print(" " + str(index) + " | " + device.type) - mfa_verify_info = { - 'device_id': device_id, - 'device_type': device_type, - } + print("-----------------------------------------------------------------------") - if device_type == "OneLogin SMS": - # Trigger SMS + if len(devices) > 1: + print("\nSelect the desired MFA Device [0-%s]: " % (len(devices) - 1)) + device_selection = get_selection(len(devices)) + else: + device_selection = 0 + device = devices[device_selection] + device_id = device.id + device_type = device.type + + mfa_verify_info = { + 'device_id': device_id, + 'device_type': device_type, + } + + otp_token = cmd_otp + verified = False + if otp_token: + saml_endpoint_response = verify( + client, app_id, device_id, state_token, otp_token, username_or_email, password, onelogin_subdomain, ip, mfa_verify_info, devices, device_type) + # Consider case 0 or MFA that requires a trigger + elif device_type == "OneLogin SMS": + # Trigger SMS + saml_endpoint_response = client.get_saml_assertion_verifying(app_id, device_id, state_token, None, do_not_notify=True) + print("SMS with OTP token sent to device %s" % device_id) + elif device_type == "OneLogin Protect": + try_get_saml_response_verified = 0 + # Trigger PUSH and try verify + if 'otp_token' not in mfa_verify_info: + saml_endpoint_response = client.get_saml_assertion_verifying(app_id, device_id, state_token, None, do_not_notify=False) + print("PUSH with OTP token sent to device %s" % device_id) + while saml_endpoint_response and saml_endpoint_response.type == "pending" and try_get_saml_response_verified < MAX_ITER_GET_SAML_RESPONSE: + time.sleep(TIME_SLEEP_ON_RESPONSE_PENDING) saml_endpoint_response = client.get_saml_assertion_verifying(app_id, device_id, state_token, None, do_not_notify=True) - print("SMS with OTP token sent to device %s" % device_id) - elif device_type == "OneLogin Protect": - try_get_saml_response_verified = 0 - # Trigger PUSH and try verify - if 'otp_token' not in mfa_verify_info: - saml_endpoint_response = client.get_saml_assertion_verifying(app_id, device_id, state_token, None, do_not_notify=False) - print("PUSH with OTP token sent to device %s" % device_id) - while saml_endpoint_response and saml_endpoint_response.type == "pending" and try_get_saml_response_verified < MAX_ITER_GET_SAML_RESPONSE: - time.sleep(TIME_SLEEP_ON_RESPONSE_PENDING) - saml_endpoint_response = client.get_saml_assertion_verifying(app_id, device_id, state_token, None, do_not_notify=True) - try_get_saml_response_verified += 1 - - if saml_endpoint_response and saml_endpoint_response.type == 'success': - verified_with_push = True - else: - print("PUSH notification not confirmed, trying manual mode") + try_get_saml_response_verified += 1 - if not verified_with_push: - if cmd_otp: - otp_token = cmd_otp - else: - # Otherwise, let's request OTP token to be inserted manually - print("Enter the OTP Token for %s: " % device_type) - otp_token = sys.stdin.readline().strip() - elif 'otp_token' not in mfa_verify_info: - if cmd_otp: - otp_token = cmd_otp + if saml_endpoint_response and saml_endpoint_response.type == 'success': + verified = True else: - print("Enter the OTP Token for %s: " % mfa_verify_info['device_type']) - otp_token = sys.stdin.readline().strip() - else: - otp_token = mfa_verify_info['otp_token'] - - if not verified_with_push: - saml_endpoint_response = client.get_saml_assertion_verifying(app_id, device_id, state_token, otp_token, do_not_notify=True) + print("PUSH notification not confirmed, trying manual mode") - mfa_error = 0 - while client.error or saml_endpoint_response is None: - if client.error_description == "State token is invalid or expired": - # State token expired so the OTP Token was not able to be processed - # regenerate new SAMLResponse and get new state_token - return get_saml_response(client, username_or_email, password, app_id, onelogin_subdomain, ip, mfa_verify_info) - else: - if mfa_error > MFA_ATTEMPTS_FOR_WARNING and len(devices) > 1: - print("The OTP Token was not able to be processed after %s attempts, Do you want to select a new MFA method? (y/n)" % MFA_ATTEMPTS_FOR_WARNING) - answer = get_yes_or_not() - if answer == 'y': - # Let's regenerate the SAMLResponse and initialize again the count - print("\n") - return get_saml_response(client, username_or_email, password, app_id, onelogin_subdomain, ip, None) - else: - print("Ok, Try introduce a new OTP Token then: ") - else: - if device_type == "OneLogin SMS": - # Trigger SMS, before ask for OTP - saml_endpoint_response = client.get_saml_assertion_verifying(app_id, device_id, state_token, None, do_not_notify=True) + if not verified: + print("Enter the OTP Token for %s: " % device_type) + otp_token = sys.stdin.readline().strip() + saml_endpoint_response = verify( + client, app_id, device_id, state_token, otp_token, username_or_email, password, onelogin_subdomain, ip, mfa_verify_info, devices, device_type) - if client.error_description == "Failed authentication with this factor": - print("The OTP Token was invalid or expired, please introduce a new one: ") - else: - print("The OTP Token was not able to be processed, please introduce a new one: ") + if saml_endpoint_response.saml_response is not None: + print("\nObtained SAMLResponse from OneLogin to be used at AWS") - otp_token = sys.stdin.readline().strip() - saml_endpoint_response = client.get_saml_assertion_verifying(app_id, device_id, state_token, otp_token, do_not_notify=True) + return saml_endpoint_response, mfa_verify_info - mfa_error = mfa_error + 1 - mfa_verify_info['otp_token'] = otp_token - if saml_endpoint_response.saml_response is not None: - print("\nObtained SAMLResponse from OneLogin to be used at AWS") +def get_result(client, username_or_email, password, app_id, onelogin_subdomain, ip, mfa_verify_info, cmd_otp): + saml_endpoint_response, mfa_verify_info = get_saml_response(client, username_or_email, password, app_id, onelogin_subdomain, ip, mfa_verify_info, cmd_otp) result = { 'saml_response': saml_endpoint_response.saml_response, 'mfa_verify_info': mfa_verify_info, @@ -391,7 +392,6 @@ def get_saml_response(client, username_or_email, password, app_id, onelogin_subd 'password': password, 'onelogin_subdomain': onelogin_subdomain } - return result def get_attributes(saml_response): @@ -658,7 +658,7 @@ def main(): onelogin_subdomain = sys.stdin.readline().strip() if result is None: - result = get_saml_response(client, username_or_email, password, app_id, onelogin_subdomain, ip, mfa_verify_info, cmd_otp) + result = get_result(client, username_or_email, password, app_id, onelogin_subdomain, ip, mfa_verify_info, cmd_otp) username_or_email = result['username_or_email'] password = result['password']