Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
from src.auth.endpoint_access_util import verify_access
from src.db.models.competition_models import ApplicationSubmission
from src.legacy_soap_api.grantors import schemas as grantor_schemas
from src.legacy_soap_api.legacy_soap_api_auth import validate_certificate
from src.legacy_soap_api.legacy_soap_api_auth import (
SOAPClientUserDoesNotHavePermission,
validate_certificate,
)
from src.legacy_soap_api.legacy_soap_api_config import SOAPOperationConfig
from src.legacy_soap_api.legacy_soap_api_constants import LegacySoapApiEvent
from src.legacy_soap_api.legacy_soap_api_schemas import SOAPRequest
Expand Down Expand Up @@ -66,7 +69,9 @@ def get_application_zip_response(
"privileges": soap_config.privileges,
},
)
raise e
raise SOAPClientUserDoesNotHavePermission(
"User did not have permission to access this application"
) from e

try:
filestream = file_util.open_stream(application_submission.download_path, mode="rb")
Expand Down
4 changes: 4 additions & 0 deletions api/src/legacy_soap_api/legacy_soap_api_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class SOAPClientCertificateLookupError(Exception):
pass


class SOAPClientUserDoesNotHavePermission(Exception):
pass


class SOAPClientCertificate(BaseModel):
cert: str
serial_number: str
Expand Down
10 changes: 10 additions & 0 deletions api/src/legacy_soap_api/legacy_soap_api_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from src.legacy_soap_api.legacy_soap_api_auth import (
MTLS_CERT_HEADER_KEY,
USE_SOAP_JWT_HEADER_KEY,
SOAPClientUserDoesNotHavePermission,
get_soap_auth,
)
from src.legacy_soap_api.legacy_soap_api_blueprint import legacy_soap_api_blueprint
Expand Down Expand Up @@ -98,6 +99,15 @@ def simpler_soap_api_route(
return get_simpler_soap_response(
soap_request, soap_proxy_response, db_session
).to_flask_response()
except SOAPClientUserDoesNotHavePermission:
msg = "soap_client_certificate: User did not have permission to access this application"
logger.info(
msg=msg,
extra={
"soap_api_event": LegacySoapApiEvent.ERROR_CALLING_SIMPLER,
},
)
return soap_proxy_response.to_flask_response()
except Exception:
msg = "Unable to process Simpler SOAP proxy response"
logger.exception(
Expand Down
5 changes: 2 additions & 3 deletions api/tests/src/legacy_soap_api/test_legacy_soap_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import pytest
import pytz
import requests
from apiflask import HTTPError
from botocore.exceptions import ClientError
from sqlalchemy import update

Expand All @@ -21,7 +20,7 @@
GetOpportunityListResponse,
OpportunityDetails,
)
from src.legacy_soap_api.legacy_soap_api_auth import SOAPAuth
from src.legacy_soap_api.legacy_soap_api_auth import SOAPAuth, SOAPClientUserDoesNotHavePermission
from src.legacy_soap_api.legacy_soap_api_client import (
BaseSOAPClient,
SimplerApplicantsS2SClient,
Expand Down Expand Up @@ -552,7 +551,7 @@ def test_get_simpler_soap_response_cannot_access_endpoint_if_certificate_user_do
)
mock_proxy_response = SOAPResponse(data=b"", status_code=500, headers={})
client = SimplerGrantorsS2SClient(soap_request, db_session)
with pytest.raises(HTTPError):
with pytest.raises(SOAPClientUserDoesNotHavePermission):
client.get_simpler_soap_response(mock_proxy_response)

def test_get_simpler_soap_response_logging_if_downloading_the_file_from_s3_fails(
Expand Down
60 changes: 59 additions & 1 deletion api/tests/src/legacy_soap_api/test_legacy_soap_api_routes.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
import logging
from unittest import mock

from lxml import etree

from src.legacy_soap_api.legacy_soap_api_auth import USE_SOAP_JWT_HEADER_KEY
from src.constants.lookup_constants import Privilege
from src.legacy_soap_api.legacy_soap_api_auth import (
USE_SOAP_JWT_HEADER_KEY,
SOAPAuth,
SOAPClientCertificate,
)
from src.legacy_soap_api.legacy_soap_api_utils import get_invalid_path_response
from tests.lib.data_factories import setup_cert_user
from tests.src.db.models.factories import (
AgencyFactory,
ApplicationFactory,
ApplicationSubmissionFactory,
CompetitionFactory,
OpportunityFactory,
)

NSMAP = {
"envelope": "http://schemas.xmlsoap.org/soap/envelope/",
Expand All @@ -15,6 +29,9 @@
LEGACY_TRACKING_NUMBER = "GRANT00000008"
GET_APPLICATION_PATH = f"{{{NSMAP['envelope']}}}Body/{{{NSMAP['application_request']}}}GetApplicationRequest/{{{NSMAP['tracking_number']}}}GrantsGovTrackingNumber"
GET_APPLICATION_ZIP_PATH = f"{{{NSMAP['envelope']}}}Body/{{{NSMAP['application_request']}}}GetApplicationZipRequest/{{{NSMAP['tracking_number']}}}GrantsGovTrackingNumber"
MOCK_FINGERPRINT = "123"
MOCK_CERT = "456"
MOCK_CERT_STR = "certstr"


def test_successful_request(client, fixture_from_file, caplog) -> None:
Expand Down Expand Up @@ -244,3 +261,44 @@ def test_simpler_getapplicationzip_operation_returns_not_found_response_includes
assert (
response.headers["Set-Cookie"] == "JSESSIONID=xyz; Path=/grantsws-agency; Secure; HttpOnly"
)


def test_simpler_getapplicationzip_operation_raising_httperror_due_to_privileges_logs_info(
client, fixture_from_file, enable_factory_create, caplog
) -> None:
agency = AgencyFactory.create()
opportunity = OpportunityFactory.create(agency_code=agency.agency_code)
competition = CompetitionFactory(
opportunity=opportunity,
)
WRONG_PRIVILEGES = {Privilege.READ_TEST_USER_TOKEN}
user, role, soap_client_certificate = setup_cert_user(agency, WRONG_PRIVILEGES)
application = ApplicationFactory.create(competition=competition)
submission = ApplicationSubmissionFactory.create(application=application)
full_path = "/grantsws-agency/services/v2/AgencyWebServicesSoapPort"
fixture_path = "/legacy_soap_api/grantors/get_application_zip_request.xml"
mock_data = fixture_from_file(fixture_path)
envelope = etree.fromstring(mock_data)
tracking_number = envelope.find(GET_APPLICATION_ZIP_PATH)
tracking_number.text = f"GRANT{submission.legacy_tracking_number}"
mock_client_cert = SOAPClientCertificate(
cert=MOCK_CERT_STR,
fingerprint=MOCK_FINGERPRINT,
serial_number="1235",
legacy_certificate=soap_client_certificate.legacy_certificate,
)
with mock.patch("src.legacy_soap_api.legacy_soap_api_routes.get_soap_auth") as mock_get_auth:
mock_get_auth.return_value = SOAPAuth(certificate=mock_client_cert)
response = client.post(
full_path, data=etree.tostring(envelope), headers={"Use-Simpler-Override": "true"}
)
assert response.status_code == 500
info_messages = [
record
for record in caplog.records
if record.message
== "soap_client_certificate: User did not have permission to access this application"
]
assert len(info_messages) == 1
error_records = [record for record in caplog.records if record.levelno >= logging.ERROR]
assert len(error_records) == 0
Loading