Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/rpdk/guard_rail/core/stateful.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ class DIFFKEYS:
"arrayType",
}

# Top-level object constructs that need special tracking
cfn_object_constructs = {
"tagging",
}

native_constructs = {
"type",
"description",
Expand Down Expand Up @@ -161,6 +166,16 @@ def _is_cfn_leaf_construct(path_list):
return len(path_list) > 0 and path_list[-1] in cfn_leaf_level_constructs


def _is_cfn_object_construct(path_list):
"""This method defines cfn object constructs like tagging"""
return len(path_list) == 1 and path_list[0] in cfn_object_constructs


def _is_tagging_property(path_list):
"""This method checks if path is a tagging nested property"""
return len(path_list) == 2 and path_list[0] == "tagging"
Copy link
Contributor

@agarikana agarikana Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we put 2 inside a var like TAGGING_NEST_DEPTH or something and use that var everywhere?



def _get_path(path_list):
"""This method converts array into schema path notation"""
return "/".join([""] + path_list)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is [""] required or is it AI generated 😄

Expand Down Expand Up @@ -326,10 +341,32 @@ def _translate_dict_change(
def __translate_dict_diff(diffkey, schema_meta_diff, diff_value):
for key, value in diff_value.items():
path_list = _cast_path(key)

if _is_combiner_property(path_list):
raise NotImplementedError(
"Schemas with combiners are not yet supported for stateful evaluation"
)
if _is_cfn_object_construct(path_list):

_add_item(schema_meta_diff, path_list[0], diffkey, True)
if _is_tagging_property(path_list):
# Track nested tagging properties in flat structure
# For removed/added: tagging: {removed: ['/tagging/taggable', ...]}
# For changed: tagging: {changed: [{property: '/tagging/taggable', old_value: ..., new_value: ...}]}
if len(path_list) == 2: # e.g., ['tagging', 'taggable']
parent_key = path_list[0] # 'tagging'

if parent_key not in schema_meta_diff:
schema_meta_diff[parent_key] = {}

# Create the property path
property_path = _get_path(path_list)

# For added/removed, just add the path string
# (changed is handled separately in _translate_values_changed_diff_)
if diffkey not in schema_meta_diff[parent_key]:
schema_meta_diff[parent_key][diffkey] = []
schema_meta_diff[parent_key][diffkey].append(property_path)
if _is_resource_property(path_list):
_add_item(schema_meta_diff, PROPERTIES, diffkey, _get_path(path_list))
if isinstance(value, dict):
Expand Down Expand Up @@ -404,6 +441,24 @@ def _translate_values_changed_diff_(schema_meta_diff, diff_value):
DIFFKEYS.ADDED,
value[DIFFKEYS.NEW_VALUE],
)
if _is_tagging_property(path_list):
# Track changes to nested tagging properties in flat structure
# Create structure like: tagging: {changed: [{property: '/tagging/taggable', ...}]}
if len(path_list) == 2: # e.g., ['tagging', 'taggable']
parent_key = path_list[0] # 'tagging'

if parent_key not in schema_meta_diff:
schema_meta_diff[parent_key] = {}

change_value = {
DIFFKEYS.PROPERTY: "/" + "/".join(path_list),
DIFFKEYS.OLD_VALUE: value[DIFFKEYS.OLD_VALUE],
DIFFKEYS.NEW_VALUE: value[DIFFKEYS.NEW_VALUE],
}

if DIFFKEYS.CHANGED not in schema_meta_diff[parent_key]:
schema_meta_diff[parent_key][DIFFKEYS.CHANGED] = []
schema_meta_diff[parent_key][DIFFKEYS.CHANGED].append(change_value)
if _is_json_construct(path_list):
_add_item(
schema_meta_diff,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,3 +520,68 @@ rule ensure_maximum_not_contracted when maximum exists
>>
}
}

# Tagging Backward Compatibility Rules

rule ensure_tagging_properties_not_removed when tagging exists
{
tagging.removed !exists
<<
{
"result": "NON_COMPLIANT",
"check_id": "TAG100",
"message": "tagging properties MUST NOT be removed from schema"
}
>>
}

rule ensure_taggable_not_changed when tagging exists
{
when tagging.changed exists {
tagging.changed[*] {
when this.property == "/tagging/taggable" {
this.old_value == true
this.new_value != false
<<
{
"result": "NON_COMPLIANT",
"check_id": "TAG101",
"message": "tagging.taggable MUST NOT change from true to false"
}
>>
}
when this.property == "/tagging/tagOnCreate" {
this.old_value == true
this.new_value != false
<<
{
"result": "NON_COMPLIANT",
"check_id": "TAG102",
"message": "tagging.tagOnCreate MUST NOT change from true to false"
}
>>
}
when this.property == "/tagging/cloudFormationSystemTags" {
this.old_value == true
this.new_value != false
<<
{
"result": "NON_COMPLIANT",
"check_id": "TAG103",
"message": "tagging.cloudFormationSystemTags MUST NOT change from true to false"
}
>>
}
when this.property == "/tagging/tagProperty" {
this.old_value == this.new_value
<<
{
"result": "NON_COMPLIANT",
"check_id": "TAG104",
"message": "tagging.tagProperty MUST NOT change"
}
>>
}
}
}
}
213 changes: 213 additions & 0 deletions tests/integ/runner/test_integ_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,34 @@ def test_exec_compliance_stateful(
assert "Stateful evaluation is not supported yet" == str(e)


def test_exec_compliance_stateful2():
"""Test exec_compliance for stateful"""
try:
new = {
"tagging": {
"taggable": True,
"tagOnCreate": True,
"tagUpdatable": True,
"cloudFormationSystemTags": False,
"tagProperty": "/properties/Tags",
"permissions": [
"test:TagResource",
"test:UntagResource",
"test:ListTagsForResource",
],
}
}
old = {}
payload: Stateful = Stateful(
previous_schema=old,
current_schema=new,
)
compliance_result = exec_compliance(payload)[0]
assert compliance_result.compliant == ["ensure_tagging_properties_not_removed"]
except NotImplementedError as e:
assert "Stateful evaluation is not supported yet" == str(e)


@pytest.mark.parametrize(
"previous_schema, current_schema, collected_rules,non_compliant_rules,warning_rules",
[
Expand Down Expand Up @@ -1242,3 +1270,188 @@ def test_exec_compliance_stateful_default_value_breaking_change(
assert (
non_compliant_result == compliance_result.non_compliant[non_compliant_rule]
)


@pytest.mark.parametrize(
"previous_schema,current_schema,collected_rules,non_compliant_rules,warning_rules",
[
# Test Case: TAG100 - Tagging section removal
(
{
"typeName": "AWS::Test::Resource",
"properties": {"Id": {"type": "string"}},
"tagging": {
"taggable": True,
"tagOnCreate": True,
"tagUpdatable": True,
"tagProperty": "/properties/Tags",
},
},
{
"typeName": "AWS::Test::Resource",
"properties": {"Id": {"type": "string"}},
},
[],
{
"ensure_tagging_properties_not_removed": {
GuardRuleResult(
check_id="TAG100",
message="tagging properties MUST NOT be removed from schema",
path="/tagging/removed",
)
}
},
[],
),
# Test Case: TAG101 - Taggable changed from true to false
(
{
"typeName": "AWS::Test::Resource",
"properties": {"Id": {"type": "string"}},
"tagging": {
"taggable": True,
"tagOnCreate": True,
"tagProperty": "/properties/Tags",
},
},
{
"typeName": "AWS::Test::Resource",
"properties": {"Id": {"type": "string"}},
"tagging": {
"taggable": False,
"tagOnCreate": False,
"tagProperty": "/properties/Tags",
},
},
[],
{
"ensure_taggable_not_changed": {
GuardRuleResult(
check_id="TAG101",
message="tagging.taggable MUST NOT change from true to false",
path="/tagging/changed/0/new_value",
),
GuardRuleResult(
check_id="TAG102",
message="tagging.tagOnCreate MUST NOT change from true to false",
path="/tagging/changed/1/new_value",
),
},
},
[],
),
# Test Case: TAG100 - Taggable property removal
(
{
"typeName": "AWS::Test::Resource",
"properties": {"Id": {"type": "string"}},
"tagging": {"taggable": True, "tagProperty": "/properties/Tags"},
},
{
"typeName": "AWS::Test::Resource",
"properties": {"Id": {"type": "string"}},
"tagging": {"tagProperty": "/properties/Tags"},
},
[],
{
"ensure_tagging_properties_not_removed": {
GuardRuleResult(
check_id="TAG100",
message="tagging properties MUST NOT be removed from schema",
path="/tagging/removed",
)
}
},
[],
),
# Test Case: TAG102 - TagProperty path changed
(
{
"typeName": "AWS::Test::Resource",
"properties": {"Id": {"type": "string"}},
"tagging": {"taggable": True, "tagProperty": "/properties/Tags"},
},
{
"typeName": "AWS::Test::Resource",
"properties": {"Id": {"type": "string"}},
"tagging": {"taggable": True, "tagProperty": "/properties/TagList"},
},
[],
{
"ensure_taggable_not_changed": {
GuardRuleResult(
check_id="TAG104",
message="tagging.tagProperty MUST NOT change",
path="/tagging/changed/0/old_value",
)
}
},
[],
),
# Test Case: TAG100 - TagProperty removal
(
{
"typeName": "AWS::Test::Resource",
"properties": {"Id": {"type": "string"}},
"tagging": {"taggable": True, "tagProperty": "/properties/Tags"},
},
{
"typeName": "AWS::Test::Resource",
"properties": {"Id": {"type": "string"}},
"tagging": {"taggable": True},
},
[],
{
"ensure_tagging_properties_not_removed": {
GuardRuleResult(
check_id="TAG100",
message="tagging properties MUST NOT be removed from schema",
path="/tagging/removed",
)
}
},
[],
),
# Test Case: Adding tagging metadata (should be compliant)
(
{
"typeName": "AWS::Test::Resource",
"properties": {"Id": {"type": "string"}},
},
{
"typeName": "AWS::Test::Resource",
"properties": {"Id": {"type": "string"}},
"tagging": {"taggable": True, "tagProperty": "/properties/Tags"},
},
[],
{},
[],
),
],
)
def test_exec_compliance_stateful_tagging_backward_compatibility(
previous_schema, current_schema, collected_rules, non_compliant_rules, warning_rules
):
"""Test tagging backward compatibility guard rules (TAG100-TAG104)"""
payload: Stateful = Stateful(
previous_schema=previous_schema,
current_schema=current_schema,
rules=collected_rules,
print_diff_to_console=False,
)
compliance_result = exec_compliance(payload)[0]

# If no violations expected, assert non_compliant is empty
if not non_compliant_rules:
assert (
not compliance_result.non_compliant
), f"Expected no violations but got: {compliance_result.non_compliant}"

# Check each expected violation
for non_compliant_rule, non_compliant_result in non_compliant_rules.items():
assert (
non_compliant_rule in compliance_result.non_compliant
), f"Expected rule '{non_compliant_rule}' not found in violations"
assert (
non_compliant_result == compliance_result.non_compliant[non_compliant_rule]
), f"Mismatch in violations for rule '{non_compliant_rule}'"
Loading