diff --git a/docs/_quarto.yml b/docs/_quarto.yml index b8365fc1f..ee512938f 100644 --- a/docs/_quarto.yml +++ b/docs/_quarto.yml @@ -160,6 +160,7 @@ quartodoc: - name: Validate.col_vals_null - name: Validate.col_vals_not_null - name: Validate.col_vals_regex + - name: Validate.col_vals_within_spec - name: Validate.col_vals_expr - name: Validate.rows_distinct - name: Validate.rows_complete diff --git a/pointblank/_constants.py b/pointblank/_constants.py index a0fba5b70..e45d5c191 100644 --- a/pointblank/_constants.py +++ b/pointblank/_constants.py @@ -18,6 +18,7 @@ "in_set": ["numeric", "str"], "not_in_set": ["numeric", "str"], "regex": ["str"], + "within_spec": ["str"], "null": ["str", "numeric", "bool", "datetime", "duration"], "not_null": ["str", "numeric", "bool", "datetime", "duration"], } @@ -34,6 +35,7 @@ "col_vals_in_set": "in_set", "col_vals_not_in_set": "not_in_set", "col_vals_regex": "regex", + "col_vals_within_spec": "within_spec", "col_vals_null": "null", "col_vals_not_null": "not_null", "col_vals_expr": "expr", @@ -78,6 +80,7 @@ "col_vals_in_set", "col_vals_not_in_set", "col_vals_regex", + "col_vals_within_spec", "col_vals_null", "col_vals_not_null", "col_vals_expr", @@ -342,6 +345,18 @@ +""", + "col_vals_within_spec": """ + + col_vals_within_spec + + + + + + + + """, "col_exists": """ diff --git a/pointblank/_interrogation.py b/pointblank/_interrogation.py index 09d9d8333..ee35b5882 100644 --- a/pointblank/_interrogation.py +++ b/pointblank/_interrogation.py @@ -9,6 +9,13 @@ from narwhals.typing import FrameT from pointblank._constants import IBIS_BACKENDS +from pointblank._spec_utils import ( + check_credit_card, + check_iban, + check_isbn, + check_postal_code, + check_vin, +) from pointblank._utils import ( _column_test_prep, _convert_to_narwhals, @@ -1721,6 +1728,459 @@ def interrogate_regex(tbl: FrameT, column: str, values: dict | str, na_pass: boo return result_tbl.to_native() +def interrogate_within_spec(tbl: FrameT, column: str, values: dict, 
def interrogate_within_spec(tbl: FrameT, column: str, values: dict, na_pass: bool) -> FrameT:
    """Within specification interrogation.

    Validates string values in `column` against the named specification in
    `values["spec"]` (e.g. `"email"`, `"vin"`, `"postal_code[US]"`). Regex-backed
    specs are evaluated lazily through Narwhals on every backend (including
    Ibis); checksum-based specs either run database-native for Ibis (VIN,
    credit card) or materialize the column and use the Python checkers from
    `_spec_utils`.

    Parameters
    ----------
    tbl
        The table to interrogate.
    column
        The column to validate.
    values
        Dictionary containing a 'spec' key with the specification name,
        optionally suffixed with a bracketed country code (e.g. "iban[DE]").
    na_pass
        Whether null values should pass validation.

    Returns
    -------
    FrameT
        The native table with a boolean `pb_is_good_` column appended.
    """
    from pointblank._spec_utils import (
        regex_email,
        regex_ipv4_address,
        regex_ipv6_address,
        regex_mac,
        regex_phone,
        regex_swift_bic,
        regex_url,
    )

    spec = values["spec"]
    spec_lower = spec.lower()

    # A spec like "postal_code[US]" or "iban[DE]" carries a country code.
    country = None
    if "[" in spec and "]" in spec:
        base_spec = spec[: spec.index("[")]
        country = spec[spec.index("[") + 1 : spec.index("]")]
        spec_lower = base_spec.lower()

    nw_tbl = nw.from_native(tbl)

    # Purely regex-based specifications evaluate lazily on every backend
    # (including Ibis) -- no materialization needed.
    regex_specs = {
        "email": regex_email(),
        "url": regex_url(),
        "phone": regex_phone(),
        "ipv4": regex_ipv4_address(),
        "ipv4_address": regex_ipv4_address(),
        "ipv6": regex_ipv6_address(),
        "ipv6_address": regex_ipv6_address(),
        "mac": regex_mac(),
        "mac_address": regex_mac(),
        "swift": regex_swift_bic(),
        "swift_bic": regex_swift_bic(),
        "bic": regex_swift_bic(),
    }

    if spec_lower in regex_specs:
        pattern = regex_specs[spec_lower]

        # SWIFT/BIC codes are matched case-insensitively: uppercase first.
        if spec_lower in ("swift", "swift_bic", "bic"):
            col_expr = nw.col(column).str.to_uppercase()
        else:
            col_expr = nw.col(column)

        result_tbl = nw_tbl.with_columns(
            pb_is_good_1=nw.col(column).is_null() & na_pass,
            pb_is_good_2=col_expr.str.contains(f"^{pattern}$", literal=False).fill_null(False),
        )
        result_tbl = result_tbl.with_columns(
            pb_is_good_=nw.col("pb_is_good_1") | nw.col("pb_is_good_2")
        ).drop("pb_is_good_1", "pb_is_good_2")
        return result_tbl.to_native()

    # Checksum-based specifications: prefer database-native SQL validation
    # for Ibis backends where a translation exists.
    native_tbl = nw_tbl.to_native()
    is_ibis = hasattr(native_tbl, "execute")

    if is_ibis and spec_lower == "vin":
        return interrogate_within_spec_db(tbl, column, values, na_pass)
    if is_ibis and spec_lower in ("credit_card", "creditcard"):
        return interrogate_credit_card_db(tbl, column, values, na_pass)

    # Otherwise materialize the column and run the Python checkers.
    col_list = _column_to_list(nw_tbl, column)

    if spec_lower in ("isbn", "isbn-10", "isbn-13"):
        is_valid_list = check_isbn(col_list)
    elif spec_lower == "vin":
        is_valid_list = check_vin(col_list)
    elif spec_lower in ("credit_card", "creditcard"):
        is_valid_list = check_credit_card(col_list)
    elif spec_lower == "iban":
        is_valid_list = check_iban(col_list, country=country)
    elif spec_lower in ("postal_code", "postalcode", "postcode", "zip"):
        if country is None:
            raise ValueError("Country code required for postal code validation")
        is_valid_list = check_postal_code(col_list, country=country)
    else:
        raise ValueError(f"Unknown specification type: {spec}")

    # Attach the results; Ibis tables must first be executed to a concrete
    # dataframe before a plain Python list can be joined on.
    if is_ibis:
        native_tbl = native_tbl.execute()

    if is_polars_dataframe(native_tbl):
        import polars as pl

        native_tbl = native_tbl.with_columns(pb_is_good_2=pl.Series(is_valid_list))
    elif is_pandas_dataframe(native_tbl):
        import pandas as pd

        native_tbl["pb_is_good_2"] = pd.Series(is_valid_list, index=native_tbl.index)
    else:
        raise NotImplementedError(f"Backend type not supported: {type(native_tbl)}")

    result_tbl = nw.from_native(native_tbl)

    # Nulls pass only when `na_pass` is set; combine with the spec check.
    result_tbl = result_tbl.with_columns(
        pb_is_good_1=nw.col(column).is_null() & na_pass,
    )
    result_tbl = result_tbl.with_columns(
        pb_is_good_=nw.col("pb_is_good_1") | nw.col("pb_is_good_2")
    ).drop("pb_is_good_1", "pb_is_good_2")

    return result_tbl.to_native()


def _column_to_list(nw_tbl, column: str) -> list:
    """Materialize one column of a Narwhals-wrapped table as a Python list.

    Handles Polars (`to_list`), Pandas (`tolist`), and Ibis (execute first);
    falls back to plain `list(...)` iteration for anything else.
    """
    col_data = nw_tbl.select(column).to_native()

    # Ibis expressions must be executed before values can be extracted.
    if hasattr(col_data, "execute"):
        try:
            col_data = col_data.execute()
        except Exception:
            pass  # best effort: fall through to the generic conversion below

    series = col_data[column]
    if hasattr(series, "to_list"):  # Polars series
        return series.to_list()
    if hasattr(series, "tolist"):  # Pandas series
        return series.tolist()
    return list(series)
def interrogate_within_spec_db(tbl: FrameT, column: str, values: dict, na_pass: bool) -> FrameT:
    """
    Database-native specification validation (proof of concept).

    Uses Ibis expressions to perform validation entirely in SQL, avoiding
    data materialization for remote database tables. Currently supports VIN
    validation here (ISO 3779 check digit), routing credit cards to
    `interrogate_credit_card_db()`.

    Parameters
    ----------
    tbl
        The table to interrogate (must be an Ibis table).
    column
        The column to validate.
    values
        Dictionary containing 'spec' key with specification type.
    na_pass
        Whether to pass null values.

    Returns
    -------
    FrameT
        Result table with pb_is_good_ column indicating validation results.

    Notes
    -----
    This is a proof-of-concept implementation demonstrating database-native
    validation. It translates complex Python validation logic (regex,
    checksums) into SQL expressions executed directly in the database.
    """
    spec = values["spec"]
    spec_lower = spec.lower()

    # Unwrap a Narwhals-wrapped table if one was passed in.
    native_tbl = tbl
    if hasattr(tbl, "to_native"):
        native_tbl = tbl.to_native() if callable(tbl.to_native) else tbl

    is_ibis = hasattr(native_tbl, "execute")

    if not is_ibis:
        # Fall back to the regular implementation for non-Ibis tables.
        return interrogate_within_spec(tbl, column, values, na_pass)

    # Route credit cards to their dedicated SQL implementation; accept the
    # same "creditcard" alias that interrogate_within_spec() accepts.
    if spec_lower in ("credit_card", "creditcard"):
        return interrogate_credit_card_db(tbl, column, values, na_pass)
    elif spec_lower != "vin":
        raise NotImplementedError(
            f"Database-native validation for '{spec}' not yet implemented. "
            "Currently 'vin' and 'credit_card' are supported in interrogate_within_spec_db(). "
            "Use interrogate_within_spec() for other specifications."
        )

    # VIN validation using Ibis expressions (database-native), based on the
    # ISO 3779 standard check-digit algorithm.
    try:
        import ibis
    except ImportError:
        raise ImportError("Ibis is required for database-native validation")

    # VIN transliteration map (character -> numeric value for the checksum),
    # per ISO 3779. Letters I, O, Q are not valid VIN characters.
    transliteration = {
        "A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7, "H": 8,
        "J": 1, "K": 2, "L": 3, "M": 4, "N": 5, "P": 7, "R": 9,
        "S": 2, "T": 3, "U": 4, "V": 5, "W": 6, "X": 7, "Y": 8, "Z": 9,
        "0": 0, "1": 1, "2": 2, "3": 3, "4": 4,
        "5": 5, "6": 6, "7": 7, "8": 8, "9": 9,
    }

    # Position weights for the checksum calculation (position 9 weighs 0).
    weights = [8, 7, 6, 5, 4, 3, 2, 10, 0, 9, 8, 7, 6, 5, 4, 3, 2]

    col_expr = native_tbl[column]

    # Basic checks: length must be 17; characters I, O, Q are invalid.
    valid_length = col_expr.length() == 17
    no_invalid_chars = (
        ~col_expr.upper().contains("I")
        & ~col_expr.upper().contains("O")
        & ~col_expr.upper().contains("Q")
    )

    # Checksum: for each position, transliterate the character to a number,
    # multiply by its weight, and accumulate.
    checksum = ibis.literal(0)
    for pos in range(17):
        if pos == 8:  # position 9 (0-indexed 8) is the check digit itself
            continue

        # Ibis `substr` is 0-indexed: extract the character at offset `pos`.
        char = col_expr.upper().substr(pos, 1)

        # Transliterate via a single CASE expression; an unexpected character
        # maps to 0, which makes the check digit comparison fail.
        conditions = [(char == ch, num) for ch, num in transliteration.items()]
        value = ibis.cases(*conditions, else_=0)

        checksum = checksum + (value * weights[pos])

    # Check digit: checksum % 11; a result of 10 is encoded as 'X'.
    expected_check = checksum % 11
    actual_check_char = col_expr.upper().substr(8, 1)  # position 9 (0-indexed 8)

    check_digit_valid = ibis.cases(
        (expected_check == 10, actual_check_char == "X"),
        (expected_check < 10, actual_check_char == expected_check.cast(str)),
        else_=False,
    )

    is_valid = valid_length & no_invalid_chars & check_digit_valid

    # NULL handling: NULLs pass only when na_pass is set; otherwise NULL
    # results are explicitly converted to False.
    if na_pass:
        is_valid = col_expr.isnull() | is_valid
    else:
        is_valid = is_valid.fill_null(False)

    result_tbl = native_tbl.mutate(pb_is_good_=is_valid)

    return result_tbl
def interrogate_credit_card_db(
    tbl: FrameT, column: str, values: dict[str, str], na_pass: bool
) -> FrameT:
    """
    Database-native credit card validation using the Luhn algorithm in SQL.

    Implements the Luhn checksum entirely with Ibis expressions, avoiding
    data materialization for remote database tables.

    Parameters
    ----------
    tbl
        The table to interrogate (must be an Ibis table).
    column
        The column to validate.
    values
        Dictionary containing 'spec' key (should be 'credit_card').
    na_pass
        Whether to pass null values.

    Returns
    -------
    FrameT
        Result table with pb_is_good_ column indicating validation results.

    Notes
    -----
    The Luhn algorithm: strip spaces/hyphens; starting from the rightmost
    digit, double every second digit (subtracting 9 when the doubled value
    exceeds 9); sum all digits; the number is valid when sum % 10 == 0.
    """
    # Unwrap a Narwhals-wrapped table if one was passed in.
    native_tbl = tbl
    if hasattr(tbl, "to_native"):
        native_tbl = tbl.to_native() if callable(tbl.to_native) else tbl

    is_ibis = hasattr(native_tbl, "execute")

    if not is_ibis:
        # Fall back to the regular implementation for non-Ibis tables.
        return interrogate_within_spec(tbl, column, values, na_pass)

    try:
        import ibis
    except ImportError:
        raise ImportError("Ibis is required for database-native validation")

    col_expr = native_tbl[column]

    # Step 1: format check -- only digits, spaces, and hyphens allowed.
    # BUGFIX: `re_search` already yields a boolean; the previous
    # `.notnull()` call collapsed it to "value is not NULL", which accepted
    # any non-null string regardless of its characters.
    valid_chars = col_expr.re_search(r"^[0-9\s\-]+$")

    # Clean: remove spaces and hyphens.
    clean_card = col_expr.replace(" ", "").replace("-", "")

    # Step 2: length check (13-19 digits after cleaning).
    card_length = clean_card.length()
    valid_length = (card_length >= 13) & (card_length <= 19)

    # Step 3: Luhn checksum, unrolled over the maximum 19 digit positions,
    # processed from the rightmost digit.
    checksum = ibis.literal(0)
    for pos_from_right in range(19):
        # 0-based offset of this digit counted from the left end.
        digit_pos = card_length - pos_from_right
        digit_char = clean_card.substr(digit_pos - 1, 1)

        # Character -> integer via a CASE expression.
        digit_val = ibis.cases(
            (digit_char == "0", 0),
            (digit_char == "1", 1),
            (digit_char == "2", 2),
            (digit_char == "3", 3),
            (digit_char == "4", 4),
            (digit_char == "5", 5),
            (digit_char == "6", 6),
            (digit_char == "7", 7),
            (digit_char == "8", 8),
            (digit_char == "9", 9),
            else_=-1,  # non-digit; cannot occur once valid_chars holds
        )

        # Only positions inside the actual card length contribute.
        in_range = digit_pos > 0

        # Every second digit from the right (0-indexed odd) is doubled.
        should_double = (pos_from_right % 2) == 1
        doubled = digit_val * 2
        adjusted = ibis.cases(
            (should_double & (doubled > 9), doubled - 9),
            (should_double, doubled),
            else_=digit_val,
        )

        contribution = ibis.cases(
            (in_range, adjusted),
            else_=0,
        )
        checksum = checksum + contribution

    # Step 4: valid when the checksum is divisible by 10.
    luhn_valid = (checksum % 10) == 0

    is_valid = valid_chars & valid_length & luhn_valid

    # NULL handling: NULLs pass only when na_pass is set.
    if na_pass:
        is_valid = col_expr.isnull() | is_valid
    else:
        is_valid = is_valid.fill_null(False)

    result_tbl = native_tbl.mutate(pb_is_good_=is_valid)

    return result_tbl


# --- pointblank/_spec_utils.py (new module) ---

import re


def regex_email() -> str:
    """Regex pattern for email validation (requires a dotted domain part)."""
    return r"^[a-zA-Z0-9.!#$%&'*+\/=?^_`{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+$"


def regex_url() -> str:
    """Regex pattern for URL validation (http, https, or ftp schemes)."""
    return r"^(https?|ftp):\/\/[^\s/$.?#].[^\s]*$"


def regex_phone() -> str:
    """Regex pattern for phone number validation.

    Matches various phone number formats - requires at least 7 digits total.
    """
    return r"^[\+]?[(]?[0-9]{1,4}[)]?[-\s\.]?[(]?[0-9]{1,4}[)]?[-\s\.]?[0-9]{1,9}([-\s\.]?[0-9]{1,9})+$"
def regex_ipv4_address() -> str:
    """Regex pattern for IPv4 address validation (each octet 0-255)."""
    return r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"


def regex_ipv6_address() -> str:
    """Regex pattern for IPv6 address validation (full and `::`-compressed forms)."""
    return r"^(([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:))$"


def regex_mac() -> str:
    """Regex pattern for MAC address validation (colon- or hyphen-separated)."""
    return r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$"


def regex_swift_bic() -> str:
    """Regex pattern for SWIFT/BIC code validation.

    Bank code (4 letters) + country code (2 letters) + location code
    (2 letters/digits) + optional branch code (3 letters/digits).
    """
    return r"^[A-Z]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?$"


def regex_vin() -> str:
    """Regex pattern for VIN validation (basic format check: 17 chars, no I/O/Q)."""
    return r"^[A-HJ-NPR-Z0-9]{17}$"


def regex_credit_card_1() -> str:
    """Get first regex pattern for credit card validation (allowed characters)."""
    return r"^[0-9\s\-]+$"


def regex_credit_card_2() -> str:
    """Get second regex pattern for credit card validation (13-19 digits)."""
    return r"^[0-9]{13,19}$"


# Generic IBAN shape: 2 letters, 2 digits, up to 30 alphanumerics.
_IBAN_GENERIC = r"^[A-Z]{2}[0-9]{2}[A-Z0-9]{1,30}$"

# Country-specific IBAN patterns keyed by ISO 3166-1 alpha-2 code. Alpha-3
# codes are resolved through `_IBAN_ALPHA3_TO_ALPHA2`, so each pattern is
# stored exactly once (previously every entry was duplicated).
_IBAN_PATTERNS = {
    "AL": r"^AL[0-9]{10}[A-Z0-9]{16}$",
    "AD": r"^AD[0-9]{10}[A-Z0-9]{12}$",
    "AT": r"^AT[0-9]{18}$",
    "BE": r"^BE[0-9]{14}$",
    "BA": r"^BA[0-9]{18}$",
    "BG": r"^BG[0-9]{2}[A-Z]{4}[0-9]{6}[A-Z0-9]{8}$",
    "BR": r"^BR[0-9]{25}[A-Z]{1}[A-Z0-9]{1}$",
    "HR": r"^HR[0-9]{19}$",
    "CY": r"^CY[0-9]{10}[A-Z0-9]{16}$",
    "CZ": r"^CZ[0-9]{22}$",
    "DK": r"^DK[0-9]{16}$",
    "EE": r"^EE[0-9]{18}$",
    "FO": r"^FO[0-9]{16}$",
    "FI": r"^FI[0-9]{16}$",
    "FR": r"^FR[0-9]{12}[A-Z0-9]{11}[0-9]{2}$",
    "PF": r"^PF[0-9]{12}[A-Z0-9]{11}[0-9]{2}$",
    "TF": r"^TF[0-9]{12}[A-Z0-9]{11}[0-9]{2}$",
    "DE": r"^DE[0-9]{20}$",
    "GI": r"^GI[0-9]{2}[A-Z]{4}[A-Z0-9]{15}$",
    "GE": r"^GE[0-9]{2}[A-Z]{2}[0-9]{16}$",
    "GR": r"^GR[0-9]{9}[A-Z0-9]{16}$",
    "GL": r"^GL[0-9]{16}$",
    "HU": r"^HU[0-9]{26}$",
    "IS": r"^IS[0-9]{24}$",
    "IE": r"^IE[0-9]{2}[A-Z]{4}[0-9]{14}$",
    "IL": r"^IL[0-9]{21}$",
    "IT": r"^IT[0-9]{2}[A-Z]{1}[0-9]{10}[A-Z0-9]{12}$",
    "LV": r"^LV[0-9]{2}[A-Z]{4}[A-Z0-9]{13}$",
    "LB": r"^LB[0-9]{6}[A-Z0-9]{20}$",
    "LI": r"^LI[0-9]{7}[A-Z0-9]{12}$",
    "LT": r"^LT[0-9]{18}$",
    "LU": r"^LU[0-9]{5}[A-Z0-9]{13}$",
    "MK": r"^MK[0-9]{5}[A-Z0-9]{10}[0-9]{2}$",
    "MT": r"^MT[0-9]{2}[A-Z]{4}[0-9]{5}[A-Z0-9]{18}$",
    "MU": r"^MU[0-9]{2}[A-Z]{4}[0-9]{19}[A-Z]{3}$",
    "YT": r"^YT[0-9]{12}[A-Z0-9]{11}[0-9]{2}$",
    "MC": r"^MC[0-9]{12}[A-Z0-9]{11}[0-9]{2}$",
    "ME": r"^ME[0-9]{20}$",
    "NL": r"^NL[0-9]{2}[A-Z]{4}[0-9]{10}$",
    "NC": r"^NC[0-9]{12}[A-Z0-9]{11}[0-9]{2}$",
    "NO": r"^NO[0-9]{13}$",
    "PL": r"^PL[0-9]{26}$",
    "PT": r"^PT[0-9]{23}$",
    "RO": r"^RO[0-9]{2}[A-Z]{4}[A-Z0-9]{16}$",
    "PM": r"^PM[0-9]{12}[A-Z0-9]{11}[0-9]{2}$",
    "SM": r"^SM[0-9]{2}[A-Z]{1}[0-9]{10}[A-Z0-9]{12}$",
    "SA": r"^SA[0-9]{4}[A-Z0-9]{18}$",
    "RS": r"^RS[0-9]{20}$",
    "SK": r"^SK[0-9]{22}$",
    "SI": r"^SI[0-9]{17}$",
    "ES": r"^ES[0-9]{22}$",
    "SE": r"^SE[0-9]{22}$",
    "CH": r"^CH[0-9]{7}[A-Z0-9]{12}$",
    "TN": r"^TN[0-9]{22}$",
    "TR": r"^TR[0-9]{8}[A-Z0-9]{16}$",
    "GB": r"^GB[0-9]{2}[A-Z]{4}[0-9]{14}$",
    "WF": r"^WF[0-9]{12}[A-Z0-9]{11}[0-9]{2}$",
}

# ISO 3166-1 alpha-3 -> alpha-2 aliases for the IBAN table above.
_IBAN_ALPHA3_TO_ALPHA2 = {
    "ALB": "AL", "AND": "AD", "AUT": "AT", "BEL": "BE", "BIH": "BA",
    "BGR": "BG", "BRA": "BR", "HRV": "HR", "CYP": "CY", "CZE": "CZ",
    "DNK": "DK", "EST": "EE", "FRO": "FO", "FIN": "FI", "FRA": "FR",
    "PYF": "PF", "ATF": "TF", "DEU": "DE", "GIB": "GI", "GEO": "GE",
    "GRC": "GR", "GRL": "GL", "HUN": "HU", "ISL": "IS", "IRL": "IE",
    "ISR": "IL", "ITA": "IT", "LVA": "LV", "LBN": "LB", "LIE": "LI",
    "LTU": "LT", "LUX": "LU", "MKD": "MK", "MLT": "MT", "MUS": "MU",
    "MYT": "YT", "MCO": "MC", "MNE": "ME", "NLD": "NL", "NCL": "NC",
    "NOR": "NO", "POL": "PL", "PRT": "PT", "ROU": "RO", "SPM": "PM",
    "SMR": "SM", "SAU": "SA", "SRB": "RS", "SVK": "SK", "SVN": "SI",
    "ESP": "ES", "SWE": "SE", "CHE": "CH", "TUN": "TN", "TUR": "TR",
    "GBR": "GB", "WLF": "WF",
}


def regex_iban(country: str | None = None) -> str:
    """
    Regex pattern for IBAN validation.

    Parameters
    ----------
    country
        Optional two or three-letter country code. If provided, returns the
        country-specific pattern; unknown codes fall back to the generic
        IBAN pattern.
    """
    if country is None:
        return _IBAN_GENERIC
    code = country.upper()
    code = _IBAN_ALPHA3_TO_ALPHA2.get(code, code)
    return _IBAN_PATTERNS.get(code, _IBAN_GENERIC)


# Fallback postal-code pattern for countries not in the table.
_POSTAL_GENERIC = r"^[0-9A-Z\s-]+$"

# Country-specific postal-code patterns keyed by ISO 3166-1 alpha-2 code;
# alpha-3 codes (plus the "ZIP" alias for US) are resolved through
# `_POSTAL_ALPHA3_TO_ALPHA2`.
_POSTAL_PATTERNS = {
    "AD": r"^AD[0-9]{3}$",
    "AF": r"^[0-9]{4}$",
    "AI": r"^AI-2640$",
    "AL": r"^[0-9]{4}$",
    "AM": r"^[0-9]{4}$",
    "AR": r"^([A-Z][0-9]{4}[A-Z]{3}|[A-Z][0-9]{4})$",
    "AS": r"^96799$",
    "AT": r"^[0-9]{4}$",
    "AU": r"^[0-9]{4}$",
    "AZ": r"^AZ[0-9]{4}$",
    "BA": r"^[0-9]{5}$",
    "BB": r"^BB[0-9]{5}$",
    "BD": r"^[0-9]{4}$",
    "BE": r"^[0-9]{4}$",
    "BG": r"^[0-9]{4}$",
    "BH": r"^[0-9]{3,4}$",
    "BL": r"^97133$",
    "BM": r"^[A-Z]{2}\s?[0-9]{2}$",
    "BN": r"^[A-Z]{2}\s?[0-9]{4}$",
    "BR": r"^[0-9]{5}-?[0-9]{3}$",
    "BT": r"^[0-9]{5}$",
    "BY": r"^[0-9]{6}$",
    "CA": r"^[A-Z][0-9][A-Z]\s?[0-9][A-Z][0-9]$",
    "CC": r"^6799$",
    "CH": r"^[0-9]{4}$",
    "CL": r"^[0-9]{7}$",
    "CN": r"^[0-9]{6}$",
    "CO": r"^[0-9]{6}$",
    "CR": r"^[0-9]{5}$",
    "CU": r"^[0-9]{5}$",
    "CV": r"^[0-9]{4}$",
    "CX": r"^6798$",
    "CY": r"^[0-9]{4}$",
    "CZ": r"^[0-9]{3}\s?[0-9]{2}$",
    "DE": r"^[0-9]{5}$",
    "DK": r"^[0-9]{4}$",
    "DO": r"^[0-9]{5}$",
    "DZ": r"^[0-9]{5}$",
    "EC": r"^[0-9]{6}$",
    "EE": r"^[0-9]{5}$",
    "EG": r"^[0-9]{5}$",
    "ES": r"^[0-9]{5}$",
    "ET": r"^[0-9]{4}$",
    "FI": r"^[0-9]{5}$",
    "FK": r"^FIQQ 1ZZ$",
    "FM": r"^(96941|96942|96943|96944)$",
    "FO": r"^[0-9]{3}$",
    "FR": r"^[0-9]{5}$",
    "GB": r"^([A-Z]{1,2}[0-9]{1,2}[A-Z]?)\s?([0-9][A-Z]{2})$",
    "GF": r"^973[0-9]{2}$",
    "GI": r"^GX11 1AA$",
    "GL": r"^39[0-9]{2}$",
    "GP": r"^971[0-9]{2}$",
    "GR": r"^[0-9]{3}\s?[0-9]{2}$",
    "GT": r"^[0-9]{5}$",
    "GU": r"^969[0-9]{2}$",
    "HR": r"^[0-9]{5}$",
    "HT": r"^[0-9]{4}$",
    "HU": r"^[0-9]{4}$",
    "ID": r"^[0-9]{5}$",
    "IE": r"^[A-Z][0-9]{2}\s?[A-Z0-9]{4}$",
    "IN": r"^[0-9]{6}$",
    "IO": r"^BBND 1ZZ$",
    "IQ": r"^[0-9]{5}$",
    "IR": r"^[0-9]{10}$",
    "IS": r"^[0-9]{3}$",
    "IT": r"^[0-9]{5}$",
    "JP": r"^[0-9]{3}-?[0-9]{4}$",
    "KR": r"^[0-9]{5}$",
    "KY": r"^KY[0-9]-[0-9]{4}$",
    # BUGFIX: the alternation must be grouped -- the previous pattern
    # r"^948[5-9]|949[0-7]$" left each branch only half-anchored, so e.g.
    # "9485ABC" matched via the unanchored-right first branch.
    "LI": r"^(948[5-9]|949[0-7])$",
    "LK": r"^[0-9]{5}$",
    "LT": r"^LT-?[0-9]{5}$",
    "LU": r"^[0-9]{4}$",
    "LV": r"^LV-?[0-9]{4}$",
    "MC": r"^980[0-9]{2}$",
    "MD": r"^MD-?[0-9]{4}$",
    "MH": r"^(96960|96970)$",
    "MK": r"^[0-9]{4}$",
    "MP": r"^9695[0-2]$",
    "MQ": r"^972[0-9]{2}$",
    "MX": r"^[0-9]{5}$",
    "MY": r"^[0-9]{5}$",
    "NC": r"^988[0-9]{2}$",
    "NE": r"^[0-9]{4}$",
    "NF": r"^2899$",
    "NG": r"^[0-9]{6}$",
    "NI": r"^[0-9]{5}$",
    "NL": r"^[0-9]{4}\s?[A-Z]{2}$",
    "NO": r"^[0-9]{4}$",
    "NP": r"^[0-9]{5}$",
    "NZ": r"^[0-9]{4}$",
    "OM": r"^[0-9]{3}$",
    "PE": r"^([A-Z]{4,5}\s?[0-9]{2}|[0-9]{5})$",
    "PF": r"^987[0-9]{2}$",
    "PG": r"^[0-9]{3}$",
    "PH": r"^[0-9]{4}$",
    "PK": r"^[0-9]{5}$",
    "PL": r"^[0-9]{2}-?[0-9]{3}$",
    "PM": r"^97500$",
    "PN": r"^PCRN 1ZZ$",
    "PR": r"^00[679][0-9]{2}$",
    "PT": r"^[0-9]{4}-?[0-9]{3}$",
    "PW": r"^96940$",
    "PY": r"^[0-9]{4}$",
    "RE": r"^974[0-9]{2}$",
    "RO": r"^[0-9]{6}$",
    "RS": r"^[0-9]{5}$",
    "RU": r"^[0-9]{6}$",
    "SA": r"^[0-9]{5}$",
    "SD": r"^[0-9]{5}$",
    "SE": r"^[0-9]{3}\s?[0-9]{2}$",
    "SG": r"^[0-9]{6}$",
    "SH": r"^(STHL 1ZZ|ASCN 1ZZ)$",
    "SI": r"^[0-9]{4}$",
    "SJ": r"^[0-9]{4}$",
    "SK": r"^[0-9]{3}\s?[0-9]{2}$",
    "SM": r"^4789[0-9]$",
    "SN": r"^[0-9]{5}$",
    "SO": r"^[A-Z]{2}\s?[0-9]{5}$",
    "SV": r"^CP\s?[0-9]{4}$",
    "SZ": r"^[A-Z][0-9]{3}$",
    "TC": r"^TKCA 1ZZ$",
    "TH": r"^[0-9]{5}$",
    "TJ": r"^[0-9]{6}$",
    "TM": r"^[0-9]{6}$",
    "TN": r"^[0-9]{4}$",
    "TR": r"^[0-9]{5}$",
    "TW": r"^[0-9]{3}([0-9]{2})?$",
    "TZ": r"^[0-9]{5}$",
    "UA": r"^[0-9]{5}$",
    "UM": r"^96898$",
    "US": r"^[0-9]{5}(-[0-9]{4})?$",
    "UY": r"^[0-9]{5}$",
    "UZ": r"^[0-9]{6}$",
    "VA": r"^00120$",
    "VC": r"^VC[0-9]{4}$",
    "VE": r"^[0-9]{4}$",
    "VG": r"^VG[0-9]{4}$",
    "VI": r"^008[0-9]{2}$",
    "VN": r"^[0-9]{6}$",
    "WF": r"^986[0-9]{2}$",
    "YT": r"^976[0-9]{2}$",
    "ZA": r"^[0-9]{4}$",
    "ZM": r"^[0-9]{5}$",
}

# ISO 3166-1 alpha-3 -> alpha-2 aliases for the postal table ("ZIP" -> US).
_POSTAL_ALPHA3_TO_ALPHA2 = {
    "AND": "AD", "AFG": "AF", "AIA": "AI", "ALB": "AL", "ARM": "AM",
    "ARG": "AR", "ASM": "AS", "AUT": "AT", "AUS": "AU", "AZE": "AZ",
    "BIH": "BA", "BRB": "BB", "BGD": "BD", "BEL": "BE", "BGR": "BG",
    "BHR": "BH", "BLM": "BL", "BMU": "BM", "BRN": "BN", "BRA": "BR",
    "BTN": "BT", "BLR": "BY", "CAN": "CA", "CCK": "CC", "CHE": "CH",
    "CHL": "CL", "CHN": "CN", "COL": "CO", "CRI": "CR", "CUB": "CU",
    "CPV": "CV", "CXR": "CX", "CYP": "CY", "CZE": "CZ", "DEU": "DE",
    "DNK": "DK", "DOM": "DO", "DZA": "DZ", "ECU": "EC", "EST": "EE",
    "EGY": "EG", "ESP": "ES", "ETH": "ET", "FIN": "FI", "FLK": "FK",
    "FSM": "FM", "FRO": "FO", "FRA": "FR", "GBR": "GB", "GUF": "GF",
    "GIB": "GI", "GRL": "GL", "GLP": "GP", "GRC": "GR", "GTM": "GT",
    "GUM": "GU", "HRV": "HR", "HTI": "HT", "HUN": "HU", "IDN": "ID",
    "IRL": "IE", "IND": "IN", "IOT": "IO", "IRQ": "IQ", "IRN": "IR",
    "ISL": "IS", "ITA": "IT", "JPN": "JP", "KOR": "KR", "CYM": "KY",
    "LIE": "LI", "LKA": "LK", "LTU": "LT", "LUX": "LU", "LVA": "LV",
    "MCO": "MC", "MDA": "MD", "MHL": "MH", "MKD": "MK", "MNP": "MP",
    "MTQ": "MQ", "MEX": "MX", "MYS": "MY", "NCL": "NC", "NER": "NE",
    "NFK": "NF", "NGA": "NG", "NIC": "NI", "NLD": "NL", "NOR": "NO",
    "NPL": "NP", "NZL": "NZ", "OMN": "OM", "PER": "PE", "PYF": "PF",
    "PNG": "PG", "PHL": "PH", "PAK": "PK", "POL": "PL", "SPM": "PM",
    "PCN": "PN", "PRI": "PR", "PRT": "PT", "PLW": "PW", "PRY": "PY",
    "REU": "RE", "ROU": "RO", "SRB": "RS", "RUS": "RU", "SAU": "SA",
    "SDN": "SD", "SWE": "SE", "SGP": "SG", "SHN": "SH", "SVN": "SI",
    "SJM": "SJ", "SVK": "SK", "SMR": "SM", "SEN": "SN", "SOM": "SO",
    "SLV": "SV", "SWZ": "SZ", "TCA": "TC", "THA": "TH", "TJK": "TJ",
    "TKM": "TM", "TUN": "TN", "TUR": "TR", "TWN": "TW", "TZA": "TZ",
    "UKR": "UA", "UMI": "UM", "USA": "US", "ZIP": "US", "URY": "UY",
    "UZB": "UZ", "VAT": "VA", "VCT": "VC", "VEN": "VE", "VGB": "VG",
    "VIR": "VI", "VNM": "VN", "WLF": "WF", "MYT": "YT", "ZAF": "ZA",
    "ZMB": "ZM",
}


def regex_postal_code(country: str) -> str:
    """
    Get regex pattern for postal code validation for a specific country.

    Parameters
    ----------
    country
        Two or three-letter country code (or "ZIP" as an alias for US);
        unknown codes fall back to a permissive generic pattern.
    """
    code = country.upper()
    code = _POSTAL_ALPHA3_TO_ALPHA2.get(code, code)
    return _POSTAL_PATTERNS.get(code, _POSTAL_GENERIC)
r"^[0-9]{4}$", + "CHE": r"^[0-9]{4}$", + "CL": r"^[0-9]{7}$", + "CHL": r"^[0-9]{7}$", + "CN": r"^[0-9]{6}$", + "CHN": r"^[0-9]{6}$", + "CO": r"^[0-9]{6}$", + "COL": r"^[0-9]{6}$", + "CR": r"^[0-9]{5}$", + "CRI": r"^[0-9]{5}$", + "CU": r"^[0-9]{5}$", + "CUB": r"^[0-9]{5}$", + "CV": r"^[0-9]{4}$", + "CPV": r"^[0-9]{4}$", + "CX": r"^6798$", + "CXR": r"^6798$", + "CY": r"^[0-9]{4}$", + "CYP": r"^[0-9]{4}$", + "CZ": r"^[0-9]{3}\s?[0-9]{2}$", + "CZE": r"^[0-9]{3}\s?[0-9]{2}$", + "DE": r"^[0-9]{5}$", + "DEU": r"^[0-9]{5}$", + "DK": r"^[0-9]{4}$", + "DNK": r"^[0-9]{4}$", + "DO": r"^[0-9]{5}$", + "DOM": r"^[0-9]{5}$", + "DZ": r"^[0-9]{5}$", + "DZA": r"^[0-9]{5}$", + "EC": r"^[0-9]{6}$", + "ECU": r"^[0-9]{6}$", + "EE": r"^[0-9]{5}$", + "EST": r"^[0-9]{5}$", + "EG": r"^[0-9]{5}$", + "EGY": r"^[0-9]{5}$", + "ES": r"^[0-9]{5}$", + "ESP": r"^[0-9]{5}$", + "ET": r"^[0-9]{4}$", + "ETH": r"^[0-9]{4}$", + "FI": r"^[0-9]{5}$", + "FIN": r"^[0-9]{5}$", + "FK": r"^FIQQ 1ZZ$", + "FLK": r"^FIQQ 1ZZ$", + "FM": r"^(96941|96942|96943|96944)$", + "FSM": r"^(96941|96942|96943|96944)$", + "FO": r"^[0-9]{3}$", + "FRO": r"^[0-9]{3}$", + "FR": r"^[0-9]{5}$", + "FRA": r"^[0-9]{5}$", + "GB": r"^([A-Z]{1,2}[0-9]{1,2}[A-Z]?)\s?([0-9][A-Z]{2})$", + "GBR": r"^([A-Z]{1,2}[0-9]{1,2}[A-Z]?)\s?([0-9][A-Z]{2})$", + "GF": r"^973[0-9]{2}$", + "GUF": r"^973[0-9]{2}$", + "GI": r"^GX11 1AA$", + "GIB": r"^GX11 1AA$", + "GL": r"^39[0-9]{2}$", + "GRL": r"^39[0-9]{2}$", + "GP": r"^971[0-9]{2}$", + "GLP": r"^971[0-9]{2}$", + "GR": r"^[0-9]{3}\s?[0-9]{2}$", + "GRC": r"^[0-9]{3}\s?[0-9]{2}$", + "GT": r"^[0-9]{5}$", + "GTM": r"^[0-9]{5}$", + "GU": r"^969[0-9]{2}$", + "GUM": r"^969[0-9]{2}$", + "HR": r"^[0-9]{5}$", + "HRV": r"^[0-9]{5}$", + "HT": r"^[0-9]{4}$", + "HTI": r"^[0-9]{4}$", + "HU": r"^[0-9]{4}$", + "HUN": r"^[0-9]{4}$", + "ID": r"^[0-9]{5}$", + "IDN": r"^[0-9]{5}$", + "IE": r"^[A-Z][0-9]{2}\s?[A-Z0-9]{4}$", + "IRL": r"^[A-Z][0-9]{2}\s?[A-Z0-9]{4}$", + "IN": r"^[0-9]{6}$", + "IND": r"^[0-9]{6}$", + "IO": r"^BBND 
1ZZ$", + "IOT": r"^BBND 1ZZ$", + "IQ": r"^[0-9]{5}$", + "IRQ": r"^[0-9]{5}$", + "IR": r"^[0-9]{10}$", + "IRN": r"^[0-9]{10}$", + "IS": r"^[0-9]{3}$", + "ISL": r"^[0-9]{3}$", + "IT": r"^[0-9]{5}$", + "ITA": r"^[0-9]{5}$", + "JP": r"^[0-9]{3}-?[0-9]{4}$", + "JPN": r"^[0-9]{3}-?[0-9]{4}$", + "KR": r"^[0-9]{5}$", + "KOR": r"^[0-9]{5}$", + "KY": r"^KY[0-9]-[0-9]{4}$", + "CYM": r"^KY[0-9]-[0-9]{4}$", + "LI": r"^948[5-9]|949[0-7]$", + "LIE": r"^948[5-9]|949[0-7]$", + "LK": r"^[0-9]{5}$", + "LKA": r"^[0-9]{5}$", + "LT": r"^LT-?[0-9]{5}$", + "LTU": r"^LT-?[0-9]{5}$", + "LU": r"^[0-9]{4}$", + "LUX": r"^[0-9]{4}$", + "LV": r"^LV-?[0-9]{4}$", + "LVA": r"^LV-?[0-9]{4}$", + "MC": r"^980[0-9]{2}$", + "MCO": r"^980[0-9]{2}$", + "MD": r"^MD-?[0-9]{4}$", + "MDA": r"^MD-?[0-9]{4}$", + "MH": r"^(96960|96970)$", + "MHL": r"^(96960|96970)$", + "MK": r"^[0-9]{4}$", + "MKD": r"^[0-9]{4}$", + "MP": r"^9695[0-2]$", + "MNP": r"^9695[0-2]$", + "MQ": r"^972[0-9]{2}$", + "MTQ": r"^972[0-9]{2}$", + "MX": r"^[0-9]{5}$", + "MEX": r"^[0-9]{5}$", + "MY": r"^[0-9]{5}$", + "MYS": r"^[0-9]{5}$", + "NC": r"^988[0-9]{2}$", + "NCL": r"^988[0-9]{2}$", + "NE": r"^[0-9]{4}$", + "NER": r"^[0-9]{4}$", + "NF": r"^2899$", + "NFK": r"^2899$", + "NG": r"^[0-9]{6}$", + "NGA": r"^[0-9]{6}$", + "NI": r"^[0-9]{5}$", + "NIC": r"^[0-9]{5}$", + "NL": r"^[0-9]{4}\s?[A-Z]{2}$", + "NLD": r"^[0-9]{4}\s?[A-Z]{2}$", + "NO": r"^[0-9]{4}$", + "NOR": r"^[0-9]{4}$", + "NP": r"^[0-9]{5}$", + "NPL": r"^[0-9]{5}$", + "NZ": r"^[0-9]{4}$", + "NZL": r"^[0-9]{4}$", + "OM": r"^[0-9]{3}$", + "OMN": r"^[0-9]{3}$", + "PE": r"^([A-Z]{4,5}\s?[0-9]{2}|[0-9]{5})$", + "PER": r"^([A-Z]{4,5}\s?[0-9]{2}|[0-9]{5})$", + "PF": r"^987[0-9]{2}$", + "PYF": r"^987[0-9]{2}$", + "PG": r"^[0-9]{3}$", + "PNG": r"^[0-9]{3}$", + "PH": r"^[0-9]{4}$", + "PHL": r"^[0-9]{4}$", + "PK": r"^[0-9]{5}$", + "PAK": r"^[0-9]{5}$", + "PL": r"^[0-9]{2}-?[0-9]{3}$", + "POL": r"^[0-9]{2}-?[0-9]{3}$", + "PM": r"^97500$", + "SPM": r"^97500$", + "PN": r"^PCRN 1ZZ$", + "PCN": 
r"^PCRN 1ZZ$", + "PR": r"^00[679][0-9]{2}$", + "PRI": r"^00[679][0-9]{2}$", + "PT": r"^[0-9]{4}-?[0-9]{3}$", + "PRT": r"^[0-9]{4}-?[0-9]{3}$", + "PW": r"^96940$", + "PLW": r"^96940$", + "PY": r"^[0-9]{4}$", + "PRY": r"^[0-9]{4}$", + "RE": r"^974[0-9]{2}$", + "REU": r"^974[0-9]{2}$", + "RO": r"^[0-9]{6}$", + "ROU": r"^[0-9]{6}$", + "RS": r"^[0-9]{5}$", + "SRB": r"^[0-9]{5}$", + "RU": r"^[0-9]{6}$", + "RUS": r"^[0-9]{6}$", + "SA": r"^[0-9]{5}$", + "SAU": r"^[0-9]{5}$", + "SD": r"^[0-9]{5}$", + "SDN": r"^[0-9]{5}$", + "SE": r"^[0-9]{3}\s?[0-9]{2}$", + "SWE": r"^[0-9]{3}\s?[0-9]{2}$", + "SG": r"^[0-9]{6}$", + "SGP": r"^[0-9]{6}$", + "SH": r"^(STHL 1ZZ|ASCN 1ZZ)$", + "SHN": r"^(STHL 1ZZ|ASCN 1ZZ)$", + "SI": r"^[0-9]{4}$", + "SVN": r"^[0-9]{4}$", + "SJ": r"^[0-9]{4}$", + "SJM": r"^[0-9]{4}$", + "SK": r"^[0-9]{3}\s?[0-9]{2}$", + "SVK": r"^[0-9]{3}\s?[0-9]{2}$", + "SM": r"^4789[0-9]$", + "SMR": r"^4789[0-9]$", + "SN": r"^[0-9]{5}$", + "SEN": r"^[0-9]{5}$", + "SO": r"^[A-Z]{2}\s?[0-9]{5}$", + "SOM": r"^[A-Z]{2}\s?[0-9]{5}$", + "SV": r"^CP\s?[0-9]{4}$", + "SLV": r"^CP\s?[0-9]{4}$", + "SZ": r"^[A-Z][0-9]{3}$", + "SWZ": r"^[A-Z][0-9]{3}$", + "TC": r"^TKCA 1ZZ$", + "TCA": r"^TKCA 1ZZ$", + "TH": r"^[0-9]{5}$", + "THA": r"^[0-9]{5}$", + "TJ": r"^[0-9]{6}$", + "TJK": r"^[0-9]{6}$", + "TM": r"^[0-9]{6}$", + "TKM": r"^[0-9]{6}$", + "TN": r"^[0-9]{4}$", + "TUN": r"^[0-9]{4}$", + "TR": r"^[0-9]{5}$", + "TUR": r"^[0-9]{5}$", + "TW": r"^[0-9]{3}([0-9]{2})?$", + "TWN": r"^[0-9]{3}([0-9]{2})?$", + "TZ": r"^[0-9]{5}$", + "TZA": r"^[0-9]{5}$", + "UA": r"^[0-9]{5}$", + "UKR": r"^[0-9]{5}$", + "UM": r"^96898$", + "UMI": r"^96898$", + "US": r"^[0-9]{5}(-[0-9]{4})?$", + "USA": r"^[0-9]{5}(-[0-9]{4})?$", + "ZIP": r"^[0-9]{5}(-[0-9]{4})?$", # Alias for US + "UY": r"^[0-9]{5}$", + "URY": r"^[0-9]{5}$", + "UZ": r"^[0-9]{6}$", + "UZB": r"^[0-9]{6}$", + "VA": r"^00120$", + "VAT": r"^00120$", + "VC": r"^VC[0-9]{4}$", + "VCT": r"^VC[0-9]{4}$", + "VE": r"^[0-9]{4}$", + "VEN": r"^[0-9]{4}$", + "VG": 
r"^VG[0-9]{4}$", + "VGB": r"^VG[0-9]{4}$", + "VI": r"^008[0-9]{2}$", + "VIR": r"^008[0-9]{2}$", + "VN": r"^[0-9]{6}$", + "VNM": r"^[0-9]{6}$", + "WF": r"^986[0-9]{2}$", + "WLF": r"^986[0-9]{2}$", + "YT": r"^976[0-9]{2}$", + "MYT": r"^976[0-9]{2}$", + "ZA": r"^[0-9]{4}$", + "ZAF": r"^[0-9]{4}$", + "ZM": r"^[0-9]{5}$", + "ZMB": r"^[0-9]{5}$", + } + + return postal_patterns.get(country.upper(), r"^[0-9A-Z\s-]+$") + + +# Helper functions for string cleaning + + +def remove_hyphens(x: str, replacement: str = "") -> str: + """Remove hyphens from a string.""" + return x.replace("-", replacement) + + +def remove_spaces(x: str, replacement: str = "") -> str: + """Remove spaces from a string.""" + return x.replace(" ", replacement) + + +def remove_letters(x: str, replacement: str = "") -> str: + """Remove letters from a string.""" + return re.sub(r"[a-zA-Z]", replacement, x) + + +def remove_punctuation(x: str, replacement: str = " ") -> str: + """Remove punctuation from a string.""" + return re.sub(r"[^\w\s]", replacement, x) + + +# Validation functions + + +def is_isbn_10(x: str) -> bool: + """ + Check if a string is a valid ISBN-10. + + Parameters + ---------- + x + String to validate. + + Returns + ------- + bool + True if valid ISBN-10, False otherwise. + """ + x = remove_hyphens(x) + x = remove_punctuation(x) + x = x.lower() + x = remove_spaces(x) + + if not re.match(r"\d{9}[0-9x]", x): + return False + + digits = list(x) + + # If the check digit is "x" then substitute that for "10" + if digits[9] == "x": + digits[9] = "10" + + # Recast as integer values + try: + digits = [int(d) for d in digits] + except ValueError: + return False + + # The sum of vector multiplication of digits by the digit + # weights (10 to 1 across the digits) should be + # divided evenly by 11 for this to be a valid ISBN-10 + return sum(d * w for d, w in zip(digits, range(10, 0, -1))) % 11 == 0 + + +def is_isbn_13(x: str) -> bool: + """ + Check if a string is a valid ISBN-13. 
+ + Parameters + ---------- + x + String to validate. + + Returns + ------- + bool + True if valid ISBN-13, False otherwise. + """ + x = remove_hyphens(x) + + if not re.match(r"\d{13}", x): + return False + + try: + digits = [int(d) for d in x] + except ValueError: + return False + + check = digits[12] + remainder = sum(d * w for d, w in zip(digits[:12], [1, 3] * 6)) % 10 + + return (remainder == 0 and check == 0) or (10 - remainder == check) + + +def check_isbn(x: list[str]) -> list[bool]: + """ + Check if strings are valid ISBNs (10 or 13 digit). + + Parameters + ---------- + x + List of strings to validate. + + Returns + ------- + list[bool] + List of boolean values indicating validity. + """ + results = [] + for val in x: + if val is None or (isinstance(val, float) and val != val): # Check for None or NaN + results.append(False) + continue + + val_clean = remove_hyphens(str(val)) + val_clean = remove_punctuation(val_clean) + val_clean = val_clean.lower() + val_clean = remove_spaces(val_clean) + + isbn_length = len(val_clean) + + if isbn_length == 10: + results.append(is_isbn_10(val_clean)) + elif isbn_length == 13: + results.append(is_isbn_13(val_clean)) + else: + results.append(False) + + return results + + +def is_vin(x: str) -> bool: + """ + Check if a string is a valid VIN (Vehicle Identification Number). + + Parameters + ---------- + x + String to validate. + + Returns + ------- + bool + True if valid VIN, False otherwise. 
+ """ + if not re.match(regex_vin(), x.upper()): + return False + + x_lower = x.lower() + digits = list(x_lower) + + weights = [8, 7, 6, 5, 4, 3, 2, 10, 0, 9, 8, 7, 6, 5, 4, 3, 2] + + letter_vals = { + "a": 1, + "b": 2, + "c": 3, + "d": 4, + "e": 5, + "f": 6, + "g": 7, + "h": 8, + "j": 1, + "k": 2, + "l": 3, + "m": 4, + "n": 5, + "p": 7, + "r": 9, + "s": 2, + "t": 3, + "u": 4, + "v": 5, + "w": 6, + "x": 7, + "y": 8, + "z": 9, + } + + total = 0 + for i in range(17): + if not digits[i].isdigit(): + total += letter_vals.get(digits[i], 0) * weights[i] + else: + total += int(digits[i]) * weights[i] + + check = total % 11 + + if check == 10: + check_str = "x" + else: + check_str = str(check) + + return check_str == digits[8] + + +def check_vin(x: list[str]) -> list[bool]: + """ + Check if strings are valid VINs. + + Parameters + ---------- + x + List of strings to validate. + + Returns + ------- + list[bool] + List of boolean values indicating validity. + """ + results = [] + for val in x: + if val is None or (isinstance(val, float) and val != val): # Check for None or NaN + results.append(False) + continue + + val_str = str(val) + val_str = remove_hyphens(val_str) + val_str = remove_punctuation(val_str) + val_str = val_str.lower() + val_str = remove_spaces(val_str) + + results.append(is_vin(val_str)) + + return results + + +def luhn(x: str) -> bool: + """ + Check if a string passes the Luhn algorithm (for credit cards). + + Parameters + ---------- + x + String to validate. + + Returns + ------- + bool + True if passes Luhn check, False otherwise. + """ + try: + digits = [int(d) for d in reversed(x)] + except ValueError: + return False + + odd_sum = sum(digits[::2]) + even_digits = [d * 2 for d in digits[1::2]] + even_digits = [d - 9 if d > 9 else d for d in even_digits] + even_sum = sum(even_digits) + + total = odd_sum + even_sum + + return total % 10 == 0 + + +def is_credit_card(x: str) -> bool: + """ + Check if a string is a valid credit card number. 
+ + Parameters + ---------- + x + String to validate. + + Returns + ------- + bool + True if valid credit card number, False otherwise. + """ + if not re.match(regex_credit_card_1(), x): + return False + + x_clean = remove_hyphens(x) + x_clean = remove_punctuation(x_clean) + x_clean = remove_spaces(x_clean) + + if not re.match(regex_credit_card_2(), x_clean): + return False + + return luhn(x_clean) + + +def check_credit_card(x: list[str]) -> list[bool]: + """ + Check if strings are valid credit card numbers. + + Parameters + ---------- + x + List of strings to validate. + + Returns + ------- + list[bool] + List of boolean values indicating validity. + """ + return [ + is_credit_card(str(val)) + if val is not None and (not isinstance(val, float) or val == val) + else False + for val in x + ] + + +def check_iban(x: list[str], country: str | None = None) -> list[bool]: + """ + Check if strings are valid IBANs. + + Parameters + ---------- + x + List of strings to validate. + country + Optional country code for country-specific validation. + + Returns + ------- + list[bool] + List of boolean values indicating validity. + """ + pattern = regex_iban(country=country) + return [ + bool(re.match(pattern, str(val).upper())) + if val is not None and (not isinstance(val, float) or val == val) + else False + for val in x + ] + + +def check_postal_code(x: list[str], country: str) -> list[bool]: + """ + Check if strings are valid postal codes for a given country. + + Parameters + ---------- + x + List of strings to validate. + country + Country code (2 or 3 letter). + + Returns + ------- + list[bool] + List of boolean values indicating validity. + """ + pattern = regex_postal_code(country=country) + return [ + bool(re.match(pattern, str(val).upper())) + if val is not None and (not isinstance(val, float) or val == val) + else False + for val in x + ] + + +def check_url(x: list[str]) -> list[bool]: + """ + Check if strings are valid URLs. 
+ + Parameters + ---------- + x + List of strings to validate. + + Returns + ------- + list[bool] + List of boolean values indicating validity. + """ + pattern = regex_url() + return [ + bool(re.match(pattern, str(val))) + if val is not None and (not isinstance(val, float) or val == val) + else False + for val in x + ] + + +def check_ipv4_address(x: list[str]) -> list[bool]: + """ + Check if strings are valid IPv4 addresses. + + Parameters + ---------- + x + List of strings to validate. + + Returns + ------- + list[bool] + List of boolean values indicating validity. + """ + pattern = regex_ipv4_address() + return [ + bool(re.match(pattern, str(val))) + if val is not None and (not isinstance(val, float) or val == val) + else False + for val in x + ] + + +def check_ipv6_address(x: list[str]) -> list[bool]: + """ + Check if strings are valid IPv6 addresses. + + Parameters + ---------- + x + List of strings to validate. + + Returns + ------- + list[bool] + List of boolean values indicating validity. + """ + pattern = regex_ipv6_address() + return [ + bool(re.match(pattern, str(val))) + if val is not None and (not isinstance(val, float) or val == val) + else False + for val in x + ] + + +def check_email(x: list[str]) -> list[bool]: + """ + Check if strings are valid email addresses. + + Parameters + ---------- + x + List of strings to validate. + + Returns + ------- + list[bool] + List of boolean values indicating validity. + """ + pattern = regex_email() + return [ + bool(re.match(pattern, str(val))) + if val is not None and (not isinstance(val, float) or val == val) + else False + for val in x + ] + + +def check_phone(x: list[str]) -> list[bool]: + """ + Check if strings are valid phone numbers. + + Parameters + ---------- + x + List of strings to validate. + + Returns + ------- + list[bool] + List of boolean values indicating validity. 
+ """ + pattern = regex_phone() + return [ + bool(re.match(pattern, str(val))) + if val is not None and (not isinstance(val, float) or val == val) + else False + for val in x + ] + + +def check_mac(x: list[str]) -> list[bool]: + """ + Check if strings are valid MAC addresses. + + Parameters + ---------- + x + List of strings to validate. + + Returns + ------- + list[bool] + List of boolean values indicating validity. + """ + pattern = regex_mac() + return [ + bool(re.match(pattern, str(val))) + if val is not None and (not isinstance(val, float) or val == val) + else False + for val in x + ] + + +def check_swift_bic(x: list[str]) -> list[bool]: + """ + Check if strings are valid SWIFT/BIC codes. + + Parameters + ---------- + x + List of strings to validate. + + Returns + ------- + list[bool] + List of boolean values indicating validity. + """ + pattern = regex_swift_bic() + return [ + bool(re.match(pattern, str(val).upper())) + if val is not None and (not isinstance(val, float) or val == val) + else False + for val in x + ] diff --git a/pointblank/validate.py b/pointblank/validate.py index 8f8f0ba91..f4d502b4e 100644 --- a/pointblank/validate.py +++ b/pointblank/validate.py @@ -8645,6 +8645,291 @@ def col_vals_regex( return self + def col_vals_within_spec( + self, + columns: str | list[str] | Column | ColumnSelector | ColumnSelectorNarwhals, + spec: str, + na_pass: bool = False, + pre: Callable | None = None, + segments: SegmentSpec | None = None, + thresholds: int | float | bool | tuple | dict | Thresholds = None, + actions: Actions | None = None, + brief: str | bool | None = None, + active: bool = True, + ) -> Validate: + """ + Validate whether column values fit within a specification. + + The `col_vals_within_spec()` validation method checks whether column values in a table + correspond to a specification (`spec=`) type (details of which are available in the + *Specifications* section). 
Specifications include common data types like email addresses, + URLs, postal codes, vehicle identification numbers (VINs), International Bank Account + Numbers (IBANs), and more. This validation will operate over the number of test units that + is equal to the number of rows in the table. + + Parameters + ---------- + columns + A single column or a list of columns to validate. Can also use + [`col()`](`pointblank.col`) with column selectors to specify one or more columns. If + multiple columns are supplied or resolved, there will be a separate validation step + generated for each column. + spec + A specification string for defining the specification type. Examples are `"email"`, + `"url"`, and `"postal_code[USA]"`. See the *Specifications* section for all available + options. + na_pass + Should any encountered None, NA, or Null values be considered as passing test units? By + default, this is `False`. Set to `True` to pass test units with missing values. + pre + An optional preprocessing function or lambda to apply to the data table during + interrogation. This function should take a table as input and return a modified table. + Have a look at the *Preprocessing* section for more information on how to use this + argument. + segments + An optional directive on segmentation, which serves to split a validation step into + multiple (one step per segment). Can be a single column name, a tuple that specifies a + column name and its corresponding values to segment on, or a combination of both + (provided as a list). Read the *Segmentation* section for usage information. + thresholds + Set threshold failure levels for reporting and reacting to exceedances of the levels. + The thresholds are set at the step level and will override any global thresholds set in + `Validate(thresholds=...)`. The default is `None`, which means that no thresholds will + be set locally and global thresholds (if any) will take effect. 
Look at the *Thresholds* + section for information on how to set threshold levels. + actions + Optional actions to take when the validation step(s) meets or exceeds any set threshold + levels. If provided, the [`Actions`](`pointblank.Actions`) class should be used to + define the actions. + brief + An optional brief description of the validation step that will be displayed in the + reporting table. You can use the templating elements like `"{step}"` to insert + the step number, or `"{auto}"` to include an automatically generated brief. If `True` + the entire brief will be automatically generated. If `None` (the default) then there + won't be a brief. + active + A boolean value indicating whether the validation step should be active. Using `False` + will make the validation step inactive (still reporting its presence and keeping indexes + for the steps unchanged). + + Returns + ------- + Validate + The `Validate` object with the added validation step. + + Specifications + -------------- + A specification type must be used with the `spec=` argument. This is a string-based keyword + that corresponds to the type of data in the specified columns. The following keywords can + be used: + + - `"isbn"`: The International Standard Book Number (ISBN) is a unique numerical identifier + for books. This keyword validates both 10-digit and 13-digit ISBNs. + + - `"vin"`: A vehicle identification number (VIN) is a unique code used by the automotive + industry to identify individual motor vehicles. + + - `"postal_code[<country_code>]"`: A postal code (also known as postcodes, PIN, or ZIP + codes) is a series of letters, digits, or both included in a postal address. Because the + coding varies by country, a country code in either the 2-letter (ISO 3166-1 alpha-2) or + 3-letter (ISO 3166-1 alpha-3) format needs to be supplied (e.g., `"postal_code[US]"` or + `"postal_code[USA]"`). The keyword alias `"zip"` can be used for US ZIP codes. 
+ + - `"credit_card"`: A credit card number can be validated across a variety of issuers. The + validation uses the Luhn algorithm. + + - `"iban[<country_code>]"`: The International Bank Account Number (IBAN) is a system of + identifying bank accounts across countries. Because the length and coding varies by + country, a country code needs to be supplied (e.g., `"iban[DE]"` or `"iban[DEU]"`). + + - `"swift"`: Business Identifier Codes (also known as SWIFT-BIC, BIC, or SWIFT code) are + unique identifiers for financial and non-financial institutions. + + - `"phone"`, `"email"`, `"url"`, `"ipv4"`, `"ipv6"`, `"mac"`: Phone numbers, email + addresses, Internet URLs, IPv4 or IPv6 addresses, and MAC addresses can be validated with + their respective keywords. + + Only a single `spec=` value should be provided per function call. + + Preprocessing + ------------- + The `pre=` argument allows for a preprocessing function or lambda to be applied to the data + table during interrogation. This function should take a table as input and return a modified + table. This is useful for performing any necessary transformations or filtering on the data + before the validation step is applied. + + The preprocessing function can be any callable that takes a table as input and returns a + modified table. For example, you could use a lambda function to filter the table based on + certain criteria or to apply a transformation to the data. Note that you can refer to + a column via `columns=` that is expected to be present in the transformed table, but may not + exist in the table before preprocessing. Regarding the lifetime of the transformed table, it + only exists during the validation step and is not stored in the `Validate` object or used in + subsequent validation steps. + + Segmentation + ------------ + The `segments=` argument allows for the segmentation of a validation step into multiple + segments. This is useful for applying the same validation step to different subsets of the + data. 
The segmentation can be done based on a single column or specific fields within a + column. + + Providing a single column name will result in a separate validation step for each unique + value in that column. For example, if you have a column called `"region"` with values + `"North"`, `"South"`, and `"East"`, the validation step will be applied separately to each + region. + + Alternatively, you can provide a tuple that specifies a column name and its corresponding + values to segment on. For example, if you have a column called `"date"` and you want to + segment on only specific dates, you can provide a tuple like + `("date", ["2023-01-01", "2023-01-02"])`. Any other values in the column will be disregarded + (i.e., no validation steps will be created for them). + + A list with a combination of column names and tuples can be provided as well. This allows + for more complex segmentation scenarios. The following inputs are both valid: + + ``` + # Segments from all unique values in the `region` column + # and specific dates in the `date` column + segments=["region", ("date", ["2023-01-01", "2023-01-02"])] + + # Segments from all unique values in the `region` and `date` columns + segments=["region", "date"] + ``` + + The segmentation is performed during interrogation, and the resulting validation steps will + be numbered sequentially. Each segment will have its own validation step, and the results + will be reported separately. This allows for a more granular analysis of the data and helps + identify issues within specific segments. + + Importantly, the segmentation process will be performed after any preprocessing of the data + table. Because of this, one can conceivably use the `pre=` argument to generate a column + that can be used for segmentation. For example, you could create a new column called + `"segment"` through use of `pre=` and then use that column for segmentation. 
+ + Thresholds + ---------- + The `thresholds=` parameter is used to set the failure-condition levels for the validation + step. If they are set here at the step level, these thresholds will override any thresholds + set at the global level in `Validate(thresholds=...)`. + + There are three threshold levels: 'warning', 'error', and 'critical'. The threshold values + can either be set as a proportion failing of all test units (a value between `0` to `1`), + or, the absolute number of failing test units (as integer that's `1` or greater). + + Thresholds can be defined using one of these input schemes: + + 1. use the [`Thresholds`](`pointblank.Thresholds`) class (the most direct way to create + thresholds) + 2. provide a tuple of 1-3 values, where position `0` is the 'warning' level, position `1` is + the 'error' level, and position `2` is the 'critical' level + 3. create a dictionary of 1-3 value entries; the valid keys: are 'warning', 'error', and + 'critical' + 4. a single integer/float value denoting absolute number or fraction of failing test units + for the 'warning' level only + + If the number of failing test units exceeds set thresholds, the validation step will be + marked as 'warning', 'error', or 'critical'. All of the threshold levels don't need to be + set, you're free to set any combination of them. + + Aside from reporting failure conditions, thresholds can be used to determine the actions to + take for each level of failure (using the `actions=` parameter). + + Examples + -------- + ```{python} + #| echo: false + #| output: false + import pointblank as pb + pb.config(report_incl_header=False, report_incl_footer=False, preview_incl_header=False) + ``` + + For the examples here, we'll use a simple Polars DataFrame with an email column. 
The table + is shown below: + + ```{python} + import pointblank as pb + import polars as pl + + tbl = pl.DataFrame( + { + "email": [ + "user@example.com", + "admin@test.org", + "invalid-email", + "contact@company.co.uk", + ], + } + ) + + pb.preview(tbl) + ``` + + Let's validate that all of the values in the `email` column are valid email addresses. + We'll determine if this validation had any failing test units (there are four test units, + one for each row). + + ```{python} + validation = ( + pb.Validate(data=tbl) + .col_vals_within_spec(columns="email", spec="email") + .interrogate() + ) + + validation + ``` + + The validation table shows that one test unit failed (the invalid email address in row 3). + """ + + assertion_type = _get_fn_name() + + _check_column(column=columns) + _check_pre(pre=pre) + # TODO: add check for segments + # _check_segments(segments=segments) + _check_thresholds(thresholds=thresholds) + _check_boolean_input(param=na_pass, param_name="na_pass") + _check_boolean_input(param=active, param_name="active") + + # Determine threshold to use (global or local) and normalize a local `thresholds=` value + thresholds = ( + self.thresholds if thresholds is None else _normalize_thresholds_creation(thresholds) + ) + + # If `columns` is a ColumnSelector or Narwhals selector, call `col()` on it to later + # resolve the columns + if isinstance(columns, (ColumnSelector, nw.selectors.Selector)): + columns = col(columns) + + # If `columns` is Column value or a string, place it in a list for iteration + if isinstance(columns, (Column, str)): + columns = [columns] + + # Determine brief to use (global or local) and transform any shorthands of `brief=` + brief = self.brief if brief is None else _transform_auto_brief(brief=brief) + + # Package up the `spec=` param into a dictionary for later interrogation + values = {"spec": spec} + + # Iterate over the columns and create a validation step for each + for column in columns: + val_info = _ValidationInfo( + 
assertion_type=assertion_type, + column=column, + values=values, + na_pass=na_pass, + pre=pre, + segments=segments, + thresholds=thresholds, + actions=actions, + brief=brief, + active=active, + ) + + self._add_validation(validation_info=val_info) + + return self + def col_vals_expr( self, expr: any, @@ -11450,6 +11735,7 @@ def interrogate( "col_vals_in_set", "col_vals_not_in_set", "col_vals_regex", + "col_vals_within_spec", ]: # Process table for column validation tbl = _column_test_prep( @@ -11518,6 +11804,13 @@ def interrogate( tbl=tbl, column=column, values=value, na_pass=na_pass ) + elif assertion_type == "col_vals_within_spec": + from pointblank._interrogation import interrogate_within_spec + + results_tbl = interrogate_within_spec( + tbl=tbl, column=column, values=value, na_pass=na_pass + ) + elif assertion_type == "col_vals_expr": results_tbl = col_vals_expr( data_tbl=data_tbl_step, expr=value, tbl_type=tbl_type @@ -14239,6 +14532,11 @@ def get_tabular_report( values_upd.append(str(pattern)) + elif assertion_type[i] in ["col_vals_within_spec"]: + spec = value["spec"] + + values_upd.append(str(spec)) + elif assertion_type[i] in ["prompt"]: # pragma: no cover # For AI validation, show only the prompt, not the full config if isinstance(value, dict) and "prompt" in value: # pragma: no cover diff --git a/tests/test_col_vals_within_spec.py b/tests/test_col_vals_within_spec.py new file mode 100644 index 000000000..10d7b1bdb --- /dev/null +++ b/tests/test_col_vals_within_spec.py @@ -0,0 +1,925 @@ +import pytest + +import ibis +import pandas as pd +import polars as pl + +from pointblank.validate import Validate + + +IBAN_VALID = { + "AT": [ + "AT582774098454337653", + "AT220332087576467472", + ], + "DE": [ + "DE06495352657836424132", + "DE09121688720378475751", + ], + "GB": [ + "GB39MUJS50172570996370", + "GB14SIPV86193224493527", + ], +} + +POSTAL_CODE_VALID = { + "US": ["99553", "36264", "71660", "85225", "90309"], + "CA": ["L6M 3V5", "V7G 1V1", "B2X 1R5", "E2K 
1H3", "M4Y 3C1"], + "DE": ["01945", "03119", "08393", "36457", "99996"], +} + +CREDIT_CARD_VALID = [ + "340000000000009", # American Express + "378734493671000", # American Express Corporate + "6703444444444449", # Bancontact + "4035501000000008", # Cartes Bancaires + "6011000000000004", # Discover + "5500000000000004", # MasterCard + "4012888888881881", # Visa +] + +CREDIT_CARD_INVALID = [ + "ABCDEFJHIGK", + "340000000000000", + "378734493671001", + "5500000000000005", +] + +VIN_VALID = [ + "1FTEW1E41KKD70581", + "ZARBAAB46LM355009", + "JTEBH3FJ60K093139", + "1HD1FS4178Y631180", +] + +VIN_INVALID = [ + "7A8GK4M0706100372", + "WVWZZZ1KZ7U022191", +] + +ISBN_10_VALID = [ + "1101907932", + "0375712356", + "0307957802", + "067940581X", +] + +ISBN_13_VALID = [ + "978-1101907931", + "978-0375712357", + "978-0307957801", +] + +ISBN_10_INVALID = [ + "1101907931", + "0375712358", +] + +ISBN_13_INVALID = [ + "978-1101907930", + "978-0375712358", +] + +SWIFT_BIC_VALID = [ + "RBOSGGSX", + "RZTIAT22263", + "BCEELULL", + "MARKDEFF", +] + +SWIFT_BIC_INVALID = [ + "CE1EL2LLFFF", # Invalid: digits in bank code + "E31DCLLFFF", # Invalid: digits in bank code +] + +PHONE_VALID = [ + "+5-555-555-5555", + "5-555-555-5555", + "555-555-5555", + "(555)555-5555", + "+1 (555) 555 5555", +] + +PHONE_INVALID = [ + "", + "123", + "text", +] + +MAC_VALID = [ + "01-2d-4c-ef-89-ab", + "01-2D-4C-EF-89-AB", + "01:2d:4c:ef:89:ab", + "01:2D:4C:EF:89:AB", +] + +MAC_INVALID = [ + "999999999", + "01-2d-4c-ef-89-ab-06", + "text", +] + +EMAIL_VALID = [ + "test@test.com", + "mail+mail@example.com", + "mail.email@e.test.com", +] + +EMAIL_INVALID = [ + "", + "test", + "@test.com", + "mail@example", +] + +URL_VALID = [ + "http://foo.com/blah_blah", + "https://www.example.com/foo/?bar=baz&inga=42&quux", + "ftp://foo.bar/baz", +] + +URL_INVALID = [ + "http://", + "foo.com", + "http:// shouldfail.com", +] + +IPV4_ADDRESS_VALID = [ + "93.184.220.20", + "161.148.172.130", + "0.0.0.0", + "255.255.255.255", +] + 
+IPV4_ADDRESS_INVALID = [ + "256.255.255.255", + "2001:0db8:0000:85a3:0000:0000:ac1f:8001", + "", +] + +IPV6_ADDRESS_VALID = [ + "2001:0db8:0000:85a3:0000:0000:ac1f:8001", + "2001:db8:0:85a3:0:0:ac1f:8001", +] + +IPV6_ADDRESS_INVALID = [ + "0db8:0000:85a3:0000:0000:ac1f:8001", + "93.184.220.20", +] + + +@pytest.fixture +def email_valid_pl(): + return pl.DataFrame({"email": EMAIL_VALID}) + + +@pytest.fixture +def email_valid_pd(): + return pd.DataFrame({"email": EMAIL_VALID}) + + +@pytest.fixture +def email_valid_duckdb(tmp_path): + """Create a temporary DuckDB database with email data.""" + db_path = tmp_path / "email_test.ddb" + con = ibis.connect(f"duckdb://{db_path}") + + # Create table with email data + df = pl.DataFrame({"email": EMAIL_VALID}) + con.create_table("email_data", df, overwrite=True) + + return con.table("email_data") + + +@pytest.fixture +def email_valid_sqlite(tmp_path): + """Create a temporary SQLite database with email data.""" + db_path = tmp_path / "email_test.sqlite" + con = ibis.sqlite.connect(db_path) + + # Create table with email data + df = pl.DataFrame({"email": EMAIL_VALID}) + con.create_table("email_data", df, overwrite=True) + + return con.table("email_data") + + +@pytest.fixture +def credit_card_valid_pl(): + return pl.DataFrame({"card": CREDIT_CARD_VALID}) + + +@pytest.fixture +def credit_card_valid_pd(): + return pd.DataFrame({"card": CREDIT_CARD_VALID}) + + +@pytest.fixture +def credit_card_valid_duckdb(tmp_path): + """Create a temporary DuckDB database with credit card data.""" + db_path = tmp_path / "card_test.ddb" + con = ibis.connect(f"duckdb://{db_path}") + + df = pl.DataFrame({"card": CREDIT_CARD_VALID}) + con.create_table("card_data", df, overwrite=True) + + return con.table("card_data") + + +@pytest.fixture +def credit_card_valid_sqlite(tmp_path): + """Create a temporary SQLite database with credit card data.""" + db_path = tmp_path / "card_test.sqlite" + con = ibis.sqlite.connect(db_path) + + df = pl.DataFrame({"card": 
CREDIT_CARD_VALID}) + con.create_table("card_data", df, overwrite=True) + + return con.table("card_data") + + +@pytest.mark.parametrize( + "fixture_name", + ["email_valid_pl", "email_valid_pd", "email_valid_duckdb", "email_valid_sqlite"], +) +def test_email_validation_valid(fixture_name, request): + """Test valid email addresses across different backends.""" + tbl = request.getfixturevalue(fixture_name) + + validation = ( + Validate(data=tbl).col_vals_within_spec(columns="email", spec="email").interrogate() + ) + + assert validation.n_passed(i=1, scalar=True) == len(EMAIL_VALID) + assert validation.n_failed(i=1, scalar=True) == 0 + + +def test_email_validation_invalid(): + """Test invalid email addresses.""" + tbl = pl.DataFrame({"email": EMAIL_INVALID}) + + validation = ( + Validate(data=tbl).col_vals_within_spec(columns="email", spec="email").interrogate() + ) + + assert validation.n_passed(i=1, scalar=True) == 0 + assert validation.n_failed(i=1, scalar=True) == len(EMAIL_INVALID) + + +@pytest.mark.parametrize( + "fixture_name", + [ + "credit_card_valid_pl", + "credit_card_valid_pd", + "credit_card_valid_duckdb", + "credit_card_valid_sqlite", + ], +) +def test_credit_card_validation_valid(fixture_name, request): + """Test valid credit card numbers across different backends.""" + tbl = request.getfixturevalue(fixture_name) + + validation = ( + Validate(data=tbl).col_vals_within_spec(columns="card", spec="credit_card").interrogate() + ) + + assert validation.n_passed(i=1, scalar=True) == len(CREDIT_CARD_VALID) + assert validation.n_failed(i=1, scalar=True) == 0 + + +def test_credit_card_validation_invalid(): + """Test invalid credit card numbers.""" + tbl = pl.DataFrame({"card": CREDIT_CARD_INVALID}) + + validation = ( + Validate(data=tbl).col_vals_within_spec(columns="card", spec="credit_card").interrogate() + ) + + assert validation.n_passed(i=1, scalar=True) == 0 + assert validation.n_failed(i=1, scalar=True) == len(CREDIT_CARD_INVALID) + + 
+@pytest.mark.parametrize("country", ["AT", "DE", "GB"]) +def test_iban_validation_valid(country): + """Test valid IBANs for different countries.""" + tbl = pl.DataFrame({"iban": IBAN_VALID[country]}) + + validation = ( + Validate(data=tbl) + .col_vals_within_spec(columns="iban", spec=f"iban[{country}]") + .interrogate() + ) + + assert validation.n_passed(i=1, scalar=True) == len(IBAN_VALID[country]) + assert validation.n_failed(i=1, scalar=True) == 0 + + +@pytest.mark.parametrize("country,column", [("US", "zip"), ("CA", "postal_code"), ("DE", "plz")]) +def test_postal_code_validation_valid(country, column): + """Test valid postal codes for different countries.""" + tbl = pl.DataFrame({column: POSTAL_CODE_VALID[country]}) + + validation = ( + Validate(data=tbl) + .col_vals_within_spec(columns=column, spec=f"postal_code[{country}]") + .interrogate() + ) + + assert validation.n_passed(i=1, scalar=True) == len(POSTAL_CODE_VALID[country]) + assert validation.n_failed(i=1, scalar=True) == 0 + + +def test_vin_validation_valid(): + """Test valid VINs.""" + tbl = pl.DataFrame({"vin": VIN_VALID}) + + validation = Validate(data=tbl).col_vals_within_spec(columns="vin", spec="vin").interrogate() + + assert validation.n_passed(i=1, scalar=True) == len(VIN_VALID) + assert validation.n_failed(i=1, scalar=True) == 0 + + +def test_vin_validation_invalid(): + """Test invalid VINs.""" + tbl = pl.DataFrame({"vin": VIN_INVALID}) + + validation = Validate(data=tbl).col_vals_within_spec(columns="vin", spec="vin").interrogate() + + assert validation.n_passed(i=1, scalar=True) == 0 + assert validation.n_failed(i=1, scalar=True) == len(VIN_INVALID) + + +def test_isbn_10_validation_valid(): + """Test valid ISBN-10 numbers.""" + tbl = pl.DataFrame({"isbn": ISBN_10_VALID}) + + validation = Validate(data=tbl).col_vals_within_spec(columns="isbn", spec="isbn").interrogate() + + assert validation.n_passed(i=1, scalar=True) == len(ISBN_10_VALID) + assert validation.n_failed(i=1, scalar=True) == 0 + 
def test_isbn_13_validation_valid():
    """Test valid ISBN-13 numbers."""
    tbl = pl.DataFrame({"isbn": ISBN_13_VALID})

    validation = Validate(data=tbl).col_vals_within_spec(columns="isbn", spec="isbn").interrogate()

    assert validation.n_passed(i=1, scalar=True) == len(ISBN_13_VALID)
    assert validation.n_failed(i=1, scalar=True) == 0


def test_isbn_10_validation_invalid():
    """Test invalid ISBN-10 numbers."""
    tbl = pl.DataFrame({"isbn": ISBN_10_INVALID})

    validation = Validate(data=tbl).col_vals_within_spec(columns="isbn", spec="isbn").interrogate()

    assert validation.n_passed(i=1, scalar=True) == 0
    assert validation.n_failed(i=1, scalar=True) == len(ISBN_10_INVALID)


def test_isbn_13_validation_invalid():
    """Test invalid ISBN-13 numbers."""
    tbl = pl.DataFrame({"isbn": ISBN_13_INVALID})

    validation = Validate(data=tbl).col_vals_within_spec(columns="isbn", spec="isbn").interrogate()

    assert validation.n_passed(i=1, scalar=True) == 0
    assert validation.n_failed(i=1, scalar=True) == len(ISBN_13_INVALID)


def test_phone_validation_valid():
    """Test valid phone numbers."""
    tbl = pl.DataFrame({"phone": PHONE_VALID})

    validation = (
        Validate(data=tbl).col_vals_within_spec(columns="phone", spec="phone").interrogate()
    )

    assert validation.n_passed(i=1, scalar=True) == len(PHONE_VALID)
    assert validation.n_failed(i=1, scalar=True) == 0


def test_phone_validation_invalid():
    """Test invalid phone numbers."""
    tbl = pl.DataFrame({"phone": PHONE_INVALID})

    validation = (
        Validate(data=tbl).col_vals_within_spec(columns="phone", spec="phone").interrogate()
    )

    assert validation.n_passed(i=1, scalar=True) == 0
    assert validation.n_failed(i=1, scalar=True) == len(PHONE_INVALID)


def test_mac_validation_valid():
    """Test valid MAC addresses."""
    tbl = pl.DataFrame({"mac": MAC_VALID})

    validation = Validate(data=tbl).col_vals_within_spec(columns="mac", spec="mac").interrogate()

    assert validation.n_passed(i=1, scalar=True) == len(MAC_VALID)
    assert validation.n_failed(i=1, scalar=True) == 0


def test_mac_validation_invalid():
    """Test invalid MAC addresses."""
    tbl = pl.DataFrame({"mac": MAC_INVALID})

    validation = Validate(data=tbl).col_vals_within_spec(columns="mac", spec="mac").interrogate()

    assert validation.n_passed(i=1, scalar=True) == 0
    assert validation.n_failed(i=1, scalar=True) == len(MAC_INVALID)


def test_swift_bic_validation_valid():
    """Test valid SWIFT/BIC codes."""
    tbl = pl.DataFrame({"swift": SWIFT_BIC_VALID})

    validation = (
        Validate(data=tbl).col_vals_within_spec(columns="swift", spec="swift").interrogate()
    )

    assert validation.n_passed(i=1, scalar=True) == len(SWIFT_BIC_VALID)
    assert validation.n_failed(i=1, scalar=True) == 0


def test_swift_bic_validation_invalid():
    """Test invalid SWIFT/BIC codes."""
    tbl = pl.DataFrame({"swift": SWIFT_BIC_INVALID})

    validation = (
        Validate(data=tbl).col_vals_within_spec(columns="swift", spec="swift").interrogate()
    )

    assert validation.n_passed(i=1, scalar=True) == 0
    assert validation.n_failed(i=1, scalar=True) == len(SWIFT_BIC_INVALID)


def test_url_validation_valid():
    """Test valid URLs."""
    tbl = pl.DataFrame({"url": URL_VALID})

    validation = Validate(data=tbl).col_vals_within_spec(columns="url", spec="url").interrogate()

    assert validation.n_passed(i=1, scalar=True) == len(URL_VALID)
    assert validation.n_failed(i=1, scalar=True) == 0


def test_url_validation_invalid():
    """Test invalid URLs."""
    tbl = pl.DataFrame({"url": URL_INVALID})

    validation = Validate(data=tbl).col_vals_within_spec(columns="url", spec="url").interrogate()

    assert validation.n_passed(i=1, scalar=True) == 0
    assert validation.n_failed(i=1, scalar=True) == len(URL_INVALID)


def test_ipv4_validation_valid():
    """Test valid IPv4 addresses."""
    tbl = pl.DataFrame({"ip": IPV4_ADDRESS_VALID})

    validation = Validate(data=tbl).col_vals_within_spec(columns="ip", spec="ipv4").interrogate()

    assert validation.n_passed(i=1, scalar=True) == len(IPV4_ADDRESS_VALID)
    assert validation.n_failed(i=1, scalar=True) == 0


def test_ipv4_validation_invalid():
    """Test invalid IPv4 addresses."""
    tbl = pl.DataFrame({"ip": IPV4_ADDRESS_INVALID})

    validation = Validate(data=tbl).col_vals_within_spec(columns="ip", spec="ipv4").interrogate()

    assert validation.n_passed(i=1, scalar=True) == 0
    assert validation.n_failed(i=1, scalar=True) == len(IPV4_ADDRESS_INVALID)


def test_ipv6_validation_valid():
    """Test valid IPv6 addresses."""
    tbl = pl.DataFrame({"ip": IPV6_ADDRESS_VALID})

    validation = Validate(data=tbl).col_vals_within_spec(columns="ip", spec="ipv6").interrogate()

    assert validation.n_passed(i=1, scalar=True) == len(IPV6_ADDRESS_VALID)
    assert validation.n_failed(i=1, scalar=True) == 0


def test_ipv6_validation_invalid():
    """Test invalid IPv6 addresses."""
    tbl = pl.DataFrame({"ip": IPV6_ADDRESS_INVALID})

    validation = Validate(data=tbl).col_vals_within_spec(columns="ip", spec="ipv6").interrogate()

    assert validation.n_passed(i=1, scalar=True) == 0
    assert validation.n_failed(i=1, scalar=True) == len(IPV6_ADDRESS_INVALID)


def test_na_pass_false():
    """Test that NA values fail when na_pass=False."""
    tbl = pl.DataFrame({"email": ["test@test.com", None, "invalid"]})

    validation = (
        Validate(data=tbl)
        .col_vals_within_spec(columns="email", spec="email", na_pass=False)
        .interrogate()
    )

    # Should have 1 pass, 2 fails (None and "invalid")
    assert validation.n_passed(i=1, scalar=True) == 1
    assert validation.n_failed(i=1, scalar=True) == 2


def test_na_pass_true():
    """Test that NA values pass when na_pass=True."""
    tbl = pl.DataFrame({"email": ["test@test.com", None, "invalid"]})

    validation = (
        Validate(data=tbl)
        .col_vals_within_spec(columns="email", spec="email", na_pass=True)
        .interrogate()
    )

    # Should have 2 passes (valid email and None), 1 fail ("invalid")
    assert validation.n_passed(i=1, scalar=True) == 2
    assert validation.n_failed(i=1, scalar=True) == 1


def test_regex_specs_no_materialization_ibis():
    """
    Test that regex-based specs (email, url, phone, etc.) don't materialize Ibis tables.

    This verifies that simple regex validations use Narwhals directly,
    avoiding data transfer from remote databases.
    """
    # Create a DuckDB table with email data
    con = ibis.connect("duckdb://")
    data = {
        "id": [1, 2, 3, 4],
        "email": [
            "valid@example.com",
            "another.valid@test.org",
            "invalid-email",
            None,
        ],
    }
    tbl = con.create_table("email_test", data, overwrite=True)

    # Validate emails - should NOT materialize the table
    validation = (
        Validate(data=tbl)
        .col_vals_within_spec(columns="email", spec="email", na_pass=False)
        .interrogate()
    )

    # Verify results are correct
    assert validation.n_passed(i=1, scalar=True) == 2  # Two valid emails
    assert validation.n_failed(i=1, scalar=True) == 2  # Invalid email + None

    # Test other regex-based specs to ensure they all work without materialization
    specs_to_test = [
        ("url", ["https://example.com", "ftp://test.org", "not-a-url", None]),
        ("phone", ["+1-555-123-4567", "555.123.4567", "invalid", None]),
        ("ipv4", ["192.168.1.1", "10.0.0.1", "999.999.999.999", None]),
        ("mac", ["00:1B:44:11:3A:B7", "00-1B-44-11-3A-B7", "invalid", None]),
        ("swift_bic", ["DEUTDEFF", "NEDSZAJJ", "invalid", None]),
    ]

    for spec, test_data in specs_to_test:
        data = {"id": list(range(len(test_data))), "value": test_data}
        tbl = con.create_table(f"{spec}_test", data, overwrite=True)

        validation = (
            Validate(data=tbl)
            .col_vals_within_spec(columns="value", spec=spec, na_pass=False)
            .interrogate()
        )

        # Each spec has 2 valid values, 2 invalid (including None)
        assert validation.n_passed(i=1, scalar=True) == 2, f"Failed for spec: {spec}"
        assert validation.n_failed(i=1, scalar=True) == 2, f"Failed for spec: {spec}"


# Shared test data for the backend fixtures below: the DuckDB and SQLite
# fixtures previously duplicated these dicts verbatim, differing only in the
# connection string, so the payloads are hoisted to module-level constants.
_VIN_TEST_DATA = {
    "id": [1, 2, 3, 4, 5],
    "vin": [
        "1HGBH41JXMN109186",  # Valid VIN
        "1M8GDM9AXKP042788",  # Valid VIN
        "1HGBH41JXM0109186",  # Invalid (contains 'O')
        "1HGBH41JXMN10918",  # Invalid (too short)
        None,  # NULL
    ],
}

_CREDIT_CARD_TEST_DATA = {
    "id": [1, 2, 3, 4, 5, 6, 7, 8],
    "card_number": [
        "4532015112830366",  # Valid Visa
        "5425233430109903",  # Valid Mastercard
        "374245455400126",  # Valid Amex (15 digits)
        "4532-0151-1283-0366",  # Valid Visa with hyphens
        "4532 0151 1283 0366",  # Valid Visa with spaces
        "4532015112830367",  # Invalid (wrong check digit)
        "1234567890123",  # Invalid (fails Luhn)
        None,  # NULL
    ],
}


@pytest.fixture
def vin_test_data_duckdb():
    """Create DuckDB table with VIN test data."""
    con = ibis.connect("duckdb://")
    return con.create_table("vin_data", _VIN_TEST_DATA, overwrite=True)


@pytest.fixture
def vin_test_data_sqlite():
    """Create SQLite table with VIN test data."""
    con = ibis.connect("sqlite://")
    return con.create_table("vin_data", _VIN_TEST_DATA, overwrite=True)


@pytest.fixture
def credit_card_test_data_duckdb():
    """Create DuckDB table with credit card test data."""
    con = ibis.connect("duckdb://")
    return con.create_table("card_data", _CREDIT_CARD_TEST_DATA, overwrite=True)


@pytest.fixture
def credit_card_test_data_sqlite():
    """Create SQLite table with credit card test data."""
    con = ibis.connect("sqlite://")
    return con.create_table("card_data", _CREDIT_CARD_TEST_DATA, overwrite=True)


def test_vin_validation_duckdb_basic(vin_test_data_duckdb):
    """Test basic VIN validation with DuckDB (database-native)."""
    from pointblank._interrogation import interrogate_within_spec_db

    result = interrogate_within_spec_db(
        tbl=vin_test_data_duckdb,
        column="vin",
        values={"spec": "vin"},
        na_pass=False,
    )

    # Execute to check results
    result_df = result.execute()

    # First two VINs should be valid
    assert result_df["pb_is_good_"][0]
    assert result_df["pb_is_good_"][1]

    # Third VIN invalid (contains 'O')
    assert not result_df["pb_is_good_"][2]

    # Fourth VIN invalid (too short)
    assert not result_df["pb_is_good_"][3]

    # Fifth is NULL, should fail with na_pass=False
    assert not result_df["pb_is_good_"][4]


def test_vin_validation_duckdb_na_pass(vin_test_data_duckdb):
    """Test VIN validation with na_pass=True (DuckDB, database-native)."""
    from pointblank._interrogation import interrogate_within_spec_db

    result = interrogate_within_spec_db(
        tbl=vin_test_data_duckdb,
        column="vin",
        values={"spec": "vin"},
        na_pass=True,
    )

    # Execute to check results
    result_df = result.execute()

    # NULL should pass with na_pass=True
    assert result_df["pb_is_good_"][4]


def test_vin_validation_sqlite_basic(vin_test_data_sqlite):
    """Test basic VIN validation with SQLite (database-native)."""
    from pointblank._interrogation import interrogate_within_spec_db

    result = interrogate_within_spec_db(
        tbl=vin_test_data_sqlite,
        column="vin",
        values={"spec": "vin"},
        na_pass=False,
    )

    # Execute to check results
    result_df = result.execute()

    # First two VINs should be valid
    assert result_df["pb_is_good_"][0]
    assert result_df["pb_is_good_"][1]

    # Third VIN invalid (contains 'O')
    assert not result_df["pb_is_good_"][2]

    # Fourth VIN invalid (too short)
    assert not result_df["pb_is_good_"][3]

    # Fifth is NULL, should fail with na_pass=False
    assert not result_df["pb_is_good_"][4]


def test_vin_validation_no_materialization(vin_test_data_duckdb):
    """
    Verify that database-native validation doesn't materialize data.

    This test confirms that the validation is performed as a lazy Ibis expression
    and only executed when explicitly called.
    """
    from pointblank._interrogation import interrogate_within_spec_db

    result = interrogate_within_spec_db(
        tbl=vin_test_data_duckdb,
        column="vin",
        values={"spec": "vin"},
        na_pass=False,
    )

    # Result should still be an Ibis table (not materialized)
    assert hasattr(result, "execute")

    # Should be able to chain more operations without executing
    # (the boolean column itself is the predicate; no `== True` needed)
    filtered = result.filter(result["pb_is_good_"])
    assert hasattr(filtered, "execute")

    # Only materialize when we explicitly execute
    materialized = filtered.execute()
    assert len(materialized) == 2  # Only 2 valid VINs


def test_unsupported_spec_raises_error(vin_test_data_duckdb):
    """Test that unsupported specs raise NotImplementedError."""
    from pointblank._interrogation import interrogate_within_spec_db

    with pytest.raises(NotImplementedError, match="Database-native validation for 'email'"):
        interrogate_within_spec_db(
            tbl=vin_test_data_duckdb,
            column="vin",
            values={"spec": "email"},
            na_pass=False,
        )


def test_fallback_to_regular_for_non_ibis():
    """Test that non-Ibis tables fall back to regular implementation."""
    from pointblank._interrogation import interrogate_within_spec_db

    # Create a Polars DataFrame
    df = pl.DataFrame(
        {
            "vin": [
                "1HGBH41JXMN109186",  # Valid
                "1HGBH41JXM0109186",  # Invalid (contains 'O')
            ]
        }
    )

    # Should fall back to regular implementation (which will work)
    result = interrogate_within_spec_db(
        tbl=df,
        column="vin",
        values={"spec": "vin"},
        na_pass=False,
    )

    # Result should be a Polars DataFrame (not Ibis)
    assert isinstance(result, pl.DataFrame)
    assert "pb_is_good_" in result.columns


def test_credit_card_validation_duckdb_basic(credit_card_test_data_duckdb):
    """Test basic credit card validation with DuckDB (database-native)."""
    from pointblank._interrogation import interrogate_within_spec_db

    result = interrogate_within_spec_db(
        tbl=credit_card_test_data_duckdb,
        column="card_number",
        values={"spec": "credit_card"},
        na_pass=False,
    )

    # Execute to check results
    result_df = result.execute()

    # First five should be valid (including formatted ones)
    assert result_df["pb_is_good_"][0]  # Valid Visa
    assert result_df["pb_is_good_"][1]  # Valid Mastercard
    assert result_df["pb_is_good_"][2]  # Valid Amex
    assert result_df["pb_is_good_"][3]  # Valid with hyphens
    assert result_df["pb_is_good_"][4]  # Valid with spaces

    # Sixth should be invalid (wrong check digit)
    assert not result_df["pb_is_good_"][5]

    # Seventh should be invalid (fails Luhn)
    assert not result_df["pb_is_good_"][6]

    # Eighth is NULL, should fail with na_pass=False
    assert not result_df["pb_is_good_"][7]


def test_credit_card_validation_duckdb_na_pass(credit_card_test_data_duckdb):
    """Test credit card validation with na_pass=True (DuckDB, database-native)."""
    from pointblank._interrogation import interrogate_within_spec_db

    result = interrogate_within_spec_db(
        tbl=credit_card_test_data_duckdb,
        column="card_number",
        values={"spec": "credit_card"},
        na_pass=True,
    )

    # Execute to check results
    result_df = result.execute()

    # NULL should pass with na_pass=True
    assert result_df["pb_is_good_"][7]


def test_credit_card_validation_sqlite_basic(credit_card_test_data_sqlite):
    """Test basic credit card validation with SQLite (database-native)."""
    from pointblank._interrogation import interrogate_within_spec_db

    result = interrogate_within_spec_db(
        tbl=credit_card_test_data_sqlite,
        column="card_number",
        values={"spec": "credit_card"},
        na_pass=False,
    )

    # Execute to check results
    result_df = result.execute()

    # First five should be valid
    assert result_df["pb_is_good_"][0]
    assert result_df["pb_is_good_"][1]
    assert result_df["pb_is_good_"][2]
    assert result_df["pb_is_good_"][3]
    assert result_df["pb_is_good_"][4]

    # Invalid cards should fail
    assert not result_df["pb_is_good_"][5]
    assert not result_df["pb_is_good_"][6]
    assert not result_df["pb_is_good_"][7]


def test_credit_card_validation_no_materialization(credit_card_test_data_duckdb):
    """Verify that database-native credit card validation doesn't materialize data."""
    from pointblank._interrogation import interrogate_within_spec_db

    result = interrogate_within_spec_db(
        tbl=credit_card_test_data_duckdb,
        column="card_number",
        values={"spec": "credit_card"},
        na_pass=False,
    )

    # Result should still be an Ibis table (not materialized)
    assert hasattr(result, "execute")

    # Should be able to chain more operations without executing
    # (the boolean column itself is the predicate; no `== True` needed)
    filtered = result.filter(result["pb_is_good_"])
    assert hasattr(filtered, "execute")

    # Only materialize when we explicitly execute
    materialized = filtered.execute()
    assert len(materialized) == 5  # 5 valid credit cards