Skip to content

Commit dead0ea

Browse files
committed
initial commit
1 parent 57f79d9 commit dead0ea

16 files changed

+2536
-1
lines changed

README.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,17 @@
1-
# contrast-integrations-cli
1+
# The Contrast Security Integrations CLI
22
A CLI tool for adding Contrast Integrations via rule customizations.
3+
4+
Adapted from [Contrast-Security-OSS/integrations-scw](https://github.com/Contrast-Security-OSS/integrations-scw) to include a CLI tool, and also to include Secure Flag, Secure Code Warrior and possibly other integrations in the future.
5+
6+
## Usage
7+
```sh
8+
contrast-integrations --help
9+
contrast-integrations auth init
10+
contrast-integrations secure-code-warrior enable-for-rule sql-injection
11+
contrast-integrations secure-code-warrior disable-for-rule sql-injection
12+
13+
contrast-integrations secure-flag enable-for-all
14+
contrast-integrations secure-flag disable-for-all
15+
16+
17+
```

contrast_api/api_handlers.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import json
2+
import requests
3+
4+
from rich import print
5+
6+
from contrast_api.models import *
7+
from contrast_api.request_handler import RequestHandler
8+
9+
class BaseAPI:
10+
api_version = "api/ng"
11+
12+
def __init__(self, session, api_config):
13+
self._session = session
14+
self._config = api_config
15+
16+
self.hostname = self._config.teamserver_url
17+
self.requests = RequestHandler(session=self._session)
18+
19+
@property
20+
def _base_url(self) -> str:
21+
base_url = self.hostname
22+
base_url += f"/{self.api_version}" if self.api_version else ""
23+
return base_url
24+
25+
@property
26+
def current_org_uuid(self):
27+
if self._config.active_profile:
28+
return self._config.active_profile.org_uuid
29+
else:
30+
return self._config.default_profile.org_uuid
31+
32+
def full_url(self, endpoint: str) -> str:
33+
return f"{self._base_url}/{endpoint}"
34+
35+
def get(self, endpoint: str, skip_links: bool = False, expand: str = None, params: Dict = None) -> Response:
36+
if any([skip_links, expand, params]):
37+
params = params or {}
38+
expand = expand or ""
39+
if skip_links:
40+
expand = "skip_links," + expand
41+
if expand:
42+
params["expand"] = expand
43+
return self.requests.get(url=self.full_url(endpoint), params=params)
44+
45+
def post(self, endpoint: str, skip_links: bool = False, expand: List = None, params: Dict = None, data: Dict = {}) -> Response:
46+
if any([skip_links, expand, params]):
47+
params = params or {}
48+
expand = expand or ""
49+
if skip_links:
50+
expand = "skip_links," + expand
51+
if expand:
52+
params["expand"] = expand
53+
return self.requests.post(url=self.full_url(endpoint), params=params, data=data)
54+
55+
56+
class APIHandler(BaseAPI):
57+
api_version = ""
58+
59+
def get_current_user_profile(self):
60+
print("Getting current user profile")
61+
return self.get("profile/current-user")
62+
63+
64+
class PolicyHandler(BaseAPI):
65+
66+
def get_org_policy(self) -> Policy:
67+
response = self.get(f"{self.current_org_uuid}/rules", skip_links=True)
68+
return Policy.from_response(response)
69+
70+
def get_details_for_rule(self, rule_name: str) -> Rule:
71+
response = self.get(f"{self.current_org_uuid}/rules/{rule_name}")
72+
return Rule.from_response(response, _policy_handler=self)
73+
74+
def reset_rule(self, rule_name: str) -> Response:
75+
response = self.post(f"{self.current_org_uuid}/rules/{rule_name}", data={"override": "false"})
76+
return response
77+
78+
def update_rule(self, rule_name: str, customizations: Dict) -> Response:
79+
response = self.post(f"{self.current_org_uuid}/rules/{rule_name}", data=customizations)
80+
return response
81+
82+
83+
class ProfileHandler(BaseAPI):
84+
85+
def get_profile(self):
86+
response = self.get("profile", expand="email,preferences,login,signup,service_key,ip_address")
87+
return User.from_response(response)
88+
89+
def get_current_user_profile(self):
90+
response = self.get("profile/current-user", expand="ip_address")
91+
return User.from_response(response)
92+
93+
def get_organizations_in_profile(self):
94+
response = self.get("profile/organizations", expand="role")
95+
return Organization.from_response(response)
96+
97+
def get_default_org_from_profile(self):
98+
response = self.get("profile/organizations/default")
99+
return Organization.from_response(response)
100+
101+
def get_profile_user_roles_for_org(self):
102+
response = self.get(f"profile/organizations/{self.current_org_uuid}", expand="role")
103+
return Organization.from_response(response)

contrast_api/contrast_api.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import json
2+
import logging
3+
import requests
4+
import yaml
5+
6+
from dataclasses import dataclass
7+
from pathlib import Path
8+
from typing import Dict, List, Optional
9+
from rich import print
10+
from rich.logging import RichHandler
11+
from uuid import UUID
12+
13+
from contrast_api.api_handlers import *
14+
15+
logging.basicConfig(level="INFO", datefmt="[%X]", format="%(name)s: %(message)s", handlers=[RichHandler(markup=True)])
16+
logger = logging.getLogger(__name__)
17+
18+
19+
@dataclass
20+
class ProfileAuth(yaml.YAMLObject):
21+
org_uuid: UUID
22+
api_key: str
23+
name: Optional[str] = None
24+
25+
@classmethod
26+
def from_dict(cls):
27+
pass
28+
29+
30+
@dataclass
31+
class ContrastAPIConfig(yaml.YAMLObject):
32+
teamserver_url: str
33+
api_key: str
34+
auth_header: str
35+
is_superadmin: Optional[bool] = False
36+
default_profile: Optional[ProfileAuth] = None
37+
active_profile: Optional[ProfileAuth] = None
38+
39+
@classmethod
40+
def from_yaml_file(cls, config_file: Path):
41+
"""Load ContrastAuth object from a YAML file"""
42+
try:
43+
logger.debug(f"Loading config file from {config_file}")
44+
contrast_config = yaml.safe_load(config_file.read_text())
45+
return cls(**contrast_config)
46+
except FileNotFoundError as err:
47+
logger.error(f"Could not find config file!: {err}")
48+
raise err
49+
50+
def __post_init__(self):
51+
if self.default_profile:
52+
self.default_profile = ProfileAuth(**self.default_profile)
53+
if self.active_profile:
54+
self.active_profile = ProfileAuth(**self.active_profile)
55+
56+
def to_yaml_file(self, config_file: Path):
57+
"""Write ContrastAuth object to a YAML file"""
58+
logger.debug(f"Writing config file to '{config_file}'")
59+
with open(config_file, "w") as file:
60+
yaml.emitter.Emitter.process_tag = lambda self, *args, **kw: None
61+
yaml.dump(self.__dict__, file, sort_keys=False, allow_unicode=True, default_flow_style=False)
62+
63+
def set_default_profile(self, org_uuid: str, api_key: str, config_file: Path, org_name: str = None):
64+
self.default_profile = ProfileAuth(name=org_name, org_uuid=org_uuid, api_key=api_key)
65+
self.to_yaml_file(config_file)
66+
67+
def set_active_profile(self, org_uuid: str, api_key: str, config_file: Path, org_name: str = None):
68+
self.active_profile = ProfileAuth(name=org_name, org_uuid=org_uuid, api_key=api_key)
69+
self.to_yaml_file(config_file)
70+
71+
72+
@dataclass
73+
class ContrastAPIResponse:
74+
status_code: int
75+
message: str
76+
data: List[Dict]
77+
78+
79+
class ContrastAPI:
80+
81+
def __init__(self, session: requests.Session, api_config: ContrastAPIConfig):
82+
self.api_config = api_config
83+
self.session = session
84+
self.api_config = api_config
85+
86+
@classmethod
87+
def from_config(cls, api_config: ContrastAPIConfig):
88+
session = requests.Session()
89+
session.headers.update({
90+
"Accept": "application/json",
91+
"Content-Type": "application/json",
92+
"API-Key": api_config.active_profile.api_key if api_config.active_profile else api_config.api_key,
93+
"Authorization": api_config.auth_header
94+
})
95+
return cls(session, api_config)
96+
97+
@property
98+
def base(self):
99+
return BaseAPI(self.session, self.api_config)
100+
101+
@property
102+
def policy(self):
103+
return PolicyHandler(self.session, self.api_config)
104+
105+
@property
106+
def profile(self):
107+
return ProfileHandler(self.session, self.api_config)

0 commit comments

Comments
 (0)