Skip to content

Commit 7f854fb

Browse files
committed
address comments
1 parent b52c322 commit 7f854fb

File tree

6 files changed

+758
-415
lines changed

6 files changed

+758
-415
lines changed

samtranslator/model/__init__.py

Lines changed: 18 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import re
55
from abc import ABC, ABCMeta, abstractmethod
66
from contextlib import suppress
7-
from enum import Enum
87
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar
98

109
from samtranslator.compat import pydantic
@@ -16,6 +15,7 @@
1615
from samtranslator.model.tags.resource_tagging import get_tag_list
1716
from samtranslator.model.types import IS_DICT, IS_STR, PassThrough, Validator, any_type, is_type
1817
from samtranslator.plugins import LifeCycleEvents
18+
from samtranslator.validator.property_rule import PropertyRules
1919

2020
RT = TypeVar("RT", bound=pydantic.BaseModel) # return type
2121

@@ -395,8 +395,11 @@ def _format_all_errors(self, errors: List[Dict[str, Any]]) -> List[str]:
395395
# Multiple types - consolidate with union
396396
type_text = " or ".join(unique_types)
397397
result.append(f"Property '{path}' value must be {type_text}.")
398+
elif len(unique_types) == 1:
399+
# Single type - use type mapping
400+
result.append(f"Property '{path}' value must be {unique_types[0]}.")
398401
else:
399-
# Single or no types - format normally
402+
# No types - format normally
400403
result.append(self._format_single_error(data["error"]))
401404

402405
return result
@@ -542,16 +545,6 @@ def to_cloudformation(self, **kwargs: Any) -> List[Any]:
542545
"""
543546

544547

545-
class ValidationRule(Enum):
546-
MUTUALLY_EXCLUSIVE = "mutually_exclusive"
547-
MUTUALLY_INCLUSIVE = "mutually_inclusive"
548-
CONDITIONAL_REQUIREMENT = "conditional_requirement"
549-
550-
551-
# Simple tuple-based rules: (rule_type, [property_names])
552-
PropertyRule = Tuple[ValidationRule, List[str]]
553-
554-
555548
class SamResourceMacro(ResourceMacro, metaclass=ABCMeta):
556549
"""ResourceMacro that specifically refers to SAM (AWS::Serverless::*) resources."""
557550

@@ -579,6 +572,10 @@ class SamResourceMacro(ResourceMacro, metaclass=ABCMeta):
579572
# Aggregate list of all reserved tags
580573
_RESERVED_TAGS = [_SAM_KEY, _SAR_APP_KEY, _SAR_SEMVER_KEY]
581574

575+
def get_property_validation_rules(self) -> Optional[PropertyRules]:
576+
"""Override this method in child classes to provide PropertyRules validation."""
577+
return None
578+
582579
def get_resource_references(self, generated_cfn_resources, supported_resource_refs): # type: ignore[no-untyped-def]
583580
"""
584581
Constructs the list of supported resource references by going through the list of CFN resources generated
@@ -687,71 +684,21 @@ def _check_tag(self, reserved_tag_name: str, tags: Dict[str, Any]) -> None:
687684
"input.",
688685
)
689686

690-
def validate_before_transform(self, schema_class: Optional[Type[RT]], collect_all_errors: bool = False) -> None:
691-
if not hasattr(self, "__validation_rules__"):
687+
def validate_before_transform(self, schema_class: Type[RT]) -> None:
688+
rules = self.get_property_validation_rules()
689+
if rules is None:
692690
return
693691

694-
rules = self.__validation_rules__
695-
validated_model = (
696-
self.validate_properties_and_return_model(schema_class, collect_all_errors) if schema_class else None
697-
)
698-
699-
error_messages = []
700-
701-
for rule_type, properties in rules:
702-
present = [prop for prop in properties if self._get_property_value(prop, validated_model) is not None]
703-
if rule_type == ValidationRule.MUTUALLY_EXCLUSIVE:
704-
# Check if more than one property exists
705-
if len(present) > 1:
706-
prop_names = [f"'{p}'" for p in present]
707-
error_messages.append(f"Cannot specify {self._combine_string(prop_names)} together.")
708-
709-
elif rule_type == ValidationRule.MUTUALLY_INCLUSIVE:
710-
# If any property in the group is present, then all properties in the group must be present
711-
# Check if some but not all properties from the group are present
712-
missing_some_properties = 0 < len(present) < len(properties)
713-
if missing_some_properties:
714-
error_messages.append(f"Properties must be used together: {self._combine_string(properties)}.")
715-
716-
elif (
717-
rule_type == ValidationRule.CONDITIONAL_REQUIREMENT
718-
and self._get_property_value(properties[0], validated_model) is not None
719-
and self._get_property_value(properties[1], validated_model) is None
720-
):
721-
# First property requires second property
722-
error_messages.append(f"'{properties[0]}' requires '{properties[1]}'.")
692+
# Validate properties and get model, then pass to rules
693+
try:
694+
validated_model = self.validate_properties_and_return_model(schema_class)
695+
except Exception:
696+
validated_model = None
723697

724-
# If there are any validation errors, raise a single exception with all error messages
698+
error_messages = rules.validate_all(validated_model)
725699
if error_messages:
726700
raise InvalidResourceException(self.logical_id, "\n".join(error_messages))
727701

728-
def _combine_string(self, words: List[str]) -> str:
729-
return ", ".join(words[:-1]) + (" and " + words[-1] if len(words) > 1 else words[0] if words else "")
730-
731-
def _get_property_value(self, prop: str, validated_model: Any = None) -> Any:
732-
"""Original property value getter. Supports nested properties with dot notation."""
733-
if "." not in prop:
734-
# Simple property - use existing logic for direct attributes
735-
return getattr(self, prop, None)
736-
737-
# Nested property - use validated model
738-
if validated_model is None:
739-
return None
740-
741-
try:
742-
# Navigate through nested properties using dot notation
743-
value = validated_model
744-
for part in prop.split("."):
745-
if hasattr(value, part):
746-
value = getattr(value, part)
747-
if value is None:
748-
return None
749-
else:
750-
return None
751-
return value
752-
except Exception:
753-
return None
754-
755702

756703
class ResourceTypeResolver:
757704
"""ResourceTypeResolver maps Resource Types to Resource classes, e.g. AWS::Serverless::Function to

samtranslator/model/sam_resources.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -253,14 +253,20 @@ class SamFunction(SamResourceMacro):
253253
"DestinationQueue": SQSQueue.resource_type,
254254
}
255255

256-
# Validation rules
256+
# def get_property_validation_rules(self) -> Optional[PropertyRules]:
257+
# """Override to provide PropertyRules validation for SAM Function."""
257258
# TODO: To enable these rules, we need to update translator test input/output files to property configure template
258259
# to avoid fail-fast. eg: test with DeploymentPreference without AutoPublishAlias would fail fast before reaching testing state
259-
# __validation_rules__ = [
260-
# (ValidationRule.MUTUALLY_EXCLUSIVE, ["ImageUri", "InlineCode", "CodeUri"]),
261-
# (ValidationRule.CONDITIONAL_REQUIREMENT, ["DeploymentPreference", "AutoPublishAlias"]),
262-
# (ValidationRule.CONDITIONAL_REQUIREMENT, ["ProvisionedConcurrencyConfig", "AutoPublishAlias"]),
263-
# ]
260+
# from samtranslator.internal.schema_source.aws_serverless_function import Properties as FunctionProperties
261+
# return (PropertyRules(FunctionProperties)
262+
# .addMutuallyExclusive("ImageUri", "InlineCode", "CodeUri")
263+
# .addConditionalInclusive("DeploymentPreference", ["AutoPublishAlias"])
264+
# .addConditionalInclusive("ProvisionedConcurrencyConfig", ["AutoPublishAlias"])
265+
# .addConditionalInclusive("PackageType=Zip", ["Runtime", "Handler"])
266+
# .addConditionalInclusive("PackageType=Image", ["ImageUri"])
267+
# .addConditionalExclusive("PackageType=Zip", ["ImageUri", "ImageConfig"])
268+
# .addConditionalExclusive("PackageType=Image", ["Runtime", "Handler", "Layers"]))
269+
# return None
264270

265271
def resources_to_link(self, resources: Dict[str, Any]) -> Dict[str, Any]:
266272
try:
@@ -287,7 +293,7 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P
287293
# TODO: Skip pass schema_class=aws_serverless_function.Properties to skip schema validation for now.
288294
# - adding this now would required update error message in error error_function_*_test.py
289295
# - add this when we can verify that changing error message would not break customers
290-
# self.validate_before_transform(schema_class=None, collect_all_errors=False)
296+
# self.validate_before_transform(schema_class=aws_serverless_function.Properties)
291297

292298
if self.DeadLetterQueue:
293299
self._validate_dlq(self.DeadLetterQueue)
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
from enum import Enum
2+
from typing import Any, List, Optional, Tuple, TypeVar
3+
4+
RT = TypeVar("RT")
5+
6+
7+
class RuleType(Enum):
8+
MUTUALLY_EXCLUSIVE = "mutually_exclusive"
9+
CONDITIONAL_EXCLUSIVE = "conditional_exclusive"
10+
MUTUALLY_INCLUSIVE = "mutually_inclusive"
11+
CONDITIONAL_INCLUSIVE = "conditional_inclusive"
12+
13+
14+
class PropertyRules:
15+
def __init__(self) -> None:
16+
self._rules: List[Tuple[RuleType, List[str], List[str]]] = []
17+
18+
def addMutuallyExclusive(self, *props: str) -> "PropertyRules":
19+
self._rules.append((RuleType.MUTUALLY_EXCLUSIVE, list(props), []))
20+
return self
21+
22+
def addConditionalExclusive(self, source_prop: str, target_props: List[str]) -> "PropertyRules":
23+
self._rules.append((RuleType.CONDITIONAL_EXCLUSIVE, [source_prop], target_props))
24+
return self
25+
26+
def addMutuallyInclusive(self, *props: str) -> "PropertyRules":
27+
self._rules.append((RuleType.MUTUALLY_INCLUSIVE, list(props), []))
28+
return self
29+
30+
def addConditionalInclusive(self, source_prop: str, target_props: List[str]) -> "PropertyRules":
31+
self._rules.append((RuleType.CONDITIONAL_INCLUSIVE, [source_prop], target_props))
32+
return self
33+
34+
def validate_all(self, validated_model: Optional[RT]) -> List[str]:
35+
if validated_model is None:
36+
return []
37+
38+
errors = []
39+
for rule_type, source_props, target_props in self._rules:
40+
# Validate property names during validation
41+
all_props = source_props + target_props
42+
for prop in all_props:
43+
# Skip validation for value-based conditions (PropertyName=Value)
44+
if "=" in prop:
45+
prop_name = prop.split("=", 1)[0]
46+
if "." not in prop_name and not hasattr(validated_model, prop_name):
47+
errors.append(f"Property '{prop_name}' does not exist in the schema")
48+
continue
49+
elif "." not in prop and not hasattr(validated_model, prop):
50+
errors.append(f"Property '{prop}' does not exist in the schema")
51+
continue
52+
53+
error = self._validate_rule(validated_model, rule_type, source_props, target_props)
54+
if error:
55+
errors.append(error)
56+
return errors
57+
58+
def _validate_rule(
59+
self, validated_model: RT, rule_type: RuleType, source_props: List[str], target_props: List[str]
60+
) -> Optional[str]:
61+
if rule_type == RuleType.MUTUALLY_EXCLUSIVE:
62+
return self._validate_mutually_exclusive(validated_model, source_props)
63+
if rule_type == RuleType.CONDITIONAL_EXCLUSIVE:
64+
return self._validate_conditional_exclusive(validated_model, source_props, target_props)
65+
if rule_type == RuleType.MUTUALLY_INCLUSIVE:
66+
return self._validate_mutually_inclusive(validated_model, source_props)
67+
if rule_type == RuleType.CONDITIONAL_INCLUSIVE:
68+
return self._validate_conditional_inclusive(validated_model, source_props, target_props)
69+
return None
70+
71+
def _validate_mutually_exclusive(self, validated_model: RT, source_props: List[str]) -> Optional[str]:
72+
present = [prop for prop in source_props if self._get_property_value(validated_model, prop) is not None]
73+
if len(present) > 1:
74+
present_props = " and ".join(f"'{p}'" for p in present)
75+
return f"Cannot specify {present_props} together."
76+
return None
77+
78+
def _validate_conditional_exclusive(
79+
self, validated_model: RT, source_props: List[str], target_props: List[str]
80+
) -> Optional[str]:
81+
source_present = any(self._check_property_condition(validated_model, prop) for prop in source_props)
82+
if source_present:
83+
conflicting = [prop for prop in target_props if self._check_property_condition(validated_model, prop)]
84+
if conflicting:
85+
conflicting_props = ", ".join(f"'{p}'" for p in conflicting)
86+
return f"'{source_props[0]}' cannot be used with {conflicting_props}."
87+
return None
88+
89+
def _validate_mutually_inclusive(self, validated_model: RT, source_props: List[str]) -> Optional[str]:
90+
present = [prop for prop in source_props if self._get_property_value(validated_model, prop) is not None]
91+
if 0 < len(present) < len(source_props):
92+
missing = [prop for prop in source_props if prop not in present]
93+
missing_props = ", ".join(f"'{p}'" for p in missing)
94+
present_props = ", ".join(f"'{p}'" for p in present)
95+
return f"When using {present_props}, you must also specify {missing_props}."
96+
return None
97+
98+
def _validate_conditional_inclusive(
99+
self, validated_model: RT, source_props: List[str], target_props: List[str]
100+
) -> Optional[str]:
101+
source_present = any(self._check_property_condition(validated_model, prop) for prop in source_props)
102+
if source_present:
103+
missing = [prop for prop in target_props if not self._check_property_condition(validated_model, prop)]
104+
if missing:
105+
missing_props = ", ".join(f"'{p}'" for p in missing)
106+
return f"'{source_props[0]}' requires all of: {missing_props}."
107+
return None
108+
109+
def _get_property_value(self, validated_model: RT, prop: str) -> Any:
110+
if "." not in prop:
111+
return getattr(validated_model, prop, None)
112+
113+
# Handle nested properties recursively
114+
try:
115+
first_part, rest = prop.split(".", 1)
116+
117+
# Get the first level value
118+
if hasattr(validated_model, first_part):
119+
value = getattr(validated_model, first_part)
120+
elif isinstance(validated_model, dict) and first_part in validated_model:
121+
value = validated_model[first_part]
122+
else:
123+
return None
124+
125+
# If value is None, return None
126+
if value is None:
127+
return None
128+
129+
# Recursively get the rest of the property path
130+
return self._get_property_value(value, rest)
131+
except Exception:
132+
return None
133+
134+
def _check_property_condition(self, validated_model: RT, prop: str) -> bool:
135+
"""Check if property condition is met. Supports 'PropertyName=Value' syntax."""
136+
if "=" in prop:
137+
prop_name, expected_value = prop.split("=", 1)
138+
actual_value = self._get_property_value(validated_model, prop_name)
139+
return actual_value is not None and str(actual_value) == expected_value
140+
return self._get_property_value(validated_model, prop) is not None

0 commit comments

Comments
 (0)