Skip to content
338 changes: 337 additions & 1 deletion src/containerapp/azext_containerapp/_arc_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from azure.cli.core.azclierror import (ValidationError, ResourceNotFoundError, CLIError, InvalidArgumentValueError)
from ._constants import (CUSTOM_CORE_DNS_VOLUME_NAME, CUSTOM_CORE_DNS_VOLUME_MOUNT_PATH,
CUSTOM_CORE_DNS, CORE_DNS, KUBE_SYSTEM, EMPTY_CUSTOM_CORE_DNS)
CUSTOM_CORE_DNS, CORE_DNS, KUBE_SYSTEM, EMPTY_CUSTOM_CORE_DNS, OPENSHIFT_DNS)

logger = get_logger(__name__)

Expand Down Expand Up @@ -280,6 +280,21 @@ def update_deployment(resource_name, resource_namespace, kube_client, deployment
raise ValidationError(f"other errors while patching deployment coredns in kube-system {str(e)}")


def create_or_update_deployment(name, namespace, kube_client, deployment):
validate_resource_name_and_resource_namespace_not_empty(name, namespace)

try:
logger.info(f"Start to create deployment {name} in namespace {namespace}")
apps_v1_api = client.AppsV1Api(kube_client)
apps_v1_api.create_namespaced_deployment(namespace=namespace, body=deployment)
except client.exceptions.ApiException as e:
if e.status == 409:
logger.warning(f"Deployment '{name}' already exists, replacing it")
apps_v1_api.replace_namespaced_deployment(name=name, namespace=namespace, body=deployment)
else:
raise CLIError(f"Failed to create or replace Deployment'{name}': {str(e)}")


def replace_deployment(resource_name, resource_namespace, kube_client, deployment):
validate_resource_name_and_resource_namespace_not_empty(resource_name, resource_namespace)

Expand Down Expand Up @@ -321,6 +336,21 @@ def update_configmap(resource_name, resource_namespace, kube_client, config_map)
raise CLIError(f"other errors while patching config map coredns in kube-system {str(e)}")


def create_or_update_configmap(name, namespace, kube_client, configmap):
validate_resource_name_and_resource_namespace_not_empty(name, namespace)

try:
logger.info(f"Start to create configmap {name} in namespace {namespace}")
core_v1_api = client.CoreV1Api(kube_client)
core_v1_api.create_namespaced_config_map(namespace=namespace, body=configmap)
except client.exceptions.ApiException as e:
if e.status == 409:
logger.warning(f"Configmap '{name}' already exists, replacing it")
core_v1_api.replace_namespaced_config_map(name=name, namespace=namespace, body=configmap)
else:
raise CLIError(f"Failed to create or replace ConfigMap '{name}': {str(e)}")


def replace_configmap(resource_name, resource_namespace, kube_client, config_map):
validate_resource_name_and_resource_namespace_not_empty(resource_name, resource_namespace)

Expand Down Expand Up @@ -356,3 +386,309 @@ def validate_resource_name_and_resource_namespace_not_empty(resource_name, resou
raise InvalidArgumentValueError("Arg resource_name should not be None or Empty")
if resource_namespace is None or len(resource_namespace) == 0:
raise InvalidArgumentValueError("Arg resource_namespace should not be None or Empty")


def create_or_replace_cluster_role(rbac_api, role_name, role):
try:
logger.info(f"Creating new ClusterRole '{role_name}'")
rbac_api.create_cluster_role(body=role)
except client.exceptions.ApiException as e:
if e.status == 409:
logger.info(f"ClusterRole '{role_name}' already exists, replacing it")
rbac_api.replace_cluster_role(name=role_name, body=role)
else:
raise CLIError(f"Failed to create or replace ClusterRole '{role_name}': {str(e)}")


def create_or_replace_cluster_rolebinding(rbac_api, rolebinding_name, rolebinding):
try:
logger.info(f"Creating new ClusterRolebinding '{rolebinding_name}'")
rbac_api.create_cluster_role_binding(body=rolebinding)
except client.exceptions.ApiException as e:
if e.status == 409:
logger.info(f"ClusterRole '{rolebinding_name}' already exists, replacing it")
rbac_api.replace_cluster_role_binding(name=rolebinding_name, body=rolebinding)
else:
raise CLIError(f"Failed to create or replace ClusterRole '{rolebinding_name}': {str(e)}")


def create_openshift_custom_coredns_resources(kube_client, namespace=OPENSHIFT_DNS):
try:
logger.info("Creating custom CoreDNS resources in OpenShift")
core_v1_api = client.CoreV1Api(kube_client)
rbac_api = client.RbacAuthorizationV1Api(kube_client)

# 1. Create ClusterRole
cluster_role = client.V1ClusterRole(
metadata=client.V1ObjectMeta(
name=CUSTOM_CORE_DNS
),
rules=[
client.V1PolicyRule(
api_groups=[""],
resources=["services", "endpoints", "pods", "namespaces"],
verbs=["list", "watch"]
),
client.V1PolicyRule(
api_groups=["discovery.k8s.io"],
resources=["endpointslices"],
verbs=["list", "watch"]
)
]
)
create_or_replace_cluster_role(rbac_api, CUSTOM_CORE_DNS, cluster_role)

# 2. Create ClusterRoleBinding
cluster_role_binding = client.V1ClusterRoleBinding(
metadata=client.V1ObjectMeta(
name=CUSTOM_CORE_DNS
),
role_ref=client.V1RoleRef(
api_group="rbac.authorization.k8s.io",
kind="ClusterRole",
name=CUSTOM_CORE_DNS
),
subjects=[
client.V1Subject(
kind="ServiceAccount",
name="default",
namespace=namespace
)
]
)
create_or_replace_cluster_rolebinding(rbac_api, CUSTOM_CORE_DNS, cluster_role_binding)

# 3. Create ConfigMap
existing_config_map = core_v1_api.read_namespaced_config_map(name=CUSTOM_CORE_DNS, namespace=KUBE_SYSTEM)
corefile_data = existing_config_map.data.get("k4apps-default.io.server") or existing_config_map.data.get("Corefile")
if not corefile_data:
raise ValidationError(F"Neither 'k4apps-default.io.server' nor 'Corefile' key found in the {CUSTOM_CORE_DNS} ConfigMap in {KUBE_SYSTEM} namespace.")

config_map = client.V1ConfigMap(
metadata=client.V1ObjectMeta(
name=CUSTOM_CORE_DNS,
namespace=namespace
),
data={"Corefile": corefile_data}
)

create_or_update_configmap(name=CUSTOM_CORE_DNS, namespace=namespace, kube_client=kube_client, configmap=config_map)
logger.info("Custom CoreDNS ConfigMap created successfully")

# 4. Create Deployment
deployment = client.V1Deployment(
metadata=client.V1ObjectMeta(
name=CUSTOM_CORE_DNS,
namespace=namespace
),
spec=client.V1DeploymentSpec(
replicas=1,
selector=client.V1LabelSelector(
match_labels={"app": CUSTOM_CORE_DNS}
),
template=client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(
labels={"app": CUSTOM_CORE_DNS}
),
spec=client.V1PodSpec(
containers=[
client.V1Container(
name="coredns",
image="coredns/coredns:latest",
args=["-conf", "/etc/coredns/Corefile"],
volume_mounts=[
client.V1VolumeMount(
name="config-volume",
mount_path="/etc/coredns"
)
]
)
],
volumes=[
client.V1Volume(
name="config-volume",
config_map=client.V1ConfigMapVolumeSource(
name=CUSTOM_CORE_DNS
)
)
]
)
)
)
)
create_or_update_deployment(name=CUSTOM_CORE_DNS, namespace=namespace, kube_client=kube_client, deployment=deployment)
logger.info("Custom CoreDNS Deployment created successfully")

# 5 Create Service
service = client.V1Service(
metadata=client.V1ObjectMeta(
name=CUSTOM_CORE_DNS,
namespace=namespace
),
spec=client.V1ServiceSpec(
selector={"app": CUSTOM_CORE_DNS},
ports=[
client.V1ServicePort(
protocol="UDP",
port=53,
target_port=53
)
]
)
)
core_v1_api.create_namespaced_service(namespace=namespace, body=service)
logger.info("Custom CoreDNS Service created successfully")

except client.exceptions.ApiException as e:
if e.status == 409:
logger.warning("Custom CoreDNS resources already exist")
else:
raise CLIError(f"Failed to create custom CoreDNS resources: {str(e)}")
except Exception as e:
raise CLIError(f"An error occurred while creating custom CoreDNS resources: {str(e)}")


def patch_openshift_dns_operator(kube_client, domain, original_folder=None):
try:
logger.info("Patching OpenShift DNS operator to add custom resolver")

# Fetch the existing DNS operator configuration
custom_objects_api = client.CustomObjectsApi(kube_client)

dns_operator_config = get_and_save_openshift_dns_operator_config(kube_client, original_folder)

coredns_service = client.CoreV1Api(kube_client).read_namespaced_service(name=CUSTOM_CORE_DNS, namespace=OPENSHIFT_DNS)

# Add the custom resolver to the DNS operator configuration
servers = dns_operator_config.get("spec", {}).get("servers", [])
custom_resolver = {
"name": CUSTOM_CORE_DNS,
"zones": [domain, f"internal.{domain}"],
"forwardPlugin": {
"upstreams": [coredns_service.spec.cluster_ip],
}
}

# Check if the custom resolver already exists
if not any(server.get("name") == CUSTOM_CORE_DNS for server in servers):
servers.append(custom_resolver)
dns_operator_config["spec"]["servers"] = servers

# Update the DNS operator configuration
custom_objects_api.patch_cluster_custom_object(
group="operator.openshift.io",
version="v1",
plural="dnses",
name="default",
body=dns_operator_config
)
logger.info("Successfully patched OpenShift DNS operator with custom resolver")
else:
logger.info("Custom resolver already exists in the DNS operator configuration")

except client.exceptions.ApiException as e:
raise CLIError(f"Failed to patch DNS operator: {str(e)}")
except Exception as e:
raise CLIError(f"An error occurred while patching DNS operator: {str(e)}")


def extract_domain_from_configmap(kube_client, resource_name=CUSTOM_CORE_DNS, namespace=KUBE_SYSTEM):
import re

try:
core_v1_api = client.CoreV1Api(kube_client)
configmap = core_v1_api.read_namespaced_config_map(name=CUSTOM_CORE_DNS, namespace=KUBE_SYSTEM)
if configmap is None:
raise ResourceNotFoundError(f"ConfigMap '{resource_name}' not found in namespace '{namespace}'.")

corefile = configmap.data.get("k4apps-default.io.server")
if not corefile:
raise ValidationError("'k4apps-default.io.server' key found in the coredns-custom ConfigMap in kube-system namespace.")

# Extract the domain (excluding 'dapr')
for line in corefile.splitlines():
match = re.match(r'^\s*([a-zA-Z0-9\-\.]+):53\s*{', line)
if match and match.group(1) != "dapr":
return match.group(1)

raise ValidationError("No valid domain found in CoreDNS configmap data.")
except Exception as e:
logger.error(f"Failed to extract domain from configmap: {str(e)}")
return None


def get_and_save_openshift_dns_operator_config(kube_client, folder=None):
try:
custom_objects_api = client.CustomObjectsApi(kube_client)
dns_operator_config = custom_objects_api.get_cluster_custom_object(
group="operator.openshift.io",
version="v1",
plural="dnses",
name="default"
)

if folder is not None:
filepath = os.path.join(folder, "openshift-dns-operator-config.json")
with open(filepath, "w") as f:
f.write(json.dumps(dns_operator_config, indent=2))
logger.info(f"OpenShift DNS operator configuration saved to {filepath}")

return dns_operator_config
except Exception as e:
raise ValidationError(f"Failed to retrieve OpenShift DNS operator configuration: {str(e)}")


def restart_openshift_dns_pods(kube_client):
try:
label_selector = "dns.operator.openshift.io/daemonset-dns=default"

# Get the list of pods first to show what will be deleted
core_v1_api = client.CoreV1Api(kube_client)
pods = core_v1_api.list_namespaced_pod(
namespace=OPENSHIFT_DNS,
label_selector=label_selector
)

if not pods.items:
logger.info(f"No DNS pods found in namespace '{OPENSHIFT_DNS}' with label '{label_selector}'")
return

# Show user what pods will be deleted
pod_names = [pod.metadata.name for pod in pods.items]
logger.info(f"Found {len(pod_names)} DNS pods to restart:")
for pod_name in pod_names:
logger.info(f" - {pod_name}")

try:
response = input(f"The DNS pods in namespace '{OPENSHIFT_DNS}' needs to be restarted. Are you sure you want to proceed? (y/n): ")
confirmed = response.lower() in ['y', 'yes']
except (EOFError, KeyboardInterrupt):
logger.info("Operation cancelled by user")
return

if not confirmed:
logger.info("Operation cancelled by user")
return

# Delete the pods
logger.info(f"Deleting DNS pods in namespace '{OPENSHIFT_DNS}'...")
delete_options = client.V1DeleteOptions(
propagation_policy='Foreground',
grace_period_seconds=30
)

core_v1_api.delete_collection_namespaced_pod(
namespace=OPENSHIFT_DNS,
label_selector=label_selector,
body=delete_options
)

logger.info("Successfully initiated deletion of DNS pods. DaemonSet will recreate them automatically.")

except client.exceptions.ApiException as e:
if e.status == 404:
logger.warning(f"Namespace '{OPENSHIFT_DNS}' or pods with label '{label_selector}' not found")
else:
raise CLIError(f"Failed to restart DNS pods: {str(e)}")
except Exception as e:
raise CLIError(f"An error occurred while restarting DNS pods: {str(e)}")
4 changes: 3 additions & 1 deletion src/containerapp/azext_containerapp/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,13 @@
SUPPORTED_RUNTIME_LIST = [RUNTIME_GENERIC, RUNTIME_JAVA]

AKS_AZURE_LOCAL_DISTRO = "AksAzureLocal"
SETUP_CORE_DNS_SUPPORTED_DISTRO = [AKS_AZURE_LOCAL_DISTRO]
OPENSHIFT_DISTRO = "openshift"
SETUP_CORE_DNS_SUPPORTED_DISTRO = [AKS_AZURE_LOCAL_DISTRO, OPENSHIFT_DISTRO]
CUSTOM_CORE_DNS_VOLUME_NAME = 'custom-config-volume'
CUSTOM_CORE_DNS_VOLUME_MOUNT_PATH = '/etc/coredns/custom'
CUSTOM_CORE_DNS = 'coredns-custom'
CORE_DNS = 'coredns'
OPENSHIFT_DNS = 'openshift-dns'
KUBE_SYSTEM = 'kube-system'
EMPTY_CUSTOM_CORE_DNS = """
apiVersion: v1
Expand Down
4 changes: 2 additions & 2 deletions src/containerapp/azext_containerapp/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
validate_custom_location_name_or_id, validate_env_name_or_id_for_up,
validate_otlp_headers, validate_target_port_range, validate_timeout_in_seconds)
from ._constants import (MAXIMUM_CONTAINER_APP_NAME_LENGTH, MAXIMUM_APP_RESILIENCY_NAME_LENGTH, MAXIMUM_COMPONENT_RESILIENCY_NAME_LENGTH,
AKS_AZURE_LOCAL_DISTRO)
AKS_AZURE_LOCAL_DISTRO, OPENSHIFT_DISTRO)


def load_arguments(self, _):
Expand Down Expand Up @@ -378,7 +378,7 @@ def load_arguments(self, _):
c.argument('yaml', type=file_type, help='Path to a .yaml file with the configuration of a Dapr component. All other parameters will be ignored. For an example, see https://learn.microsoft.com/en-us/azure/container-apps/dapr-overview?tabs=bicep1%2Cyaml#component-schema')

with self.argument_context('containerapp arc setup-core-dns') as c:
c.argument('distro', arg_type=get_enum_type([AKS_AZURE_LOCAL_DISTRO]), required=True, help="The distro supported to setup CoreDNS.")
c.argument('distro', arg_type=get_enum_type([AKS_AZURE_LOCAL_DISTRO, OPENSHIFT_DISTRO]), required=True, help="The distro supported to setup CoreDNS.")
c.argument('kube_config', help="Path to the kube config file.")
c.argument('kube_context', help="Kube context from current machine.")
c.argument('skip_ssl_verification', help="Skip SSL verification for any cluster connection.")
Expand Down
Loading
Loading