|
74 | 74 | ], |
75 | 75 | } |
76 | 76 |
|
| 77 | +policy_data_trusted_ip = { |
| 78 | + "Version": "2012-10-17", |
| 79 | + "Statement": [ |
| 80 | + { |
| 81 | + "Effect": "Allow", |
| 82 | + "Principal": {"AWS": "*"}, |
| 83 | + "Action": ["es:ESHttp*"], |
| 84 | + "Condition": {"IpAddress": {"aws:SourceIp": ["1.2.3.4", "5.6.7.8"]}}, |
| 85 | + "Resource": f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}/*", |
| 86 | + } |
| 87 | + ], |
| 88 | +} |
| 89 | + |
77 | 90 |
|
78 | 91 | class Test_opensearch_service_domains_not_publicly_accessible: |
79 | 92 | @mock_aws |
@@ -304,3 +317,87 @@ def test_policy_data_not_restricted_whole_internet(self): |
304 | 317 | assert result[0].resource_arn == domain_arn |
305 | 318 | assert result[0].region == AWS_REGION_US_WEST_2 |
306 | 319 | assert result[0].resource_tags == [] |
| 320 | + |
| 321 | + @mock_aws |
| 322 | + def test_policy_data_not_restricted_with_trusted_ips(self): |
| 323 | + opensearch_client = client("opensearch", region_name=AWS_REGION_US_WEST_2) |
| 324 | + domain_arn = opensearch_client.create_domain( |
| 325 | + DomainName=domain_name, |
| 326 | + AccessPolicies=dumps(policy_data_trusted_ip), |
| 327 | + )["DomainStatus"]["ARN"] |
| 328 | + |
| 329 | + aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2]) |
| 330 | + aws_provider._audit_config = {"trusted_ips": ["1.2.3.4", "5.6.7.8"]} |
| 331 | + |
| 332 | + from prowler.providers.aws.services.opensearch.opensearch_service import ( |
| 333 | + OpenSearchService, |
| 334 | + ) |
| 335 | + |
| 336 | + with ( |
| 337 | + mock.patch( |
| 338 | + "prowler.providers.common.provider.Provider.get_global_provider", |
| 339 | + return_value=aws_provider, |
| 340 | + ), |
| 341 | + mock.patch( |
| 342 | + "prowler.providers.aws.services.opensearch.opensearch_service_domains_not_publicly_accessible.opensearch_service_domains_not_publicly_accessible.opensearch_client", |
| 343 | + new=OpenSearchService(aws_provider), |
| 344 | + ), |
| 345 | + ): |
| 346 | + from prowler.providers.aws.services.opensearch.opensearch_service_domains_not_publicly_accessible.opensearch_service_domains_not_publicly_accessible import ( |
| 347 | + opensearch_service_domains_not_publicly_accessible, |
| 348 | + ) |
| 349 | + |
| 350 | + check = opensearch_service_domains_not_publicly_accessible() |
| 351 | + result = check.execute() |
| 352 | + assert len(result) == 1 |
| 353 | + assert result[0].status == "PASS" |
| 354 | + assert ( |
| 355 | + result[0].status_extended |
| 356 | + == f"Opensearch domain {domain_name} is not publicly accessible." |
| 357 | + ) |
| 358 | + assert result[0].resource_id == domain_name |
| 359 | + assert result[0].resource_arn == domain_arn |
| 360 | + assert result[0].region == AWS_REGION_US_WEST_2 |
| 361 | + assert result[0].resource_tags == [] |
| 362 | + |
| 363 | + @mock_aws |
| 364 | + def test_policy_data_not_restricted_with_trusted_ips_partial_match(self): |
| 365 | + opensearch_client = client("opensearch", region_name=AWS_REGION_US_WEST_2) |
| 366 | + domain_arn = opensearch_client.create_domain( |
| 367 | + DomainName=domain_name, |
| 368 | + AccessPolicies=dumps(policy_data_trusted_ip), |
| 369 | + )["DomainStatus"]["ARN"] |
| 370 | + |
| 371 | + aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2]) |
| 372 | + aws_provider._audit_config = {"trusted_ips": ["1.2.3.4"]} |
| 373 | + |
| 374 | + from prowler.providers.aws.services.opensearch.opensearch_service import ( |
| 375 | + OpenSearchService, |
| 376 | + ) |
| 377 | + |
| 378 | + with ( |
| 379 | + mock.patch( |
| 380 | + "prowler.providers.common.provider.Provider.get_global_provider", |
| 381 | + return_value=aws_provider, |
| 382 | + ), |
| 383 | + mock.patch( |
| 384 | + "prowler.providers.aws.services.opensearch.opensearch_service_domains_not_publicly_accessible.opensearch_service_domains_not_publicly_accessible.opensearch_client", |
| 385 | + new=OpenSearchService(aws_provider), |
| 386 | + ), |
| 387 | + ): |
| 388 | + from prowler.providers.aws.services.opensearch.opensearch_service_domains_not_publicly_accessible.opensearch_service_domains_not_publicly_accessible import ( |
| 389 | + opensearch_service_domains_not_publicly_accessible, |
| 390 | + ) |
| 391 | + |
| 392 | + check = opensearch_service_domains_not_publicly_accessible() |
| 393 | + result = check.execute() |
| 394 | + assert len(result) == 1 |
| 395 | + assert result[0].status == "FAIL" |
| 396 | + assert ( |
| 397 | + result[0].status_extended |
| 398 | + == f"Opensearch domain {domain_name} is publicly accessible via access policy." |
| 399 | + ) |
| 400 | + assert result[0].resource_id == domain_name |
| 401 | + assert result[0].resource_arn == domain_arn |
| 402 | + assert result[0].region == AWS_REGION_US_WEST_2 |
| 403 | + assert result[0].resource_tags == [] |
0 commit comments