Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/confcom/HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
Release History
===============

1.4.2
++++++
* Update policy model to use pydantic and explicitly declare collections where order doesn't affect function. These fields will serialize in alphabetical order and comparisons will ignore order.

1.4.0
++++++
* Add --with-containers flag to acipolicygen and acifragmentgen to allow passing container policy definitions directly
Expand Down
13 changes: 13 additions & 0 deletions src/confcom/azext_confcom/lib/binaries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os


def get_binaries_dir():
binaries_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "bin")
if not os.path.exists(binaries_dir):
os.makedirs(binaries_dir)
return binaries_dir
52 changes: 52 additions & 0 deletions src/confcom/azext_confcom/lib/opa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import hashlib
import json
import os
from pathlib import Path
import platform
import subprocess
from typing import Iterable

import requests

from azext_confcom.lib.binaries import get_binaries_dir

_opa_pathh = os.path.abspath(os.path.join(get_binaries_dir(), "opa"))
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable name typo: _opa_pathh has an extra 'h'. Should be _opa_path for consistency with naming conventions.

Copilot uses AI. Check for mistakes.
_expected_sha256 = "fe8e191d44fec33db2a3d0ca788b9f83f866d980c5371063620c3c6822792877"


def opa_get():

opa_fetch_resp = requests.get(
f"https://openpolicyagent.org/downloads/latest/opa_{platform.system().lower()}_amd64")
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The opa_get() function downloads a binary from the internet without verifying HTTPS certificate or using a secure connection verification. The requests.get() call should explicitly verify SSL certificates to prevent man-in-the-middle attacks:

opa_fetch_resp = requests.get(
    f"https://openpolicyagent.org/downloads/latest/opa_{platform.system().lower()}_amd64",
    verify=True,  # Explicitly verify SSL certificates (default behavior, but explicit is better)
    timeout=30    # Add timeout to prevent hanging
)

Additionally, there's no timeout which could cause the installation to hang indefinitely if the server doesn't respond.

Suggested change
f"https://openpolicyagent.org/downloads/latest/opa_{platform.system().lower()}_amd64")
f"https://openpolicyagent.org/downloads/latest/opa_{platform.system().lower()}_amd64",
verify=True, # Explicitly verify SSL certificates
timeout=30 # Add timeout to prevent hanging
)

Copilot uses AI. Check for mistakes.
opa_fetch_resp.raise_for_status()

assert hashlib.sha256(opa_fetch_resp.content).hexdigest() == _expected_sha256
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assertion will cause a cryptic error message if the SHA256 hash doesn't match. Consider using a more informative error message:

actual_hash = hashlib.sha256(opa_fetch_resp.content).hexdigest()
if actual_hash != _expected_sha256:
    raise ValueError(f"OPA binary hash mismatch. Expected {_expected_sha256}, got {actual_hash}")
Suggested change
assert hashlib.sha256(opa_fetch_resp.content).hexdigest() == _expected_sha256
actual_hash = hashlib.sha256(opa_fetch_resp.content).hexdigest()
if actual_hash != _expected_sha256:
raise ValueError(f"OPA binary hash mismatch. Expected {_expected_sha256}, got {actual_hash}")

Copilot uses AI. Check for mistakes.

with open(_opa_pathh, "wb") as f:
f.write(opa_fetch_resp.content)

Comment on lines +19 to +32
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OPA binary is downloaded during package installation without verifying the authenticity or checking if the file already exists. This could lead to:

  1. Unnecessary network calls during every installation
  2. No version pinning - the download URL uses "latest" which could break installations when new OPA versions are released
  3. The hardcoded SHA256 hash will fail if OPA releases a new version

Consider:

  • Checking if the binary already exists before downloading
  • Pinning to a specific OPA version instead of "latest"
  • Updating the expected SHA256 hash when upgrading OPA versions
Suggested change
_expected_sha256 = "fe8e191d44fec33db2a3d0ca788b9f83f866d980c5371063620c3c6822792877"
def opa_get():
opa_fetch_resp = requests.get(
f"https://openpolicyagent.org/downloads/latest/opa_{platform.system().lower()}_amd64")
opa_fetch_resp.raise_for_status()
assert hashlib.sha256(opa_fetch_resp.content).hexdigest() == _expected_sha256
with open(_opa_pathh, "wb") as f:
f.write(opa_fetch_resp.content)
_OPA_VERSION = "v0.63.0"
_expected_sha256 = "fe8e191d44fec33db2a3d0ca788b9f83f866d980c5371063620c3c6822792877"
def opa_get():
# Check if OPA binary exists and matches expected hash
if os.path.isfile(_opa_pathh):
with open(_opa_pathh, "rb") as f:
file_hash = hashlib.sha256(f.read()).hexdigest()
if file_hash == _expected_sha256:
return _opa_pathh
# Download OPA binary if not present or hash mismatch
url = f"https://openpolicyagent.org/downloads/{_OPA_VERSION}/opa_{platform.system().lower()}_amd64"
opa_fetch_resp = requests.get(url)
opa_fetch_resp.raise_for_status()
assert hashlib.sha256(opa_fetch_resp.content).hexdigest() == _expected_sha256, "Downloaded OPA binary hash mismatch"
with open(_opa_pathh, "wb") as f:
f.write(opa_fetch_resp.content)

Copilot uses AI. Check for mistakes.
os.chmod(_opa_pathh, 0o755)
return _opa_pathh


def opa_run(args: Iterable[str]) -> subprocess.CompletedProcess:
return subprocess.run(
[_opa_pathh, *args],
check=True,
stdout=subprocess.PIPE,
text=True,
)


def opa_eval(data_path: Path, query: str):
return json.loads(opa_run([
"eval",
"--format", "json",
"--data", str(data_path),
query,
]).stdout.strip())
64 changes: 64 additions & 0 deletions src/confcom/azext_confcom/lib/orderless_dataclasses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from typing import Any
from pydantic.dataclasses import dataclass as _dataclass, Field
from pydantic import field_serializer


# The policy model is represented as pydantic dataclasses, this makes
# serialisation to/from JSON trivial.

# For some collections in the model, the order has no semantic meaning
# (e.g. environment rules). We mark such fields using a custom OrderlessField
# class which is an extension of the pydantic Field class. This custom class
# just sets a metadata flag we can read later.

# We then also extend the dataclass decorator to sort these fields with this
# flag before serialisation and comparison.


def dataclass(cls=None, **dataclass_kwargs):
def wrap(inner_cls):

# This method uses a pydantic field serializer to operate on fields
# before serialisation. Here we look for "orderless" fields and sort them.
@field_serializer("*")
def _sort_orderless(self, value, info):
field = type(self).__pydantic_fields__[info.field_name]
if (field.json_schema_extra or {}).get("orderless"):
return sorted(value, key=repr)
return value
setattr(inner_cls, "_sort_orderless", _sort_orderless)

# This custom equality method sorts "orderless" fields before comparison.
def __eq__(self, other):
def compare_field(name, field_info):
if (field_info.json_schema_extra or {}).get("orderless"):
return (
sorted(getattr(self, name), key=repr) ==
sorted(getattr(other, name), key=repr)
)
return getattr(self, name) == getattr(other, name)

return (
type(self) is type(other) and
all(
compare_field(name, field_info)
for name, field_info in self.__pydantic_fields__.items()
)
)
setattr(inner_cls, "__eq__", __eq__)

return _dataclass(inner_cls, eq=False, **dataclass_kwargs)

# This adds support for using the decorator with or without parentheses.
if cls is None:
return wrap
return wrap(cls)


def OrderlessField(**kwargs: Any):
return Field(json_schema_extra={"orderless": True}, **kwargs)
59 changes: 31 additions & 28 deletions src/confcom/azext_confcom/lib/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from dataclasses import dataclass, field
from typing import Literal, Optional
from typing import Literal, Optional, List
from azext_confcom.lib.orderless_dataclasses import dataclass, OrderlessField, Field


def get_default_capabilities():
return [
return (
"CAP_AUDIT_WRITE",
"CAP_CHOWN",
"CAP_DAC_OVERRIDE",
Expand All @@ -23,16 +23,16 @@ def get_default_capabilities():
"CAP_SETPCAP",
"CAP_SETUID",
"CAP_SYS_CHROOT"
]
)


@dataclass
class ContainerCapabilities:
ambient: list[str] = field(default_factory=list)
bounding: list[str] = field(default_factory=get_default_capabilities)
effective: list[str] = field(default_factory=get_default_capabilities)
inheritable: list[str] = field(default_factory=list)
permitted: list[str] = field(default_factory=get_default_capabilities)
ambient: List[str] = OrderlessField(default_factory=list)
bounding: List[str] = OrderlessField(default_factory=get_default_capabilities)
effective: List[str] = OrderlessField(default_factory=get_default_capabilities)
inheritable: List[str] = OrderlessField(default_factory=list)
permitted: List[str] = OrderlessField(default_factory=get_default_capabilities)


@dataclass
Expand All @@ -44,32 +44,35 @@ class ContainerRule:

@dataclass
class ContainerExecProcesses:
command: list[str]
signals: Optional[list[str]] = None
command: List[str]
signals: Optional[List[str]] = OrderlessField(default=None)
allow_stdio_access: bool = True


@dataclass
@dataclass()
class ContainerMount:
destination: str
source: str
type: str
options: list[str] = field(default_factory=list)
options: List[str] = OrderlessField(default_factory=list)


@dataclass
class ContainerUser:
group_idnames: list[ContainerRule] = field(default_factory=lambda: [ContainerRule(pattern="", strategy="any")])
group_idnames: List[ContainerRule] = \
OrderlessField(default_factory=lambda: [ContainerRule(pattern="", strategy="any")])
umask: str = "0022"
user_idname: ContainerRule = field(default_factory=lambda: ContainerRule(pattern="", strategy="any"))
user_idname: ContainerRule = \
Field(default_factory=lambda: ContainerRule(pattern="", strategy="any"))


@dataclass
class FragmentReference:
feed: str
issuer: str
minimum_svn: str
includes: list[Literal["containers", "fragments", "namespace", "external_processes"]]
includes: List[Literal["containers", "fragments", "namespace", "external_processes"]] = \
OrderlessField(default_factory=list)
path: Optional[str] = None


Expand All @@ -78,18 +81,18 @@ class FragmentReference:
class Container:
allow_elevated: bool = False
allow_stdio_access: bool = True
capabilities: ContainerCapabilities = field(default_factory=ContainerCapabilities)
command: Optional[list[str]] = None
env_rules: list[ContainerRule] = field(default_factory=list)
exec_processes: list[ContainerExecProcesses] = field(default_factory=list)
capabilities: ContainerCapabilities = Field(default_factory=ContainerCapabilities)
command: Optional[List[str]] = None
env_rules: List[ContainerRule] = OrderlessField(default_factory=list)
exec_processes: List[ContainerExecProcesses] = OrderlessField(default_factory=list)
id: Optional[str] = None
layers: list[str] = field(default_factory=list)
mounts: list[ContainerMount] = field(default_factory=list)
layers: List[str] = Field(default_factory=list)
mounts: List[ContainerMount] = OrderlessField(default_factory=list)
name: Optional[str] = None
no_new_privileges: bool = False
seccomp_profile_sha256: str = ""
signals: list[str] = field(default_factory=list)
user: ContainerUser = field(default_factory=ContainerUser)
signals: List[str] = OrderlessField(default_factory=list)
user: ContainerUser = Field(default_factory=ContainerUser)
working_dir: str = "/"


Expand All @@ -99,8 +102,8 @@ class Policy:
package: str = "policy"
api_version: str = "0.10.0"
framework_version: str = "0.2.3"
fragments: list[FragmentReference] = field(default_factory=list)
containers: list[Container] = field(default_factory=list)
fragments: List[FragmentReference] = OrderlessField(default_factory=list)
containers: List[Container] = OrderlessField(default_factory=list)
allow_properties_access: bool = True
allow_dump_stacks: bool = False
allow_runtime_logging: bool = False
Expand All @@ -114,5 +117,5 @@ class Fragment:
package: str = "fragment"
svn: str = "0"
framework_version: str = "0.2.3"
fragments: list[FragmentReference] = field(default_factory=list)
containers: list[Container] = field(default_factory=list)
fragments: List[FragmentReference] = OrderlessField(default_factory=list)
containers: List[Container] = OrderlessField(default_factory=list)
100 changes: 100 additions & 0 deletions src/confcom/azext_confcom/lib/serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from dataclasses import asdict
import json
from pathlib import Path
from textwrap import dedent
from typing import Union

from azext_confcom.lib.opa import opa_eval
from azext_confcom.lib.policy import Container, FragmentReference, Fragment, Policy
import re


# This is a single entrypoint for serializing both Policy and Fragment objects
def policy_serialize(policy: Union[Policy, Fragment]):

if isinstance(policy, Fragment):
return fragment_serialize(policy)

policy_dict = asdict(policy)
fragments_json = json.dumps(policy_dict.pop("fragments"), indent=2)
containers_json = json.dumps(policy_dict.pop("containers"), indent=2)

return dedent(f"""
package {policy_dict.pop('package')}

api_version := "{policy_dict.pop('api_version')}"
framework_version := "{policy_dict.pop('framework_version')}"

fragments := {fragments_json}

containers := {containers_json}

{chr(10).join(f"{key} := {str(value).lower()}" for key, value in policy_dict.items() if key.startswith("allow"))}

mount_device := data.framework.mount_device
unmount_device := data.framework.unmount_device
mount_overlay := data.framework.mount_overlay
unmount_overlay := data.framework.unmount_overlay
create_container := data.framework.create_container
exec_in_container := data.framework.exec_in_container
exec_external := data.framework.exec_external
shutdown_container := data.framework.shutdown_container
signal_container_process := data.framework.signal_container_process
plan9_mount := data.framework.plan9_mount
plan9_unmount := data.framework.plan9_unmount
get_properties := data.framework.get_properties
dump_stacks := data.framework.dump_stacks
runtime_logging := data.framework.runtime_logging
load_fragment := data.framework.load_fragment
scratch_mount := data.framework.scratch_mount
scratch_unmount := data.framework.scratch_unmount

reason := {{"errors": data.framework.errors}}
""")


def fragment_serialize(fragment: Fragment):

fragment_dict = asdict(fragment)
fragments_json = json.dumps(fragment_dict.pop("fragments"), indent=2)
containers_json = json.dumps(fragment_dict.pop("containers"), indent=2)

return dedent(f"""
package {fragment_dict.pop('package')}

svn := "{fragment_dict.pop('svn')}"
framework_version := "{fragment_dict.pop('framework_version')}"

fragments := {fragments_json}

containers := {containers_json}
""")


def policy_deserialize(file_path: str):

with open(file_path, 'r') as f:
content = f.read()

package_match = re.search(r'package\s+(\S+)', content)
package_name = package_match.group(1)

PolicyType = Policy if package_name == "policy" else Fragment

raw_json = opa_eval(Path(file_path), f"data.{package_name}")["result"][0]["expressions"][0]["value"]

raw_fragments = raw_json.pop("fragments", [])
raw_containers = raw_json.pop("containers", [])

return PolicyType(
package=package_name,
fragments=[FragmentReference(**fragment) for fragment in raw_fragments],
containers=[Container(**container) for container in raw_containers],
**raw_json
)
7 changes: 5 additions & 2 deletions src/confcom/azext_confcom/security_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
# --------------------------------------------------------------------------------------------

import copy
from dataclasses import asdict
import json
import warnings
from enum import Enum, auto
from typing import Any, Dict, List, Optional, Tuple, Union
from pydantic import TypeAdapter

import deepdiff
from azext_confcom import config, os_util
Expand Down Expand Up @@ -411,7 +411,10 @@ def _policy_serialization(self, pretty_print=False, include_sidecars: bool = Tru
for container in policy:
container[config.POLICY_FIELD_CONTAINERS_ELEMENTS_ALLOW_STDIO_ACCESS] = False

policy += [asdict(Container(**c)) for c in self._container_definitions]
policy += [
TypeAdapter(Container).dump_python(Container(**c), mode="json")
for c in self._container_definitions
]

if pretty_print:
return pretty_print_func(policy)
Expand Down
Loading
Loading