|
| 1 | +r"""Fast .env file parser - optimized for performance. |
| 2 | +
|
| 3 | +Key features: |
| 4 | +1. UTF-8 BOM support (\ufeff) |
| 5 | +2. Escape sequences parsing (\n, \t, etc) |
| 6 | +3. Whitespace preservation inside quotes |
| 7 | +4. Strict variable name validation (isidentifier) |
| 8 | +5. Robust 'export' keyword support |
| 9 | +6. Correct duplicate handling |
| 10 | +7. Special symbols in unquoted values |
| 11 | +""" |
| 12 | + |
| 13 | +import os |
| 14 | + |
| 15 | +# Global cache |
| 16 | +_FILE_CACHE: dict[str, dict[str, str]] = {} |
| 17 | + |
| 18 | +# Optimization constants |
| 19 | +_BOM = "\ufeff" |
| 20 | +_EXPORT_LEN = 6 # len("export") |
| 21 | + |
| 22 | + |
| 23 | +def parse_env_file(file_path: str, encoding: str | None = "utf-8") -> dict[str, str]: # noqa: C901, PLR0912 |
| 24 | + """Fast .env file parser with production-grade robustness. |
| 25 | +
|
| 26 | + Optimized for speed while handling edge cases correctly. |
| 27 | + """ |
| 28 | + cache_key = f"{file_path}:{encoding}" |
| 29 | + if cache_key in _FILE_CACHE: |
| 30 | + return _FILE_CACHE[cache_key] |
| 31 | + |
| 32 | + env_vars: dict[str, str] = {} |
| 33 | + |
| 34 | + try: |
| 35 | + # 1. Fast read with immediate BOM handling |
| 36 | + with open(file_path, encoding=encoding) as f: |
| 37 | + content = f.read() |
| 38 | + |
| 39 | + # Remove BOM if present |
| 40 | + if content.startswith(_BOM): |
| 41 | + content = content[1:] |
| 42 | + |
| 43 | + # Local references for loop speed |
| 44 | + _str_strip = str.strip |
| 45 | + _str_startswith = str.startswith |
| 46 | + |
| 47 | + for raw_line in content.splitlines(): |
| 48 | + # Fast initial cleanup |
| 49 | + line = _str_strip(raw_line) |
| 50 | + |
| 51 | + if not line or _str_startswith(line, "#"): |
| 52 | + continue |
| 53 | + |
| 54 | + # 2. Handle 'export' keyword |
| 55 | + # Check if starts with 'export' followed by space (not a var called 'exporter') |
| 56 | + if ( |
| 57 | + _str_startswith(line, "export") |
| 58 | + and len(line) > _EXPORT_LEN |
| 59 | + and line[_EXPORT_LEN].isspace() |
| 60 | + ): |
| 61 | + line = line[_EXPORT_LEN:].lstrip() |
| 62 | + |
| 63 | + # 3. Atomic partition |
| 64 | + key, sep, value = line.partition("=") |
| 65 | + |
| 66 | + if not sep: |
| 67 | + continue |
| 68 | + |
| 69 | + key = key.strip() |
| 70 | + |
| 71 | + # 4. Variable name validation |
| 72 | + # isidentifier() is implemented in C and covers: |
| 73 | + # - Not starting with number |
| 74 | + # - Only alphanumerics and underscore |
| 75 | + # - No hyphens (bash compliant) |
| 76 | + if not key.isidentifier(): |
| 77 | + continue |
| 78 | + |
| 79 | + # 5. Value parsing |
| 80 | + if not value: |
| 81 | + env_vars[key] = "" |
| 82 | + continue |
| 83 | + |
| 84 | + quote = value[0] if value else "" |
| 85 | + |
| 86 | + # Quote handling logic |
| 87 | + if quote in ('"', "'"): |
| 88 | + # Check if quote closes (ignore orphaned quotes) |
| 89 | + if value.endswith(quote) and len(value) > 1: |
| 90 | + # Extract content |
| 91 | + val_content = value[1:-1] |
| 92 | + |
| 93 | + # Double quotes: Support escape sequences |
| 94 | + if quote == '"': |
| 95 | + # Decode common escapes |
| 96 | + # Manual replace is faster than codecs.decode('unicode_escape') for this subset |
| 97 | + if "\\" in val_content: |
| 98 | + val_content = ( |
| 99 | + val_content.replace("\\n", "\n") |
| 100 | + .replace("\\r", "\r") |
| 101 | + .replace("\\t", "\t") |
| 102 | + .replace('\\"', '"') |
| 103 | + .replace("\\\\", "\\") |
| 104 | + ) |
| 105 | + # Single quotes: Minimal escape processing |
| 106 | + elif quote == "'": |
| 107 | + # Only unescape single quote itself if needed |
| 108 | + if "\\'" in val_content: |
| 109 | + val_content = val_content.replace("\\'", "'") |
| 110 | + |
| 111 | + env_vars[key] = val_content |
| 112 | + else: |
| 113 | + # Broken or unclosed quotes -> Treat as unquoted string |
| 114 | + env_vars[key] = value.strip() |
| 115 | + else: |
| 116 | + # Unquoted value - Preserve leading spaces but allow inline comments |
| 117 | + # Do NOT remove leading spaces to preserve intentionality |
| 118 | + |
| 119 | + # Remove inline comments (e.g., VAL=123 # id) |
| 120 | + if "#" in value: |
| 121 | + # Only partition if # exists to avoid overhead |
| 122 | + value = value.partition("#")[0] |
| 123 | + |
| 124 | + # Remove trailing whitespace only at the end |
| 125 | + env_vars[key] = value.rstrip() |
| 126 | + |
| 127 | + except FileNotFoundError: |
| 128 | + pass |
| 129 | + except Exception: # noqa: S110 |
| 130 | + # In critical production, logging would be ideal, but keeping interface clean |
| 131 | + pass |
| 132 | + |
| 133 | + _FILE_CACHE[cache_key] = env_vars |
| 134 | + return env_vars |
| 135 | + |
| 136 | + |
def load_dotenv(
    dotenv_path: str | None = ".env",
    encoding: str | None = "utf-8",
    *,
    override: bool = False,
) -> bool:
    """Copy the variables of a .env file into ``os.environ``.

    Args:
        dotenv_path: Location of the .env file (default: ".env").
        encoding: Encoding used to read the file (default: "utf-8").
        override: When True, values from the file replace variables that are
            already set; when False (default), existing variables are kept.

    Returns:
        True when the file produced at least one variable and the environment
        was updated; False when parsing yielded nothing or any error occurred.
    """
    try:
        parsed = parse_env_file(dotenv_path, encoding)

        # Nothing parsed: missing, empty or invalid file.
        if not parsed:
            return False

        if override:
            # File values take precedence over the current environment.
            os.environ.update(parsed)
            return True

        # Only fill in names that are not defined yet.
        for name, text in parsed.items():
            os.environ.setdefault(name, text)
        return True
    except Exception:
        return False
0 commit comments