@@ -2271,6 +2271,175 @@ def _can_connect_to_postgresql(self) -> bool:
22712271 return False
22722272 return True
22732273
2274+ def _calculate_max_worker_processes (self , cpu_cores : int ) -> str | None :
2275+ """Calculate cpu_max_worker_processes configuration value."""
2276+ if self .config .cpu_max_worker_processes == "auto" :
2277+ # auto = minimum(8, 2 * vCores)
2278+ return str (min (8 , 2 * cpu_cores ))
2279+ elif self .config .cpu_max_worker_processes is not None :
2280+ value = self .config .cpu_max_worker_processes
2281+ # Pydantic already enforces minimum of 2 via conint(ge=2)
2282+ # This is an extra safeguard
2283+ if value < 2 :
2284+ raise ValueError (f"cpu-max-worker-processes value { value } is below minimum of 2" )
2285+ cap = 10 * cpu_cores
2286+ if value > cap :
2287+ raise ValueError (
2288+ f"cpu-max-worker-processes value { value } exceeds maximum allowed "
2289+ f"of { cap } (10 * vCores). Please set a value <= { cap } ."
2290+ )
2291+ return str (value )
2292+ return None
2293+
2294+ def _validate_worker_config_value (self , param_name : str , value : int , cpu_cores : int ) -> str :
2295+ """Shared validation logic for worker process parameters.
2296+
2297+ Args:
2298+ param_name: The configuration parameter name (for error messages)
2299+ value: The integer value to validate
2300+ cpu_cores: The number of available CPU cores
2301+
2302+ Returns:
2303+ String representation of the validated value
2304+
2305+ Raises:
2306+ ValueError: If value is less than 2 or exceeds 10 * vCores
2307+ """
2308+ if value < 2 :
2309+ raise ValueError (f"{ param_name } value { value } is below minimum of 2" )
2310+ cap = 10 * cpu_cores
2311+ if value > cap :
2312+ raise ValueError (
2313+ f"{ param_name } value { value } exceeds maximum allowed "
2314+ f"of { cap } (10 * vCores). Please set a value <= { cap } ."
2315+ )
2316+ return str (value )
2317+
2318+ def _calculate_max_parallel_workers (self , base_max_workers : int , cpu_cores : int ) -> str | None :
2319+ """Calculate cpu_max_parallel_workers configuration value."""
2320+ if self .config .cpu_max_parallel_workers == "auto" :
2321+ return str (base_max_workers )
2322+ elif self .config .cpu_max_parallel_workers is not None :
2323+ # Validate the value first
2324+ validated_value_str = self ._validate_worker_config_value (
2325+ "cpu-max-parallel-workers" , self .config .cpu_max_parallel_workers , cpu_cores
2326+ )
2327+ # Apply the min constraint with base_max_workers
2328+ return str (min (int (validated_value_str ), base_max_workers ))
2329+ return None
2330+
2331+ def _calculate_max_parallel_maintenance_workers (
2332+ self , base_max_workers : int , cpu_cores : int
2333+ ) -> str | None :
2334+ """Calculate cpu_max_parallel_maintenance_workers configuration value."""
2335+ if self .config .cpu_max_parallel_maintenance_workers == "auto" :
2336+ return str (base_max_workers )
2337+ elif self .config .cpu_max_parallel_maintenance_workers is not None :
2338+ return self ._validate_worker_config_value (
2339+ "cpu-max-parallel-maintenance-workers" ,
2340+ self .config .cpu_max_parallel_maintenance_workers ,
2341+ cpu_cores ,
2342+ )
2343+ return None
2344+
2345+ def _calculate_max_logical_replication_workers (
2346+ self , base_max_workers : int , cpu_cores : int
2347+ ) -> str | None :
2348+ """Calculate cpu_max_logical_replication_workers configuration value."""
2349+ if self .config .cpu_max_logical_replication_workers == "auto" :
2350+ return str (base_max_workers )
2351+ elif self .config .cpu_max_logical_replication_workers is not None :
2352+ return self ._validate_worker_config_value (
2353+ "cpu-max-logical-replication-workers" ,
2354+ self .config .cpu_max_logical_replication_workers ,
2355+ cpu_cores ,
2356+ )
2357+ return None
2358+
2359+ def _calculate_max_sync_workers_per_subscription (
2360+ self , base_max_workers : int , cpu_cores : int
2361+ ) -> str | None :
2362+ """Calculate cpu_max_sync_workers_per_subscription configuration value."""
2363+ if self .config .cpu_max_sync_workers_per_subscription == "auto" :
2364+ return str (base_max_workers )
2365+ elif self .config .cpu_max_sync_workers_per_subscription is not None :
2366+ return self ._validate_worker_config_value (
2367+ "cpu-max-sync-workers-per-subscription" ,
2368+ self .config .cpu_max_sync_workers_per_subscription ,
2369+ cpu_cores ,
2370+ )
2371+ return None
2372+
2373+ def _calculate_max_parallel_apply_workers_per_subscription (
2374+ self , base_max_workers : int , cpu_cores : int
2375+ ) -> str | None :
2376+ """Calculate cpu_max_parallel_apply_workers_per_subscription configuration value."""
2377+ if self .config .cpu_max_parallel_apply_workers_per_subscription == "auto" :
2378+ return str (base_max_workers )
2379+ elif self .config .cpu_max_parallel_apply_workers_per_subscription is not None :
2380+ return self ._validate_worker_config_value (
2381+ "cpu-max-parallel-apply-workers-per-subscription" ,
2382+ self .config .cpu_max_parallel_apply_workers_per_subscription ,
2383+ cpu_cores ,
2384+ )
2385+ return None
2386+
2387+ def _calculate_worker_process_config (self , cpu_cores : int ) -> dict [str , str ]:
2388+ """Calculate worker process configuration values.
2389+
2390+ Handles 'auto' values and capping logic for worker process parameters.
2391+ Returns a dictionary with the calculated values ready for PostgreSQL.
2392+ """
2393+ result : dict [str , str ] = {}
2394+
2395+ # Calculate cpu_max_worker_processes (baseline for other worker configs)
2396+ cpu_max_worker_processes_value = self ._calculate_max_worker_processes (cpu_cores )
2397+ if cpu_max_worker_processes_value is not None :
2398+ result ["max_worker_processes" ] = cpu_max_worker_processes_value
2399+
2400+ # Get the effective cpu_max_worker_processes for dependent configs
2401+ # Use the calculated value, or fall back to PostgreSQL default (8)
2402+ base_max_workers = int (result .get ("max_worker_processes" , "8" ))
2403+
2404+ # Calculate other worker parameters
2405+ cpu_max_parallel_workers_value = self ._calculate_max_parallel_workers (
2406+ base_max_workers , cpu_cores
2407+ )
2408+ if cpu_max_parallel_workers_value is not None :
2409+ result ["max_parallel_workers" ] = cpu_max_parallel_workers_value
2410+
2411+ cpu_max_parallel_maintenance_workers_value = (
2412+ self ._calculate_max_parallel_maintenance_workers (base_max_workers , cpu_cores )
2413+ )
2414+ if cpu_max_parallel_maintenance_workers_value is not None :
2415+ result ["max_parallel_maintenance_workers" ] = cpu_max_parallel_maintenance_workers_value
2416+
2417+ cpu_max_logical_replication_workers_value = (
2418+ self ._calculate_max_logical_replication_workers (base_max_workers , cpu_cores )
2419+ )
2420+ if cpu_max_logical_replication_workers_value is not None :
2421+ result ["max_logical_replication_workers" ] = cpu_max_logical_replication_workers_value
2422+
2423+ cpu_max_sync_workers_per_subscription_value = (
2424+ self ._calculate_max_sync_workers_per_subscription (base_max_workers , cpu_cores )
2425+ )
2426+ if cpu_max_sync_workers_per_subscription_value is not None :
2427+ result ["max_sync_workers_per_subscription" ] = (
2428+ cpu_max_sync_workers_per_subscription_value
2429+ )
2430+
2431+ cpu_max_parallel_apply_workers_per_subscription_value = (
2432+ self ._calculate_max_parallel_apply_workers_per_subscription (
2433+ base_max_workers , cpu_cores
2434+ )
2435+ )
2436+ if cpu_max_parallel_apply_workers_per_subscription_value is not None :
2437+ result ["max_parallel_apply_workers_per_subscription" ] = (
2438+ cpu_max_parallel_apply_workers_per_subscription_value
2439+ )
2440+
2441+ return result
2442+
22742443 def _api_update_config (self , available_cpu_cores : int ) -> None :
22752444 # Use config value if set, calculate otherwise
22762445 if self .config .experimental_max_connections :
@@ -2286,18 +2455,66 @@ def _api_update_config(self, available_cpu_cores: int) -> None:
22862455 "shared_buffers" : self .config .memory_shared_buffers ,
22872456 "wal_keep_size" : self .config .durability_wal_keep_size ,
22882457 }
2458+
2459+ # Add restart-required worker process parameters via Patroni API
2460+ worker_configs = self ._calculate_worker_process_config (available_cpu_cores )
2461+ if "max_worker_processes" in worker_configs :
2462+ cfg_patch ["max_worker_processes" ] = worker_configs ["max_worker_processes" ]
2463+
2464+ if "max_logical_replication_workers" in worker_configs :
2465+ cfg_patch ["max_logical_replication_workers" ] = worker_configs [
2466+ "max_logical_replication_workers"
2467+ ]
2468+
22892469 base_patch = {}
22902470 if primary_endpoint := self .async_replication .get_primary_cluster_endpoint ():
22912471 base_patch ["standby_cluster" ] = {"host" : primary_endpoint }
22922472 self ._patroni .bulk_update_parameters_controller_by_patroni (cfg_patch , base_patch )
22932473
2294- def update_config (self , is_creating_backup : bool = False ) -> bool :
2295- """Updates Patroni config file based on the existence of the TLS files."""
2296- # Retrieve PostgreSQL parameters.
2474+ def _build_postgresql_parameters (
2475+ self , available_cpu_cores : int , available_memory : int
2476+ ) -> dict | None :
2477+ """Build PostgreSQL configuration parameters.
2478+
2479+ Args:
2480+ available_cpu_cores: Number of available CPU cores
2481+ available_memory: Available memory in bytes
2482+
2483+ Returns:
2484+ Dictionary of PostgreSQL parameters or None if base parameters couldn't be built.
2485+ """
2486+ limit_memory = None
22972487 if self .config .profile_limit_memory :
22982488 limit_memory = self .config .profile_limit_memory * 10 ** 6
2489+
2490+ # Build PostgreSQL parameters.
2491+ pg_parameters = self .postgresql .build_postgresql_parameters (
2492+ self .model .config , available_memory , limit_memory
2493+ )
2494+
2495+ # Calculate and merge worker process configurations
2496+ worker_configs = self ._calculate_worker_process_config (available_cpu_cores )
2497+
2498+ # Add cpu_wal_compression configuration (separate from worker processes)
2499+ if self .config .cpu_wal_compression is not None :
2500+ cpu_wal_compression = "on" if self .config .cpu_wal_compression else "off"
2501+ else :
2502+ # Use config.yaml default when unset (default: true)
2503+ cpu_wal_compression = "on"
2504+
2505+ if pg_parameters is not None :
2506+ pg_parameters .update (worker_configs )
2507+ pg_parameters ["wal_compression" ] = cpu_wal_compression
22992508 else :
2300- limit_memory = None
2509+ pg_parameters = dict (worker_configs )
2510+ pg_parameters ["wal_compression" ] = cpu_wal_compression
2511+ logger .debug (f"pg_parameters set to worker_configs = { pg_parameters } " )
2512+
2513+ return pg_parameters
2514+
2515+ def update_config (self , is_creating_backup : bool = False ) -> bool :
2516+ """Updates Patroni config file based on the existence of the TLS files."""
2517+ # Retrieve PostgreSQL parameters.
23012518 try :
23022519 available_cpu_cores , available_memory = self .get_available_resources ()
23032520 except ApiError as e :
@@ -2306,11 +2523,9 @@ def update_config(self, is_creating_backup: bool = False) -> bool:
23062523 return False
23072524 raise e
23082525
2309- # TODO Updating the lib should accept ConfigData
2310- postgresql_parameters = self .postgresql .build_postgresql_parameters (
2311- self .model .config , # type: ignore
2312- available_memory ,
2313- limit_memory ,
2526+ # Build PostgreSQL parameters
2527+ postgresql_parameters = self ._build_postgresql_parameters (
2528+ available_cpu_cores , available_memory
23142529 )
23152530
23162531 # replication_slots = self.logical_replication.replication_slots()
0 commit comments