1- import subprocess
2- import concurrent .futures
3- import re
4- import json
1+ #!/usr/bin/env python3
2+
3+ import os
54import sys
5+ import json
6+ import argparse
7+ from functools import lru_cache
8+ from typing import Dict , List , Any
69
710from init_paths import *
811from paths import *
912
10- DEFAULT_ARGS = ["-a" , "-n" , "-s" ]
11-
12- def run_show_uri (username ):
13+ @lru_cache (maxsize = None )
14+ def load_json_file (file_path : str ) -> Any :
15+ if not os .path .exists (file_path ):
16+ return None
1317 try :
14- cmd = ["python3" , CLI_PATH , "show-user-uri" , "-u" , username ] + DEFAULT_ARGS
15- result = subprocess .run (cmd , capture_output = True , text = True , check = True )
16- output = result .stdout
17- if "Invalid username" in output :
18- return {"username" : username , "error" : "User not found" }
19- return parse_output (username , output )
20- except subprocess .CalledProcessError as e :
21- return {"username" : username , "error" : e .stderr .strip ()}
22-
23- def parse_output (username , output ):
24- ipv4 = None
25- ipv6 = None
26- normal_sub = None
27- nodes = []
28-
29- ipv4_match = re .search (r"IPv4:\s*(hy2://[^\s]+)" , output )
30- ipv6_match = re .search (r"IPv6:\s*(hy2://[^\s]+)" , output )
31- normal_sub_match = re .search (r"Normal-SUB Sublink:\s*(https?://[^\s]+)" , output )
32-
33- if ipv4_match :
34- ipv4 = ipv4_match .group (1 )
35- if ipv6_match :
36- ipv6 = ipv6_match .group (1 )
37- if normal_sub_match :
38- normal_sub = normal_sub_match .group (1 )
39-
40- node_matches = re .findall (r"Node: (.+?) \(IPv[46]\):\s*(hy2://[^\s]+)" , output )
41- for name , uri in node_matches :
42- nodes .append ({"name" : name .strip (), "uri" : uri })
43-
44-
45- return {
46- "username" : username ,
47- "ipv4" : ipv4 ,
48- "ipv6" : ipv6 ,
49- "nodes" : nodes ,
50- "normal_sub" : normal_sub
18+ with open (file_path , 'r' , encoding = 'utf-8' ) as f :
19+ content = f .read ()
20+ return json .loads (content ) if content else None
21+ except (json .JSONDecodeError , IOError ):
22+ return None
23+
24+ @lru_cache (maxsize = None )
25+ def load_env_file (env_file : str ) -> Dict [str , str ]:
26+ env_vars = {}
27+ if os .path .exists (env_file ):
28+ with open (env_file , 'r' , encoding = 'utf-8' ) as f :
29+ for line in f :
30+ line = line .strip ()
31+ if line and not line .startswith ('#' ) and '=' in line :
32+ key , value = line .split ('=' , 1 )
33+ env_vars [key ] = value .strip ()
34+ return env_vars
35+
36+ def generate_uri (username : str , auth_password : str , ip : str , port : str ,
37+ uri_params : Dict [str , str ], ip_version : int , fragment_tag : str ) -> str :
38+ ip_part = f"[{ ip } ]" if ip_version == 6 and ':' in ip else ip
39+ uri_base = f"hy2://{ username } :{ auth_password } @{ ip_part } :{ port } "
40+ query_string = "&" .join ([f"{ k } ={ v } " for k , v in uri_params .items ()])
41+ return f"{ uri_base } ?{ query_string } #{ fragment_tag } "
42+
43+ def process_users (target_usernames : List [str ]) -> List [Dict [str , Any ]]:
44+ config = load_json_file (CONFIG_FILE )
45+ all_users = load_json_file (USERS_FILE )
46+
47+ if not config or not all_users :
48+ print ("Error: Could not load Hysteria2 configuration or user files." , file = sys .stderr )
49+ sys .exit (1 )
50+
51+ nodes = load_json_file (NODES_JSON_PATH ) or []
52+ port = config .get ("listen" , "" ).split (":" )[- 1 ]
53+ tls_config = config .get ("tls" , {})
54+ hy2_env = load_env_file (CONFIG_ENV )
55+ ns_env = load_env_file (NORMALSUB_ENV )
56+
57+ base_uri_params = {
58+ "insecure" : "1" if tls_config .get ("insecure" , True ) else "0" ,
59+ "sni" : hy2_env .get ('SNI' , '' )
5160 }
61+ obfs_password = config .get ("obfs" , {}).get ("salamander" , {}).get ("password" )
62+ if obfs_password :
63+ base_uri_params ["obfs" ] = "salamander"
64+ base_uri_params ["obfs-password" ] = obfs_password
65+
66+ sha256 = tls_config .get ("pinSHA256" )
67+ if sha256 :
68+ base_uri_params ["pinSHA256" ] = sha256
69+
70+ ip4 = hy2_env .get ('IP4' )
71+ ip6 = hy2_env .get ('IP6' )
72+ ns_domain , ns_port , ns_subpath = ns_env .get ('HYSTERIA_DOMAIN' ), ns_env .get ('HYSTERIA_PORT' ), ns_env .get ('SUBPATH' )
73+
74+ results = []
75+ for username in target_usernames :
76+ user_data = all_users .get (username )
77+ if not user_data or "password" not in user_data :
78+ results .append ({"username" : username , "error" : "User not found or password not set" })
79+ continue
80+
81+ auth_password = user_data ["password" ]
82+ user_output = {"username" : username , "ipv4" : None , "ipv6" : None , "nodes" : [], "normal_sub" : None }
83+
84+ if ip4 and ip4 != "None" :
85+ user_output ["ipv4" ] = generate_uri (username , auth_password , ip4 , port , base_uri_params , 4 , f"{ username } -IPv4" )
86+ if ip6 and ip6 != "None" :
87+ user_output ["ipv6" ] = generate_uri (username , auth_password , ip6 , port , base_uri_params , 6 , f"{ username } -IPv6" )
5288
53- def batch_show_uri (usernames , max_workers = 20 ):
54- with concurrent .futures .ThreadPoolExecutor (max_workers = max_workers ) as executor :
55- results = list (executor .map (run_show_uri , usernames ))
89+ for node in nodes :
90+ if node_name := node .get ("name" ):
91+ if node_ip := node .get ("ip" ):
92+ ip_v = 6 if ':' in node_ip else 4
93+ tag = f"{ username } -{ node_name } "
94+ uri = generate_uri (username , auth_password , node_ip , port , base_uri_params , ip_v , tag )
95+ user_output ["nodes" ].append ({"name" : node_name , "uri" : uri })
96+
97+ if ns_domain and ns_port and ns_subpath :
98+ user_output ["normal_sub" ] = f"https://{ ns_domain } :{ ns_port } /{ ns_subpath } /sub/normal/{ auth_password } #{ username } "
99+
100+ results .append (user_output )
101+
56102 return results
57103
58- if __name__ == "__main__" :
59- if len (sys .argv ) < 2 :
60- print ("Usage: python3 show_uri_json.py user1 user2 ..." )
104+ def main ():
105+ parser = argparse .ArgumentParser (description = "Efficiently generate Hysteria2 URIs for multiple users." )
106+ parser .add_argument ('usernames' , nargs = '*' , help = "A list of usernames to process." )
107+ parser .add_argument ('--all' , action = 'store_true' , help = "Process all users from users.json." )
108+
109+ args = parser .parse_args ()
110+ target_usernames = args .usernames
111+
112+ if args .all :
113+ all_users = load_json_file (USERS_FILE )
114+ if all_users :
115+ target_usernames = list (all_users .keys ())
116+ else :
117+ print ("Error: Could not load users.json to process all users." , file = sys .stderr )
118+ sys .exit (1 )
119+
120+ if not target_usernames :
121+ parser .print_help ()
61122 sys .exit (1 )
62123
63- usernames = sys .argv [1 :]
64- output_list = batch_show_uri (usernames )
65- print (json .dumps (output_list , indent = 2 ))
124+ output_list = process_users (target_usernames )
125+ print (json .dumps (output_list , indent = 2 ))
126+
127+ if __name__ == "__main__" :
128+ main ()
0 commit comments