1010db = SQLAlchemy ()
1111
1212class User (UserMixin , db .Model ):
13+ # Primary fields
1314 id = db .Column (db .Integer , primary_key = True )
1415 username = db .Column (db .String (80 ), unique = True , nullable = False )
1516 password_hash = db .Column (db .String (120 ), nullable = False )
17+
18+ # 2FA/TOTP fields
1619 totp_secret = db .Column (db .String (32 ), nullable = True )
1720 totp_enabled = db .Column (db .Boolean , default = False )
21+
22+ # Admin and metadata
1823 is_admin = db .Column (db .Boolean , default = False )
1924 created_at = db .Column (db .DateTime , default = datetime .utcnow )
2025 last_login = db .Column (db .DateTime , nullable = True )
2126
22- # DirectAdmin Settings (encrypted )
27+ # DirectAdmin Settings (per-user )
2328 da_server = db .Column (db .String (255 ), nullable = True )
2429 da_username = db .Column (db .String (255 ), nullable = True )
2530 da_password_encrypted = db .Column (db .Text , nullable = True )
2631 da_domain = db .Column (db .String (255 ), nullable = True )
2732
28- # Encryption key for DA password (unique per user)
33+ # Unique encryption key per user for DA password
2934 encryption_key = db .Column (db .String (255 ), nullable = True )
3035
3136 def __init__ (self , ** kwargs ):
37+ """Initialize user with encryption key"""
3238 super (User , self ).__init__ (** kwargs )
3339 # Generate encryption key for this user if not provided
3440 if not self .encryption_key :
3541 self .generate_encryption_key ()
3642
43+ def __repr__ (self ):
44+ return f'<User { self .username } >'
45+
46+ # ===== Encryption Key Management =====
47+
3748 def generate_encryption_key (self ):
38- """Generate a new encryption key"""
49+ """Generate a new encryption key for this user """
3950 self .encryption_key = Fernet .generate_key ().decode ()
4051
52+ # ===== Password Management =====
53+
4154 def set_password (self , password ):
55+ """Hash and store user login password"""
4256 self .password_hash = generate_password_hash (password )
4357
4458 def check_password (self , password ):
59+ """Verify user login password"""
4560 return check_password_hash (self .password_hash , password )
4661
62+ # ===== DirectAdmin Password Management =====
63+
4764 def set_da_password (self , password ):
4865 """Encrypt and store DirectAdmin password"""
4966 if not self .encryption_key :
@@ -54,7 +71,7 @@ def set_da_password(self, password):
5471 f = Fernet (self .encryption_key .encode ())
5572 self .da_password_encrypted = f .encrypt (password .encode ()).decode ()
5673 except Exception as e :
57- print (f"Error encrypting password: { e } " )
74+ print (f"Error encrypting DA password: { e } " )
5875 raise
5976 else :
6077 self .da_password_encrypted = None
@@ -66,38 +83,84 @@ def get_da_password(self):
6683 f = Fernet (self .encryption_key .encode ())
6784 return f .decrypt (self .da_password_encrypted .encode ()).decode ()
6885 except Exception as e :
69- print (f"Error decrypting password: { e } " )
86+ print (f"Error decrypting DA password: { e } " )
7087 return None
7188 return None
7289
7390 def has_da_config (self ):
7491 """Check if user has configured DirectAdmin settings"""
75- return all ([self .da_server , self .da_username , self .da_password_encrypted , self .da_domain ])
92+ return all ([
93+ self .da_server ,
94+ self .da_username ,
95+ self .da_password_encrypted ,
96+ self .da_domain
97+ ])
98+
99+ # ===== TOTP/2FA Management =====
76100
77101 def generate_totp_secret (self ):
78- secret = base64 .b32encode (os .urandom (10 )).decode ('utf-8' )
102+ """Generate a new TOTP secret for 2FA"""
103+ # Use pyotp's random_base32 for proper secret generation
104+ secret = pyotp .random_base32 ()
79105 self .totp_secret = secret
106+ print (f"Generated TOTP secret for user { self .username } " )
80107 return secret
81108
82109 def verify_totp (self , token ):
110+ """Verify a TOTP token"""
83111 if not self .totp_enabled or not self .totp_secret :
84- return True
85- totp = pyotp .TOTP (self .totp_secret )
86- return totp .verify (token , valid_window = 1 )
112+ return True # If 2FA not enabled, always pass
113+
114+ if not token :
115+ return False
116+
117+ try :
118+ totp = pyotp .TOTP (self .totp_secret )
119+ # Allow 1 window before/after (30 seconds tolerance)
120+ valid = totp .verify (token , valid_window = 1 )
121+ print (f"TOTP verification for { self .username } : { valid } " )
122+ return valid
123+ except Exception as e :
124+ print (f"TOTP verification error for { self .username } : { e } " )
125+ return False
87126
88127 def get_totp_uri (self ):
128+ """Get provisioning URI for QR code generation"""
129+ if not self .totp_secret :
130+ raise ValueError ("No TOTP secret set" )
131+
89132 return pyotp .totp .TOTP (self .totp_secret ).provisioning_uri (
90133 name = self .username ,
91134 issuer_name = 'DirectAdmin Email Forwarder'
92135 )
93136
137+ def get_totp_qr_uri (self ):
138+ """Alias for get_totp_uri for compatibility"""
139+ return self .get_totp_uri ()
140+
141+ # ===== Utility Methods =====
142+
94143 def to_dict (self ):
144+ """Convert user to dictionary (for API responses)"""
95145 return {
96146 'id' : self .id ,
97147 'username' : self .username ,
98148 'is_admin' : self .is_admin ,
99149 'totp_enabled' : self .totp_enabled ,
100150 'created_at' : self .created_at .isoformat () if self .created_at else None ,
101151 'last_login' : self .last_login .isoformat () if self .last_login else None ,
102- 'has_da_config' : self .has_da_config ()
152+ 'has_da_config' : self .has_da_config (),
153+ 'da_server' : self .da_server ,
154+ 'da_username' : self .da_username ,
155+ 'da_domain' : self .da_domain
156+ # Never include passwords or secrets in dict!
103157 }
158+
159+ def update_last_login (self ):
160+ """Update last login timestamp"""
161+ self .last_login = datetime .utcnow ()
162+
163+ def reset_totp (self ):
164+ """Reset/disable TOTP for this user"""
165+ self .totp_enabled = False
166+ self .totp_secret = None
0 commit comments