@@ -24,6 +24,7 @@ IPTABLES_SAVE_CMD = "iptables-save"
2424IP6TABLES_SAVE_CMD = "ip6tables-save"
2525UFW_CMD = "ufw"
2626DPKG_CMD = "dpkg"
27+ ALLOW_SSH_CMD = ["sudo" , UFW_CMD , "allow" , "ssh" ]
2728
2829AFTER_RULES_PATH = os .path .join (UFW_CONFIG_DIR , "after.rules" )
2930AFTER6_RULES_PATH = os .path .join (UFW_CONFIG_DIR , "after6.rules" )
@@ -241,7 +242,7 @@ def reload_ufw():
241242# --- UTILITY & HELPER FUNCTIONS ---
242243
243244
244- def enable_ufw ():
245+ def enable_ufw (force = False ):
245246 """
246247 Ensures UFW is properly enabled and configured for basic access.
247248
@@ -251,6 +252,9 @@ def enable_ufw():
251252 3. If UFW is found to be inactive, it executes `sudo ufw enable`, which
252253 will prompt the user for confirmation via the console.
253254
255+ Args:
256+ force (bool): If True, prompts the user to skip the SSH rule check.
257+
254258 The script will exit if any of these steps fail, as a functioning UFW
255259 is critical for subsequent operations.
256260 """
@@ -266,10 +270,18 @@ def enable_ufw():
266270 )
267271
268272 # --- 2. Allow SSH (Ensures rule exists) ---
269- allow_ssh_command = ['sudo' , UFW_CMD , 'allow' , 'ssh' ]
270- logging .info (f"Ensuring SSH rule exists: { ' ' .join (allow_ssh_command )} " )
271- subprocess .run (allow_ssh_command , check = True )
272- logging .info ("Successfully ensured 'allow ssh' rule." )
273+ skip_ssh = False
274+ if force :
275+ print ("Force mode enabled. Do you want to skip ensuring 'allow ssh' rule? [y/N] " , end = "" , flush = True )
276+ choice = input ().strip ().lower ()
277+ if choice == 'y' :
278+ skip_ssh = True
279+ logging .info ("Skipping SSH rule check as requested." )
280+
281+ if not skip_ssh :
282+ logging .info (f"Ensuring SSH rule exists: { ' ' .join (ALLOW_SSH_CMD )} " )
283+ subprocess .run (ALLOW_SSH_CMD , check = True )
284+ logging .info ("Successfully ensured 'allow ssh' rule." )
273285
274286 # --- 3. Enable UFW if needed ---
275287 if "Status: inactive" in result .stdout :
@@ -880,7 +892,7 @@ def backup():
880892 return backup_helper ()
881893
882894
883- def enable ():
895+ def enable (force = False ):
884896 """
885897 Ensures the Docker UFW fix is applied and that UFW is enabled.
886898
@@ -913,7 +925,7 @@ def enable():
913925 sys .exit (1 )
914926
915927 # Step 2: Enable UFW
916- ufw_enabled_ok = enable_ufw ()
928+ ufw_enabled_ok = enable_ufw (force = force )
917929 summary ["ufw_enabled" ] = ufw_enabled_ok
918930 if not ufw_enabled_ok :
919931 logging .critical ("\n !!! Failed to enable UFW. Aborting !!!" )
@@ -945,7 +957,7 @@ def enable():
945957 logging .info ("\n --- Enable process complete. ---" )
946958
947959
948- def apply_rules ():
960+ def apply_rules (force = False ):
949961 """
950962 Atomically resets and applies the firewall configuration from `udwall.conf`.
951963
@@ -958,6 +970,10 @@ def apply_rules():
958970 index-shifting errors during deletion.
959971 4. Calls the `add_rules()` function to apply the new configuration from
960972 `udwall.conf`.
973+
974+ Args:
975+ force (bool): If True, skips safety checks (REQUIRED_RULES, PROTECTED_RULES)
976+ and prompts for user confirmation.
961977 """
962978 logging .info ("--- Starting atomic apply operation ---" )
963979
@@ -975,14 +991,61 @@ def apply_rules():
975991
976992
977993 # Convert lists of dicts to sets of tuples for easy comparison
978- loaded_rules_set = {tuple (sorted (d .items ())) for d in REQUIRED_RULES }
994+ loaded_rules_set = {tuple (sorted (d .items ())) for d in loaded_rules }
979995
980- for rule in REQUIRED_RULES :
981- if tuple (sorted (rule .items ())) not in loaded_rules_set :
982- logging .error (f"CRITICAL: udwall.conf is missing a required safety rule: { rule } " )
996+ # --- Safety Checks ---
997+ if not force :
998+ # 1. Check for REQUIRED_RULES (exact match)
999+ required_rules_set = {tuple (sorted (d .items ())) for d in REQUIRED_RULES }
1000+ missing_rules = []
1001+ for rule in REQUIRED_RULES :
1002+ if tuple (sorted (rule .items ())) not in loaded_rules_set :
1003+ missing_rules .append (rule )
1004+
1005+ if missing_rules :
1006+ logging .error (f"CRITICAL: udwall.conf is missing required safety rules: { missing_rules } " )
9831007 logging .error ("Aborting apply operation to prevent potential lockout." )
1008+ logging .info ("To bypass this check and apply anyway, use: --apply -f" )
9841009 sys .exit (1 )
985- logging .info ("Success: udwall.conf contains all required safety rules." )
1010+
1011+ # 2. Check for SSH port 22 explicitly (common lockout cause)
1012+ # We look for any rule that allows tcp on port 22
1013+ ssh_found = False
1014+ for rule in loaded_rules :
1015+ if str (rule .get ('to' )) == '22' and rule .get ('connectionType' ) == 'tcp' and rule .get ('isEnabled' ):
1016+ ssh_found = True
1017+ break
1018+
1019+ if not ssh_found :
1020+ logging .error ("CRITICAL: No rule found allowing SSH (port 22/tcp)." )
1021+ logging .error ("Applying this configuration could lock you out of the server." )
1022+ logging .info ("To bypass this check and apply anyway, use: --apply -f" )
1023+ sys .exit (1 )
1024+
1025+ logging .info ("Success: udwall.conf contains all required safety rules." )
1026+
1027+ else :
1028+ # --- Force Mode Confirmation ---
1029+ logging .warning ("!!! FORCE MODE ENABLED !!!" )
1030+ logging .warning ("Skipping safety checks for REQUIRED_RULES and PROTECTED_RULES." )
1031+ logging .warning ("This may result in a lockout if SSH rules are missing." )
1032+
1033+ # Check if 22/tcp is in the new rules to give a specific warning
1034+ ssh_found = False
1035+ for rule in loaded_rules :
1036+ if str (rule .get ('to' )) == '22' and rule .get ('connectionType' ) == 'tcp' and rule .get ('isEnabled' ):
1037+ ssh_found = True
1038+ break
1039+
1040+ if not ssh_found :
1041+ logging .warning ("WARNING: Your new configuration does NOT appear to allow SSH (port 22/tcp)." )
1042+ logging .warning ("You WILL likely lose access to this server." )
1043+
1044+ print ("Are you sure you want to proceed? [Y/n] " , end = "" , flush = True )
1045+ choice = input ().strip ().lower ()
1046+ if choice != 'y' :
1047+ logging .info ("Aborted by user." )
1048+ sys .exit (0 )
9861049
9871050 check_ufw_installed ()
9881051
@@ -991,7 +1054,13 @@ def apply_rules():
9911054 summary ["backup_created" ] = backup ()
9921055
9931056 # --- 1. Define protected rules that should not be deleted ---
994- logging .info (f"Protected rules (will not be deleted): { ', ' .join (PROTECTED_RULES )} " )
1057+ # In force mode, we do NOT protect any rules. We wipe everything not in the config.
1058+ current_protected_rules = [] if force else PROTECTED_RULES
1059+
1060+ if current_protected_rules :
1061+ logging .info (f"Protected rules (will not be deleted): { ', ' .join (current_protected_rules )} " )
1062+ else :
1063+ logging .info ("No protected rules. All existing rules will be removed." )
9951064
9961065 # --- 2. Get all current numbered rules ---
9971066 try :
@@ -1021,7 +1090,7 @@ def apply_rules():
10211090 rule_def_check = rule_def .replace ("(v6)" , "" ).strip ()
10221091
10231092 # Check if the normalized definition is in our protected list
1024- if rule_def_check not in PROTECTED_RULES :
1093+ if rule_def_check not in current_protected_rules :
10251094 # Optional: print what is being marked for deletion for debugging
10261095 # print(f"Marking for deletion: [{rule_num}] {rule_def}")
10271096 rules_to_delete .append (rule_num )
@@ -1286,6 +1355,8 @@ def main():
12861355 group .add_argument ("--apply" , action = "store_true" , help = "Atomically remove old rules and apply the new configuration." )
12871356 group .add_argument ("--status" , action = "store_true" , help = "Show the current UFW status, including numbered rules." )
12881357
1358+ parser .add_argument ("-f" , "--force" , action = "store_true" , help = "Force apply/enable, skipping safety checks." )
1359+
12891360 args = parser .parse_args ()
12901361
12911362 # If no arguments are provided, print help and exit.
@@ -1300,11 +1371,11 @@ def main():
13001371 elif args .dry_run :
13011372 dry_run_rules ()
13021373 elif args .enable :
1303- enable ()
1374+ enable (force = args . force )
13041375 elif args .create :
13051376 create_config_from_ufw ()
13061377 elif args .apply :
1307- apply_rules ()
1378+ apply_rules (force = args . force )
13081379 elif args .status :
13091380 show_status ()
13101381 else :
0 commit comments