Skip to content

Commit c8ecaca

Browse files
authored
Added sgt-sync CLI tool to Identities samples. (#96)
1 parent a3daf37 commit c8ecaca

File tree

10 files changed

+924
-0
lines changed

10 files changed

+924
-0
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
A simple sync tool to bring Security Group Tags from ISE to Secure Access.
2+
3+
## Usage
4+
main.py [-h] [--list-ise | --list-sa | --list-sa-inactive | --diff-only]
5+
6+
options:
7+
-h, --help show this help message and exit
8+
--list-ise List all Security Group Tags found in Cisco Identity Services Engine (ISE).
9+
--list-sa List all Security Group Tags found in Cisco Secure Access (active and inactive).
10+
--list-sa-inactive List only the INACTIVE Security Group Tags found in Cisco Secure Access.
11+
--diff-only Show the difference between ISE and Secure Access SGTs without performing any synchronization (no changes applied).
12+
13+
## Environmental Variables
14+
| Variable | Comment |
15+
|----------------|-------------------------|
16+
ISE-SERVER | IP address or FQDN
17+
ISE-USER | ISE ERS Admin Username
18+
ISE-PASS | ISE ERS Admin Password
19+
SA-KEY | Secure Access API Key
20+
SA-SECRET | Secure Access API Secret
21+
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
"""
2+
Copyright (c) 2025 Cisco and/or its affiliates.
3+
This software is licensed to you under the terms of the Cisco Sample
4+
Code License, Version 1.1 (the "License"). You may obtain a copy of the
5+
License at
6+
7+
https://developer.cisco.com/docs/licenses
8+
9+
All use of the material herein must be in accordance with the terms of
10+
the License. All rights not expressly granted by the License are
11+
reserved. Unless required by applicable law or agreed to separately in
12+
writing, software distributed under the License is distributed on an "AS
13+
IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14+
or implied.
15+
"""
16+
17+
# main.py
18+
import sys
19+
import argparse
20+
from sgt_sync.config import Config
21+
from sgt_sync.logging_config import setup_logging
22+
from sgt_sync.clients.ise_client import IseClient, IseClientError
23+
from sgt_sync.clients.secure_access_client import (
24+
SecureAccessClient,
25+
SecureAccessClientError,
26+
)
27+
from sgt_sync.synchronizer import SgtSynchronizer
28+
from sgt_sync.models.sgt import SecurityGroupTag
29+
30+
# Configure logging for the entire application
31+
logger = setup_logging()
32+
33+
34+
def print_sgts(title: str, sgts: list[SecurityGroupTag]):
35+
"""Helper function to print a list of SGTs in a readable format."""
36+
if not sgts:
37+
logger.info(f"No {title} SGTs found.")
38+
return
39+
40+
logger.info(f"\n--- {title} ({len(sgts)} SGTs) ---")
41+
for sgt in sgts:
42+
logger.info(
43+
f" Key: {sgt.key}, Label: '{sgt.label}', Tag ID: {sgt.tag_id}, Status: {sgt.status}"
44+
)
45+
logger.info(f"--- End of {title} ---")
46+
47+
48+
def main():
49+
"""Main function to run the SGT synchronization or perform CLI actions."""
50+
parser = argparse.ArgumentParser(
51+
description="Synchronize Security Group Tags from ISE to Cisco Secure Access, or perform diagnostic actions."
52+
)
53+
54+
# Create a mutually exclusive group for commands that should not run together
55+
group = parser.add_mutually_exclusive_group()
56+
group.add_argument(
57+
"--list-ise",
58+
action="store_true",
59+
help="List all Security Group Tags found in Cisco Identity Services Engine (ISE).",
60+
)
61+
group.add_argument(
62+
"--list-sa",
63+
action="store_true",
64+
help="List all Security Group Tags found in Cisco Secure Access (active and inactive).",
65+
)
66+
group.add_argument(
67+
"--list-sa-inactive",
68+
action="store_true",
69+
help="List only the INACTIVE Security Group Tags found in Cisco Secure Access.",
70+
)
71+
group.add_argument(
72+
"--diff-only",
73+
action="store_true",
74+
help="Show the difference between ISE and Secure Access SGTs without performing any synchronization (no changes applied).",
75+
)
76+
args = parser.parse_args()
77+
78+
try:
79+
# Load and validate configuration
80+
Config.validate()
81+
config = Config()
82+
logger.info("Configuration loaded and validated.")
83+
84+
# Initialize clients
85+
ise_client = IseClient(config)
86+
sa_client = SecureAccessClient(config)
87+
logger.info("API clients initialized.")
88+
89+
# Handle CLI arguments
90+
if args.list_ise:
91+
logger.info("Fetching SGTs from ISE...")
92+
ise_sgts = ise_client.get_sgts()
93+
print_sgts("ISE SGTs", ise_sgts)
94+
sys.exit(0)
95+
96+
elif args.list_sa:
97+
logger.info("Fetching SGTs from Secure Access (all statuses)...")
98+
sa_sgts = sa_client.get_sgts()
99+
print_sgts("Secure Access SGTs (All)", sa_sgts)
100+
sys.exit(0)
101+
102+
elif args.list_sa_inactive:
103+
logger.info("Fetching INACTIVE SGTs from Secure Access...")
104+
all_sa_sgts = sa_client.get_sgts()
105+
inactive_sa_sgts = [sgt for sgt in all_sa_sgts if sgt.status == "inactive"]
106+
print_sgts("Secure Access SGTs (Inactive)", inactive_sa_sgts)
107+
sys.exit(0)
108+
109+
elif args.diff_only:
110+
logger.info(
111+
"Performing SGT difference analysis (diff-only mode). No changes will be applied."
112+
)
113+
ise_sgts = ise_client.get_sgts()
114+
sa_sgts = sa_client.get_sgts()
115+
116+
synchronizer = SgtSynchronizer(ise_client, sa_client)
117+
sgts_to_add_update, sgts_to_mark_inactive = synchronizer.diff_sgts(
118+
ise_sgts, sa_sgts
119+
)
120+
121+
print_sgts(
122+
"SGTs to Add/Update in Secure Access (would be added/modified)",
123+
sgts_to_add_update,
124+
)
125+
print_sgts(
126+
"SGTs to Mark Inactive in Secure Access (would be set to inactive)",
127+
sgts_to_mark_inactive,
128+
)
129+
sys.exit(0)
130+
131+
# Full synchronization if no specific CLI arguments are provided
132+
else:
133+
logger.info(
134+
"No specific CLI arguments provided. Proceeding with full SGT synchronization."
135+
)
136+
synchronizer = SgtSynchronizer(ise_client, sa_client)
137+
synchronizer.sync_sgts()
138+
logger.info("SGT synchronization finished successfully.")
139+
140+
except ValueError as e:
141+
logger.critical(f"Configuration Error: {e}")
142+
sys.exit(1)
143+
except (IseClientError, SecureAccessClientError) as e:
144+
logger.critical(f"API Client Error: {e}")
145+
sys.exit(1)
146+
except Exception as e:
147+
logger.critical(f"An unexpected error occurred: {e}", exc_info=True)
148+
sys.exit(1)
149+
150+
151+
if __name__ == "__main__":
152+
main()
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[project]
2+
name = "sgt-sync"
3+
version = "0.1.0"
4+
description = "A simple proof-of-concept sync tool to bring Security Group Tags from ISE to Secure Access"
5+
readme = "README.md"
6+
requires-python = ">=3.13"
7+
dependencies = [
8+
"argparse>=1.4.0",
9+
"dotenv>=0.9.9",
10+
"httpx>=0.28.1",
11+
"requests-auth>=8.0.0",
12+
]
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# sgt_sync/clients/ise_client.py
2+
import httpx
3+
import json
4+
import logging
5+
from requests.auth import HTTPBasicAuth
6+
from typing import List
7+
8+
from ..config import Config
9+
from ..models.sgt import SecurityGroupTag
10+
11+
logger = logging.getLogger("sgt_sync.ise_client")
12+
13+
class IseClientError(Exception):
14+
"""Custom exception for ISE client errors."""
15+
pass
16+
17+
class IseClient:
18+
"""
19+
Client for interacting with Cisco Identity Services Engine (ISE) ERS APIs.
20+
Handles SGT fetching.
21+
"""
22+
def __init__(self, config: Config):
23+
self.config = config
24+
self.auth = HTTPBasicAuth(self.config.ISE_USER, self.config.ISE_PASS)
25+
self.headers = {
26+
"Accept": "application/JSON",
27+
"Content-Type": "application/JSON",
28+
}
29+
self.client = httpx.Client(verify=self.config.VERIFY_SSL)
30+
31+
def get_sgts(self) -> List[SecurityGroupTag]:
32+
"""
33+
Retrieve all Security Group Tags from ISE using the ERS API.
34+
Handles pagination and fetches full SGT details by following links.
35+
"""
36+
logger.info(f"Retrieving ISE SGTs from {self.config.BASE_ISE_ERS_URL}/sgt...")
37+
ise_sgt_list: List[SecurityGroupTag] = []
38+
url = f"{self.config.BASE_ISE_ERS_URL}/sgt"
39+
40+
current_start_index = 0
41+
page_size = 100
42+
total_sgts = -1
43+
44+
try:
45+
while total_sgts == -1 or current_start_index < total_sgts:
46+
params = {"size": page_size, "startIndex": current_start_index}
47+
logger.debug(
48+
f"Fetching ISE SGTs with startIndex={current_start_index}, size={page_size}"
49+
)
50+
51+
response = self.client.get(url, auth=self.auth, params=params, headers=self.headers)
52+
response.raise_for_status()
53+
54+
data = response.json()
55+
search_result = data.get("SearchResult", {})
56+
resources = search_result.get("resources", [])
57+
58+
if not resources:
59+
logger.debug("No more resources found in ISE response.")
60+
break
61+
62+
for resource in resources:
63+
sgt_detail_link = resource.get("link", {}).get("href")
64+
65+
if sgt_detail_link:
66+
sgt_detail_response = self.client.get(
67+
sgt_detail_link, auth=self.auth, headers=self.headers
68+
)
69+
sgt_detail_response.raise_for_status()
70+
71+
sgt_data = sgt_detail_response.json().get("Sgt")
72+
73+
if sgt_data:
74+
try:
75+
ise_sgt_list.append(SecurityGroupTag.from_ise_data(sgt_data))
76+
except ValueError as ve:
77+
logger.warning(f"Skipping malformed ISE SGT data: {sgt_data} - {ve}")
78+
else:
79+
logger.warning(
80+
f"Could not find link for ISE SGT resource: {resource}"
81+
)
82+
83+
total_sgts = search_result.get("total", len(ise_sgt_list))
84+
current_start_index += len(resources)
85+
logger.debug(
86+
f"Current ISE SGTs retrieved: {len(ise_sgt_list)}, Total expected: {total_sgts}"
87+
)
88+
89+
logger.info(f"Successfully retrieved {len(ise_sgt_list)} SGTs from ISE.")
90+
return ise_sgt_list
91+
except httpx.HTTPStatusError as e:
92+
logger.error(
93+
f"Failed to retrieve ISE SGTs (HTTP Status {e.response.status_code}): {e.response.text}"
94+
)
95+
raise IseClientError("Failed to retrieve ISE SGTs.") from e
96+
except json.JSONDecodeError as e:
97+
logger.error(f"Failed to decode JSON response from ISE: {e}")
98+
raise IseClientError("Invalid JSON response from ISE.") from e
99+
except httpx.RequestError as e:
100+
logger.error(f"An error occurred while requesting ISE SGTs: {e}")
101+
raise IseClientError("Network error during ISE SGT retrieval.") from e

0 commit comments

Comments
 (0)