Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
e0067b0
feat: started
ErikPre Nov 4, 2025
dbfc069
feat: more typing and fix
ErikPre Nov 4, 2025
79bc2bf
feat: finished fwconfig_import
ErikPre Nov 4, 2025
4f5633a
feat: finished fwconfig_import
ErikPre Nov 4, 2025
9913528
feat: finished fwconfig_import_object
ErikPre Nov 5, 2025
86901c2
wip: fwconfig_import_rule typing
ErikPre Nov 6, 2025
ee28fc4
wip: fwconfig_import_rule typing
ErikPre Nov 6, 2025
f0ed1ea
feat: fix some stuff
ErikPre Nov 7, 2025
1610408
feat: fix more stuff
ErikPre Nov 7, 2025
57e6869
feat: stuff
ErikPre Nov 7, 2025
f744866
Merge branch 'importer-rework' into chore/type-safety
ErikPre Nov 7, 2025
b7d6681
feat: Import only types
ErikPre Nov 7, 2025
a253bde
feat: additional typing
ErikPre Nov 7, 2025
a3a5f09
feat: typed fwo_log
ErikPre Nov 7, 2025
4556421
wip: fwconfig_import_rule
ErikPre Nov 8, 2025
cd5c98e
feat: fwconfig_import_rule typing finished
ErikPre Nov 8, 2025
2c0b067
feat: finished all models
ErikPre Nov 9, 2025
ba71e60
wip: typing
ErikPre Nov 9, 2025
ed2df50
fix: minor changes
ErikPre Nov 9, 2025
5a4999c
feat: more typing
ErikPre Nov 9, 2025
ba48a91
Merge remote-tracking branch 'CactuseSecurity/importer-rework' into c…
Y4nnikH Nov 10, 2025
5f1bded
fix(importer): incorrect types and imports
Y4nnikH Nov 10, 2025
38a8a7d
wip: model controller typing
ErikPre Nov 10, 2025
d40494c
feat: model_controller typing finished
ErikPre Nov 10, 2025
e3c2281
feat: common.py typing
ErikPre Nov 10, 2025
c4a3307
feat: typing finished
ErikPre Nov 10, 2025
c79107e
feat: asa typing finished
ErikPre Nov 10, 2025
63fdb51
fix: circular import
ErikPre Nov 10, 2025
9cf9937
Merge branch 'importer-rework' into chore/type-safety
ErikPre Nov 10, 2025
a3f587c
wip: checkpoint typing
ErikPre Nov 11, 2025
a104348
chore: use python models where possible and replace all Optional with…
ErikPre Nov 11, 2025
4cb5cfa
wip: checkpoint
ErikPre Nov 11, 2025
cee9e63
wip: checkpoint
ErikPre Nov 11, 2025
fffd0b8
feat: cp_rule
ErikPre Nov 11, 2025
7e05454
feat: checkpoint typing
ErikPre Nov 11, 2025
e9183e1
fix(importer): import path
Y4nnikH Nov 12, 2025
33207ea
Merge remote-tracking branch 'upstream/importer-rework' into chore/ty…
Laennart Nov 12, 2025
7dcbb95
fix: ASA types
Laennart Nov 12, 2025
d0ac4f6
refactor: Azure types
Laennart Nov 12, 2025
11f4477
refactor: Cisco firepower domain
Laennart Nov 12, 2025
07053f9
refactor: Dummy router management types
Laennart Nov 12, 2025
fcc6b26
refactor: Revert azure change
Laennart Nov 12, 2025
ce5d155
fix: Checkpoint login
Laennart Nov 13, 2025
2d9d551
wip: fmgr
ErikPre Nov 14, 2025
bdcf2b6
wip: fmgr
ErikPre Nov 15, 2025
932fe22
feat: fmgr
ErikPre Nov 15, 2025
a7d62e8
wip: Cleanup
ErikPre Nov 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 11 additions & 19 deletions roles/importer/files/importer/azure2022ff/azure_getter.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# library for API get functions
import base64
from typing import Any
from fwo_log import getFwoLogger
import requests.packages
import requests
import json
import fwo_globals
from fwo_exceptions import FwLoginFailed


def api_call(url, params = {}, headers = {}, data = {}, azure_jwt = '', show_progress=False, method='get'):
def api_call(url: str, params: dict[str, Any] = {}, headers: dict[str, Any] = {}, data: dict[str, Any] | str = {}, azure_jwt: str = '', show_progress: bool = False, method: str = 'get') -> tuple[dict[str, Any], dict[str, Any]]:
logger = getFwoLogger()
request_headers = {}
if not 'Content-Type' in headers:
Expand All @@ -31,13 +30,6 @@ def api_call(url, params = {}, headers = {}, data = {}, azure_jwt = '', show_pro

# error handling:
exception_text = ''
if response is None:
if 'password' in json.dumps(data):
exception_text = "error while sending api_call containing credential information to url '" + \
str(url)
else:
exception_text = "error while sending api_call to url '" + str(url) + "' with payload '" + json.dumps(
data, indent=2) + "' and headers: '" + json.dumps(request_headers, indent=2)
if not response.ok:
exception_text = 'error code: {error_code}, error={error}'.format(error_code=response.status_code, error=response.content)
#logger.error(response.content)
Expand All @@ -58,13 +50,13 @@ def api_call(url, params = {}, headers = {}, data = {}, azure_jwt = '', show_pro
logger.debug("api_call to url '" + str(url) + "' with payload '" + json.dumps(
data, indent=2) + "' and headers: '" + json.dumps(request_headers, indent=2))

return response.headers, body_json
return dict(response.headers), body_json


def login(azure_user, azure_password, tenant_id, client_id, client_secret):
def login(azure_user: str, azure_password: str, tenant_id: str, client_id: str, client_secret: str) -> str | None:
base_url = 'https://login.microsoftonline.com/{tenant_id}/oauth2/token'.format(tenant_id=tenant_id)
try:
headers, body = api_call(base_url, method="post",
_, body = api_call(base_url, method="post",
headers={'Content-Type': 'application/x-www-form-urlencoded'},
data={
"grant_type" : "client_credentials",
Expand All @@ -78,7 +70,7 @@ def login(azure_user, azure_password, tenant_id, client_id, client_secret):
raise FwLoginFailed("Azure login ERROR for client_id id=" + str(client_id) + " Message: " + str(e)) from None

if body.get("access_token") == None: # leaving out payload as it contains pwd
raise FwLoginFailed("Azure login ERROR for client_id=" + str(client_id) + " Message: " + str(e)) from None
raise FwLoginFailed("Azure login ERROR for client_id=" + str(client_id) + " Message: None") from None

if fwo_globals.debug_level > 2:
logger = getFwoLogger()
Expand All @@ -87,12 +79,12 @@ def login(azure_user, azure_password, tenant_id, client_id, client_secret):
return body["access_token"]


def update_config_with_azure_api_call(azure_jwt, api_base_url, config, api_path, key, parameters={}, payload={}, show_progress=False, limit: int=1000, method="get"):
offset = 0
limit = 1000
def update_config_with_azure_api_call(azure_jwt: str, api_base_url: str, config: dict[str, Any], api_path: str, key: str, parameters: dict[str, Any]={}, payload: dict[str, Any]={}, show_progress: bool=False, limit: int=1000, method: str="get") -> None:
_ = 0
__ = 1000
returned_new_data = True
full_result = []

full_result: list[Any] = []
#while returned_new_data:
# parameters["offset"] = offset
# parameters["limit"] = limit
Expand Down
34 changes: 17 additions & 17 deletions roles/importer/files/importer/azure2022ff/azure_network.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from asyncio.log import logger
from fwo_log import getFwoLogger
from typing import Any
from netaddr import IPAddress
from fwo_const import list_delimiter
import ipaddress


def normalize_nwobjects(full_config, config2import, import_id, jwt=None, mgm_id=None):
nw_objects = []
def normalize_nwobjects(full_config: dict[str, Any], config2import: dict[str, Any], import_id: str, jwt: str | None = None, mgm_id: str | None = None) -> None:
nw_objects: list[dict[str, Any]] = []
for obj_orig in full_config["networkObjects"]:
nw_objects.append(parse_object(obj_orig, import_id, config2import, nw_objects))
for obj_grp_orig in full_config["networkObjectGroups"]:
Expand All @@ -16,8 +16,8 @@ def normalize_nwobjects(full_config, config2import, import_id, jwt=None, mgm_id=
config2import['network_objects'] = nw_objects


def extract_base_object_infos(obj_orig, import_id, config2import, nw_objects):
obj = {}
def extract_base_object_infos(obj_orig: dict[str, Any], import_id: str, config2import: dict[str, Any], nw_objects: list[dict[str, Any]]) -> dict[str, Any]:
obj: dict[str, Any] = {}

if "type" in obj_orig:
obj["obj_name"] = obj_orig["name"]
Expand All @@ -30,9 +30,9 @@ def extract_base_object_infos(obj_orig, import_id, config2import, nw_objects):
return obj


def parse_obj_group(orig_grp, import_id, nw_objects, config2import, id = None):
refs = []
names = []
def parse_obj_group(orig_grp: dict[str, Any], import_id: str, nw_objects: list[dict[str, Any]], config2import: dict[str, Any], id: str | None = None) -> tuple[str, str]:
refs: list[str] = []
names: list[str] = []
if "properties" in orig_grp:
if 'ipAddresses' in orig_grp['properties']:
for ip in orig_grp['properties']['ipAddresses']:
Expand All @@ -43,14 +43,14 @@ def parse_obj_group(orig_grp, import_id, nw_objects, config2import, id = None):
return list_delimiter.join(refs), list_delimiter.join(names)


def parse_obj_list(ip_list, import_id, config, id):
refs = []
names = []
def parse_obj_list(ip_list: list[str], import_id: str, config: dict[str, Any], id: str | None = None) -> tuple[str, str]:
refs: list[str] = []
names: list[str] = []
for ip in ip_list:
# TODO: lookup ip in network_objects and re-use
ip_obj = {}
ip_obj: dict[str, Any] = {}
ip_obj['obj_name'] = ip
ip_obj['obj_uid'] = ip_obj['obj_name'] + "_" + id
ip_obj['obj_uid'] = ip_obj['obj_name'] + "_" + (id if id is not None else "")
try:
ipaddress.ip_network(ip)
# valid ip
Expand All @@ -73,13 +73,13 @@ def parse_obj_list(ip_list, import_id, config, id):

ip_obj['control_id'] = import_id

config.append(ip_obj)
config.append(ip_obj) # type: ignore # TODO: config is dict[str, Any], not list
refs.append(ip_obj['obj_uid'])
names.append(ip_obj['obj_name'])
return list_delimiter.join(refs), list_delimiter.join(names)


def parse_object(obj_orig, import_id, config2import, nw_objects):
def parse_object(obj_orig: dict[str, Any], import_id: str, config2import: dict[str, Any], nw_objects: list[dict[str, Any]]) -> dict[str, Any]:
obj = extract_base_object_infos(obj_orig, import_id, config2import, nw_objects)
if obj_orig["type"] == "network": # network
obj["obj_typ"] = "network"
Expand Down Expand Up @@ -113,7 +113,7 @@ def parse_object(obj_orig, import_id, config2import, nw_objects):
return obj


def add_network_object(config2import, ip=None):
def add_network_object(config2import: dict[str, Any], ip: str | None = None) -> dict[str, Any]:
if "-" in str(ip):
type = 'ip_range'
else:
Expand Down
25 changes: 13 additions & 12 deletions roles/importer/files/importer/azure2022ff/azure_rule.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
from typing import Any, Literal
from azure_service import parse_svc_list
from azure_network import parse_obj_list
from fwo_log import getFwoLogger
import hashlib
import base64


def make_hash_sha256(o):
def make_hash_sha256(o: Any) -> str:
hasher = hashlib.sha256()
hasher.update(repr(make_hashable(o)).encode())
return base64.b64encode(hasher.digest()).decode()


def make_hashable(o):
def make_hashable(o: Any) -> tuple[Any, ...] | Any:
if isinstance(o, (tuple, list)):
return tuple((make_hashable(e) for e in o))
return tuple([make_hashable(e) for e in o]) # type: ignore

if isinstance(o, dict):
return tuple(sorted((k,make_hashable(v)) for k,v in o.items()))
return tuple(sorted((k, make_hashable(v)) for k, v in o.items())) # type: ignore

if isinstance(o, (set, frozenset)):
return tuple(sorted(make_hashable(e) for e in o))
return tuple(sorted(make_hashable(e) for e in o)) # type: ignore

return o

Expand All @@ -32,27 +32,28 @@ def make_hashable(o):
# rule_scope = rule_access_scope + rule_nat_scope


def normalize_access_rules(full_config, config2import, import_id, mgm_details={}):
rules = []
def normalize_access_rules(full_config: dict[str, Any], config2import: dict[str, Any], import_id: str, mgm_details: dict[str, Any] = {}) -> None:
rules: list[dict[str, Any]] = []

nw_obj_names = []
nw_obj_names: list[str] = []
for o in config2import['network_objects']:
nw_obj_names.append(o["obj_name"])

for device in full_config["devices"]:
for _ in full_config["devices"]:
rule_number = 0
for policy_name in full_config['devices'].keys():
for rule_prop in full_config['devices'][policy_name]['rules']:
rule_coll_container = rule_prop['properties']
if 'ruleCollections' in rule_coll_container:
for rule_coll in rule_coll_container['ruleCollections']:
rule_action: Literal["accept", "deny"] | None = None
if 'ruleCollectionType' in rule_coll and rule_coll['ruleCollectionType'] == 'FirewallPolicyFilterRuleCollection':
rule_action = "accept"
if rule_coll['action']['type'] == 'Deny':
rule_action = "deny"

for rule_orig in rule_coll['rules']:
rule = {'rule_src': 'any', 'rule_dst': 'any', 'rule_svc': 'any',
rule: dict[str, Any] = {'rule_src': 'any', 'rule_dst': 'any', 'rule_svc': 'any',
'rule_src_refs': 'any_obj_placeholder', 'rule_dst_refs': 'any_obj_placeholder',
'rule_svc_refs': 'any_svc_placeholder'}
rule['rulebase_name'] = policy_name
Expand All @@ -73,7 +74,7 @@ def normalize_access_rules(full_config, config2import, import_id, mgm_details={}
if "sourceAddresses" in rule_orig:
rule['rule_src_refs'], rule["rule_src"] = parse_obj_list(rule_orig["sourceAddresses"], import_id, config2import['network_objects'], rule["rule_uid"])
if "destinationAddresses" in rule_orig:
undefObjects = []
undefObjects: list[str] = []

for obj in rule_orig['destinationAddresses']:
if "obj_name" in obj:
Expand Down
31 changes: 15 additions & 16 deletions roles/importer/files/importer/azure2022ff/azure_service.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import random
from typing import Any
from fwo_const import list_delimiter


def normalize_svcobjects(full_config, config2import, import_id):
svc_objects = []
def normalize_svcobjects(full_config: dict[str, Any], config2import: dict[str, Any], import_id: str) -> None:
svc_objects: list[dict[str, Any]] = []
for svc_orig in full_config["serviceObjects"]:
svc_objects.append(parse_svc(svc_orig, import_id))
for svc_grp_orig in full_config["serviceObjectGroups"]:
svc_grp = extract_base_svc_infos(svc_grp_orig, import_id)
svc_grp["svc_typ"] = "group"
svc_grp["svc_member_refs"] , svc_grp["svc_member_names"] = parse_svc_group(svc_grp_orig, import_id, svc_objects)
svc_grp["svc_member_refs"] , svc_grp["svc_member_names"] = parse_svc_group(svc_grp_orig, import_id, svc_objects) # type: ignore # TODO: parse_svc_group is not defined
svc_objects.append(svc_grp)
config2import['service_objects'] = svc_objects

def extract_base_svc_infos(svc_orig, import_id):
svc = {}

def extract_base_svc_infos(svc_orig: dict[str, Any], import_id: str) -> dict[str, Any]:
svc: dict[str, Any] = {}
if "id" in svc_orig:
svc["svc_uid"] = svc_orig["id"]
else:
Expand All @@ -36,7 +37,7 @@ def extract_base_svc_infos(svc_orig, import_id):
return svc


def parse_svc(orig_svc, import_id):
def parse_svc(orig_svc: dict[str, Any], import_id: str) -> dict[str, Any]:
svc = extract_base_svc_infos(orig_svc, import_id)
svc["svc_typ"] = "simple"
parse_port(orig_svc, svc)
Expand All @@ -58,7 +59,7 @@ def parse_svc(orig_svc, import_id):
return svc


def parse_port(orig_svc, svc):
def parse_port(orig_svc: dict[str, Any], svc: dict[str, Any]) -> None:
if "port" in orig_svc:
if orig_svc["port"].find("-") != -1: # port range
port_range = orig_svc["port"].split("-")
Expand All @@ -68,18 +69,16 @@ def parse_port(orig_svc, svc):
svc["svc_port"] = orig_svc["port"]
svc["svc_port_end"] = None

def parse_svc_list(ports, ip_protos, import_id, svc_objects, id = None):
refs = []
names = []

def parse_svc_list(ports: list[str], ip_protos: list[str], import_id: str, svc_objects: list[dict[str, Any]], id: str | None = None) -> tuple[str, str]:
refs: list[str] = []
names: list[str] = []
for port in ports:
for ip_proto in ip_protos:
# TODO: lookup port in svc_objects and re-use
svc = {}


svc: dict[str, Any] = {}

if id == None:
if id is None:
id = str(random.random())

svc['svc_name'] = ip_proto + "_" + port
Expand Down
17 changes: 9 additions & 8 deletions roles/importer/files/importer/azure2022ff/fwcommon.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
# import sys
# from common import importer_base_dir
# sys.path.append(importer_base_dir + '/azure2022ff')
from typing import Any
from azure_service import normalize_svcobjects
from azure_rule import normalize_access_rules
from azure_network import normalize_nwobjects
from azure_getter import login, update_config_with_azure_api_call
from fwo_log import getFwoLogger
from azure_base import azure_api_version_str

def has_config_changed(full_config, mgm_details, force=False):
def has_config_changed(full_config: dict[str, Any], mgm_details: dict[str, Any], force: bool=False):
# dummy - may be filled with real check later on
return True


def get_config(config2import, full_config, current_import_id, mgm_details, limit=1000, force=False, jwt=''):
def get_config(config2import: dict[str, Any], full_config: dict[str, Any], current_import_id: str, mgm_details: dict[str, Any], limit: int=1000, force: bool=False, jwt: str=''):
logger = getFwoLogger()
if full_config == {}: # no native config was passed in, so getting it from Azzure
parsing_config_only = False
Expand Down Expand Up @@ -41,7 +42,7 @@ def get_config(config2import, full_config, current_import_id, mgm_details, limit

# login
azure_jwt = login(azure_user, azure_password, azure_tenant_id, azure_client_id, azure_client_secret)
if azure_jwt == None or azure_jwt == "":
if azure_jwt is None or azure_jwt == "":
logger.error('Did not succeed in logging in to Azure API, no jwt returned.')
return 1

Expand Down Expand Up @@ -87,9 +88,9 @@ def get_config(config2import, full_config, current_import_id, mgm_details, limit
normalize_nwobjects(full_config, config2import, current_import_id, jwt=jwt, mgm_id=mgm_details['id'])
normalize_svcobjects(full_config, config2import, current_import_id)

any_nw_svc = {"svc_uid": "any_svc_placeholder", "svc_name": "Any", "svc_comment": "Placeholder service.",
any_nw_svc: dict[str, Any] = {"svc_uid": "any_svc_placeholder", "svc_name": "Any", "svc_comment": "Placeholder service.",
"svc_typ": "simple", "ip_proto": -1, "svc_port": 0, "svc_port_end": 65535, "control_id": current_import_id}
any_nw_object = {"obj_uid": "any_obj_placeholder", "obj_name": "Any", "obj_comment": "Placeholder object.",
any_nw_object: dict[str, Any] = {"obj_uid": "any_obj_placeholder", "obj_name": "Any", "obj_comment": "Placeholder object.",
"obj_typ": "network", "obj_ip": "0.0.0.0/0", "control_id": current_import_id}
config2import["service_objects"].append(any_nw_svc)
config2import["network_objects"].append(any_nw_object)
Expand All @@ -101,15 +102,15 @@ def get_config(config2import, full_config, current_import_id, mgm_details, limit
return 0


def extract_nw_objects(rule, config):
def extract_nw_objects(rule: str, config: dict[str, Any]):
pass


def extract_svc_objects(rule, config):
def extract_svc_objects(rule: str, config: dict[str, Any]):
pass


def extract_user_objects(rule, config):
def extract_user_objects(rule: str, config: dict[str, Any]):
pass


Expand Down
Loading
Loading