Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
python-version: ["3.10", "3.11", "3.12", "3.13"]
resolution: ["highest", "lowest-direct"]
env:
# Shared env variables for all the tests
Expand Down
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.9
3.10
9 changes: 4 additions & 5 deletions protovalidate/internal/extra_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

import math
import typing
from urllib import parse as urlparse

import celpy
Expand Down Expand Up @@ -42,7 +41,7 @@ def cel_get_field(message: celtypes.Value, field_name: celtypes.Value) -> celpy.
return field_to_cel(message.msg, message.desc.fields_by_name[field_name])


def cel_is_ip(val: celtypes.Value, ver: typing.Optional[celtypes.Value] = None) -> celpy.Result:
def cel_is_ip(val: celtypes.Value, ver: celtypes.Value | None = None) -> celpy.Result:
"""Return True if the string is an IPv4 or IPv6 address, optionally limited to a specific version.

Version 0 or None means either 4 or 6. Passing a version other than 0, 4, or 6 always returns False.
Expand Down Expand Up @@ -307,7 +306,7 @@ def cel_is_nan(val: celtypes.Value) -> celpy.Result:
return celtypes.BoolType(math.isnan(val))


def cel_is_inf(val: celtypes.Value, sign: typing.Optional[celtypes.Value] = None) -> celpy.Result:
def cel_is_inf(val: celtypes.Value, sign: celtypes.Value | None = None) -> celpy.Result:
if not isinstance(val, celtypes.DoubleType):
msg = "invalid argument, expected double"
raise celpy.CELEvalError(msg)
Expand All @@ -326,7 +325,7 @@ def cel_is_inf(val: celtypes.Value, sign: typing.Optional[celtypes.Value] = None


def cel_unique(val: celtypes.Value) -> celpy.Result:
if not isinstance(val, (celtypes.ListType, list)):
if not isinstance(val, celtypes.ListType | list):
msg = "invalid argument, expected list"
raise celpy.CELEvalError(msg)
return celtypes.BoolType(len(val) == len(set(val)))
Expand Down Expand Up @@ -512,7 +511,7 @@ class Ipv6:
_double_colon_at: int # Number of 16-bit pieces found when double colon was found.
_double_colon_seen: bool
_dotted_raw: str # Dotted notation for right-most 32 bits.
_dotted_addr: typing.Optional[Ipv4] # Dotted notation successfully parsed as Ipv4.
_dotted_addr: Ipv4 | None # Dotted notation successfully parsed as Ipv4.
_zone_id_found: bool
_prefix_len: int # 0 -128

Expand Down
42 changes: 21 additions & 21 deletions protovalidate/internal/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def _get_type_name(fd: typing.Any) -> str:
return md["name"]


def _get_type_ctor(fd: typing.Any) -> typing.Optional[typing.Callable[..., celtypes.Value]]:
def _get_type_ctor(fd: typing.Any) -> typing.Callable[..., celtypes.Value] | None:
md = _FIELD_DESC_METADATA_MAP.get(fd)
if md is None:
return None
Expand Down Expand Up @@ -312,19 +312,19 @@ def validate(self, ctx: RuleContext, _: message.Message):
class CelRunner:
runner: celpy.Runner
rule: validate_pb2.Rule
rule_value: typing.Optional[typing.Any] = None
rule_cel: typing.Optional[celtypes.Value] = None
rule_path: typing.Optional[validate_pb2.FieldPath] = None
rule_value: typing.Any | None = None
rule_cel: celtypes.Value | None = None
rule_path: validate_pb2.FieldPath | None = None


class CelRules(Rules):
"""A rule that has rules written in CEL."""

_cel: list[CelRunner]
_rules: typing.Optional[message.Message] = None
_rules_cel: typing.Optional[celtypes.Value] = None
_rules: message.Message | None = None
_rules_cel: celtypes.Value | None = None

def __init__(self, rules: typing.Optional[message.Message]):
def __init__(self, rules: message.Message | None):
self._cel = []
if rules is not None:
self._rules = rules
Expand All @@ -334,8 +334,8 @@ def _validate_cel(
self,
ctx: RuleContext,
*,
this_value: typing.Optional[typing.Any] = None,
this_cel: typing.Optional[celtypes.Value] = None,
this_value: typing.Any | None = None,
this_cel: celtypes.Value | None = None,
for_key: bool = False,
):
activation: dict[str, celtypes.Value] = {}
Expand Down Expand Up @@ -379,8 +379,8 @@ def add_rule(
funcs: dict[str, celpy.CELFunction],
rules: validate_pb2.Rule,
*,
rule_field: typing.Optional[descriptor.FieldDescriptor] = None,
rule_path: typing.Optional[validate_pb2.FieldPath] = None,
rule_field: descriptor.FieldDescriptor | None = None,
rule_path: validate_pb2.FieldPath | None = None,
):
ast = env.compile(rules.expression)
prog = env.program(ast, functions=funcs)
Expand Down Expand Up @@ -430,7 +430,7 @@ class MessageRules(CelRules):

_oneofs: list[MessageOneofRule]

def __init__(self, rules: typing.Optional[message.Message], desc: descriptor.Descriptor):
def __init__(self, rules: message.Message | None, desc: descriptor.Descriptor):
super().__init__(rules)
self._oneofs = []
self._desc = desc
Expand Down Expand Up @@ -467,7 +467,7 @@ def add_oneof(
self._oneofs.append(MessageOneofRule(fields, required=rule.required))


def check_field_type(field: descriptor.FieldDescriptor, expected: int, wrapper_name: typing.Optional[str] = None):
def check_field_type(field: descriptor.FieldDescriptor, expected: int, wrapper_name: str | None = None):
if field.type != expected and (
field.type != descriptor.FieldDescriptor.TYPE_MESSAGE or field.message_type.full_name != wrapper_name
):
Expand Down Expand Up @@ -713,7 +713,7 @@ def validate(self, ctx: RuleContext, message: message.Message):
class RepeatedRules(FieldRules):
"""Rules for a repeated field."""

_item_rules: typing.Optional[FieldRules] = None
_item_rules: FieldRules | None = None

_items_rules_suffix: typing.ClassVar[list[validate_pb2.FieldPathElement]] = [
_field_to_element(
Expand All @@ -730,7 +730,7 @@ def __init__(
funcs: dict[str, celpy.CELFunction],
field: descriptor.FieldDescriptor,
field_level: validate_pb2.FieldRules,
item_rules: typing.Optional[FieldRules],
item_rules: FieldRules | None,
):
super().__init__(env, funcs, field, field_level)
if item_rules is not None:
Expand Down Expand Up @@ -760,8 +760,8 @@ def validate(self, ctx: RuleContext, message: message.Message):
class MapRules(FieldRules):
"""Rules for a map field."""

_key_rules: typing.Optional[FieldRules] = None
_value_rules: typing.Optional[FieldRules] = None
_key_rules: FieldRules | None = None
_value_rules: FieldRules | None = None

_key_rules_suffix: typing.ClassVar[list[validate_pb2.FieldPathElement]] = [
_field_to_element(validate_pb2.MapRules.DESCRIPTOR.fields_by_number[validate_pb2.MapRules.KEYS_FIELD_NUMBER]),
Expand All @@ -783,8 +783,8 @@ def __init__(
funcs: dict[str, celpy.CELFunction],
field: descriptor.FieldDescriptor,
field_level: validate_pb2.FieldRules,
key_rules: typing.Optional[FieldRules],
value_rules: typing.Optional[FieldRules],
key_rules: FieldRules | None,
value_rules: FieldRules | None,
):
super().__init__(env, funcs, field, field_level)
if key_rules is not None:
Expand Down Expand Up @@ -850,7 +850,7 @@ class RuleFactory:

_env: celpy.Environment
_funcs: dict[str, celpy.CELFunction]
_cache: dict[descriptor.Descriptor, typing.Union[list[Rules], Exception]]
_cache: dict[descriptor.Descriptor, list[Rules] | Exception]

def __init__(self, funcs: dict[str, celpy.CELFunction]):
self._env = celpy.Environment(runner_class=InterpretedRunner)
Expand Down Expand Up @@ -1022,7 +1022,7 @@ def _new_field_rule(

def _new_rules(self, desc: descriptor.Descriptor) -> list[Rules]:
result: list[Rules] = []
rule: typing.Optional[Rules] = None
rule: Rules | None = None
all_msg_oneof_fields = set()
if desc.GetOptions().HasExtension(validate_pb2.message): # type: ignore
message_level = desc.GetOptions().Extensions[validate_pb2.message] # type: ignore
Expand Down
11 changes: 5 additions & 6 deletions protovalidate/internal/string_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import math
import re
from decimal import Decimal
from typing import Optional, Union

import celpy
from celpy import celtypes
Expand Down Expand Up @@ -88,7 +87,7 @@ def format(self, fmt: celtypes.Value, args: celtypes.Value) -> celpy.Result:

return celtypes.StringType(result)

def __validate_number(self, arg: Union[celtypes.DoubleType, celtypes.IntType, celtypes.UintType]) -> Optional[str]:
def __validate_number(self, arg: celtypes.DoubleType | celtypes.IntType | celtypes.UintType) -> str | None:
if math.isnan(arg):
return "NaN"
if math.isinf(arg):
Expand Down Expand Up @@ -122,7 +121,7 @@ def __format_exponential(self, arg: celtypes.Value, precision: int) -> str:
raise celpy.CELEvalError(msg)

def __format_int(self, arg: celtypes.Value) -> str:
if isinstance(arg, (celtypes.IntType, celtypes.UintType, celtypes.DoubleType)):
if isinstance(arg, celtypes.IntType | celtypes.UintType | celtypes.DoubleType):
result = self.__validate_number(arg)
if result is not None:
return result
Expand All @@ -134,7 +133,7 @@ def __format_int(self, arg: celtypes.Value) -> str:
raise celpy.CELEvalError(msg)

def __format_hex(self, arg: celtypes.Value) -> str:
if isinstance(arg, (celtypes.IntType, celtypes.UintType)):
if isinstance(arg, celtypes.IntType | celtypes.UintType):
return f"{arg:x}"
if isinstance(arg, celtypes.BytesType):
return arg.hex()
Expand All @@ -147,7 +146,7 @@ def __format_hex(self, arg: celtypes.Value) -> str:
raise celpy.CELEvalError(msg)

def __format_oct(self, arg: celtypes.Value) -> str:
if isinstance(arg, (celtypes.IntType, celtypes.UintType)):
if isinstance(arg, celtypes.IntType | celtypes.UintType):
return f"{arg:o}"
msg = (
"error during formatting: octal clause can only be used on integers, was given "
Expand All @@ -156,7 +155,7 @@ def __format_oct(self, arg: celtypes.Value) -> str:
raise celpy.CELEvalError(msg)

def __format_bin(self, arg: celtypes.Value) -> str:
if isinstance(arg, (celtypes.IntType, celtypes.UintType, celtypes.BoolType)):
if isinstance(arg, celtypes.IntType | celtypes.UintType | celtypes.BoolType):
return f"{arg:b}"
msg = (
"error during formatting: only integers and bools can be formatted as binary, was given "
Expand Down
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ license = "Apache-2.0"
license-files = ["LICENSE"]
keywords = ["validate", "protobuf", "protocol buffer"]
# Keep our minimum version synced with ./.python-version.
requires-python = ">=3.9"
requires-python = ">=3.10"
classifiers = [
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
Expand Down
4 changes: 2 additions & 2 deletions test/conformance/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
from buf.validate.conformance.harness import harness_pb2


def run_test_case(tc: typing.Any, result: typing.Optional[harness_pb2.TestResult] = None) -> harness_pb2.TestResult:
def run_test_case(tc: typing.Any, result: harness_pb2.TestResult | None = None) -> harness_pb2.TestResult:
if result is None:
result = harness_pb2.TestResult()
# Run the validator
Expand All @@ -76,7 +76,7 @@ def run_test_case(tc: typing.Any, result: typing.Optional[harness_pb2.TestResult
def run_any_test_case(
pool: descriptor_pool.DescriptorPool,
tc: any_pb2.Any,
result: typing.Optional[harness_pb2.TestResult] = None,
result: harness_pb2.TestResult | None = None,
) -> harness_pb2.TestResult:
type_name = tc.type_url.split("/")[-1]
desc: descriptor.Descriptor = pool.FindMessageTypeByName(type_name)
Expand Down
6 changes: 3 additions & 3 deletions test/test_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from collections.abc import Iterable, MutableMapping
from itertools import chain
from typing import Any, Optional
from typing import Any

import celpy
import pytest
Expand Down Expand Up @@ -62,15 +62,15 @@ def build_variables(bindings: MutableMapping[str, eval_pb2.ExprValue]) -> dict[A
return binder


def get_expected_result(test: simple_pb2.SimpleTest) -> Optional[str]:
def get_expected_result(test: simple_pb2.SimpleTest) -> str | None:
if test.HasField("value"):
val = test.value
if val.HasField("string_value"):
return val.string_value
return None


def get_eval_error_message(test: simple_pb2.SimpleTest) -> Optional[str]:
def get_eval_error_message(test: simple_pb2.SimpleTest) -> str | None:
if test.HasField("eval_error"):
err_set = test.eval_error
if len(err_set.errors) == 1:
Expand Down
2 changes: 1 addition & 1 deletion test/test_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def check_compilation_errors(validator: protovalidate.Validator, msg: message.Me
def _compare_violations(actual: list[rules.Violation], expected: list[rules.Violation]):
"""Compares two lists of violations. The violations are expected to be in the expected order also."""
assert len(actual) == len(expected)
for a, e in zip(actual, expected):
for a, e in zip(actual, expected, strict=True):
assert a.proto.message == e.proto.message
assert a.proto.rule_id == e.proto.rule_id
assert a.proto.for_key == e.proto.for_key
Expand Down
Loading