Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 206 additions & 30 deletions app/directadmin_api.py

Large diffs are not rendered by default.

277 changes: 262 additions & 15 deletions app/main.py

Large diffs are not rendered by default.

133 changes: 129 additions & 4 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,28 @@

db = SQLAlchemy()

class UserDomain(db.Model):
"""Model for storing multiple domains per user"""
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
domain = db.Column(db.String(255), nullable=False)
order_index = db.Column(db.Integer, nullable=False, default=0)
created_at = db.Column(db.DateTime, default=datetime.utcnow)

# Relationship back to user
user = db.relationship('User', backref=db.backref('domains', lazy=True, order_by='UserDomain.order_index'))

def __repr__(self):
return f'<UserDomain {self.domain} for user {self.user_id}>'

def to_dict(self):
return {
'id': self.id,
'domain': self.domain,
'order_index': self.order_index,
'created_at': self.created_at.isoformat() if self.created_at else None
}

class User(UserMixin, db.Model):
# Primary fields
id = db.Column(db.Integer, primary_key=True)
Expand Down Expand Up @@ -92,12 +114,114 @@ def get_da_password(self):

def has_da_config(self):
"""Check if user has configured DirectAdmin settings"""
try:
# Check if we have domains configured
domains = self.get_domains()
has_domains = len(domains) > 0
except Exception:
# If domains relationship fails, fall back to legacy da_domain
has_domains = bool(self.da_domain)

return all([
self.da_server,
self.da_username,
self.da_password_encrypted,
self.da_domain
])
self.da_password_encrypted
]) and has_domains

# ===== Domain Management =====

def get_domains(self):
"""Get all domains for this user in order"""
try:
# Try to get domains from the new multi-domain table
domains = [d.domain for d in self.domains]

# If no domains in new table but legacy domain exists, include it
if not domains and self.da_domain:
return [self.da_domain]

return domains
except Exception as e:
# If domains relationship fails (table doesn't exist or other error),
# fall back to legacy da_domain if available
if self.da_domain:
return [self.da_domain]
return []

def get_first_domain(self):
"""Get the first domain (default) for this user"""
try:
domains = self.get_domains()
if domains:
return domains[0]
# Fall back to legacy da_domain if no domains in new table
return self.da_domain
except Exception:
# If anything fails, return legacy da_domain
return self.da_domain

def add_domain(self, domain):
"""Add a new domain for this user"""
try:
# Check if domain already exists
existing = UserDomain.query.filter_by(user_id=self.id, domain=domain).first()
if existing:
return False, "Domain already exists"

# Get next order index
max_order = db.session.query(db.func.max(UserDomain.order_index)).filter_by(user_id=self.id).scalar()
next_order = (max_order or -1) + 1

# Create new domain
user_domain = UserDomain(
user_id=self.id,
domain=domain,
order_index=next_order
)

db.session.add(user_domain)
return True, "Domain added successfully"

except Exception as e:
print(f"Error adding domain {domain} for user {self.username}: {e}")
return False, f"Failed to add domain: {str(e)}"

def remove_domain(self, domain):
"""Remove a domain for this user"""
try:
user_domain = UserDomain.query.filter_by(user_id=self.id, domain=domain).first()
if not user_domain:
return False, "Domain not found"

db.session.delete(user_domain)

# Reorder remaining domains
remaining_domains = UserDomain.query.filter_by(user_id=self.id).filter(
UserDomain.order_index > user_domain.order_index
).all()

for domain_obj in remaining_domains:
domain_obj.order_index -= 1

return True, "Domain removed successfully"

except Exception as e:
print(f"Error removing domain {domain} for user {self.username}: {e}")
return False, f"Failed to remove domain: {str(e)}"

def reorder_domains(self, domain_list):
"""Reorder domains based on provided list"""
try:
for i, domain in enumerate(domain_list):
user_domain = UserDomain.query.filter_by(user_id=self.id, domain=domain).first()
if user_domain:
user_domain.order_index = i

return True, "Domains reordered successfully"

except Exception as e:
print(f"Error reordering domains for user {self.username}: {e}")
return False, f"Failed to reorder domains: {str(e)}"

# ===== TOTP/2FA Management =====

Expand Down Expand Up @@ -155,7 +279,8 @@ def to_dict(self):
'has_da_config': self.has_da_config(),
'da_server': self.da_server,
'da_username': self.da_username,
'da_domain': self.da_domain
'da_domain': self.da_domain, # Keep for backward compatibility
'domains': self.get_domains()
# Never include passwords or secrets in dict!
}

Expand Down
175 changes: 164 additions & 11 deletions app/settings.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from flask import Blueprint, render_template, request, jsonify
from flask_login import login_required, current_user
from app.models import db
from app.models import db, UserDomain
from app.directadmin_api import DirectAdminAPI
import traceback

Expand All @@ -20,7 +20,8 @@ def get_da_config():
return jsonify({
'da_server': current_user.da_server or '',
'da_username': current_user.da_username or '',
'da_domain': current_user.da_domain or '',
'da_domain': current_user.da_domain or '', # Keep for backward compatibility
'domains': current_user.get_domains(),
'has_password': bool(current_user.da_password_encrypted),
'theme_preference': current_user.theme_preference or 'light'
})
Expand All @@ -43,8 +44,8 @@ def update_da_config():

print(f"Received data: {data}")

# Validate required fields
required_fields = ['da_server', 'da_username', 'da_domain']
# Validate required fields (domain no longer required here)
required_fields = ['da_server', 'da_username']
missing_fields = [field for field in required_fields if not data.get(field, '').strip()]

if missing_fields:
Expand All @@ -65,7 +66,10 @@ def update_da_config():
# Update settings
current_user.da_server = server_url.rstrip('/')
current_user.da_username = data['da_username'].strip()
current_user.da_domain = data['da_domain'].strip()

# Keep da_domain for backward compatibility with first domain
if data.get('da_domain'):
current_user.da_domain = data['da_domain'].strip()

# Update password if provided
if data.get('da_password'):
Expand Down Expand Up @@ -96,33 +100,182 @@ def update_da_config():
def test_connection():
"""Test DirectAdmin connection"""
try:
print(f"\n=== Test Connection Request Received ===")
data = request.get_json()
print(f"Request data: {data}")

# Use provided or saved credentials
server = data.get('da_server') or current_user.da_server
username = data.get('da_username') or current_user.da_username
password = data.get('da_password') or current_user.get_da_password()

# Get the first configured domain for testing, if any
user_domains = current_user.get_domains()
domain = user_domains[0] if user_domains else None

print(f"Test connection with server: {server}, username: {username}, domain: {domain}")

if not all([server, username, password]):
return jsonify({'error': 'Missing credentials'}), 400
print(f"Missing credentials - server: {bool(server)}, username: {bool(username)}, password: {bool(password)}")
return jsonify({'error': 'Missing credentials', 'success': False}), 200

# Ensure proper URL format
if not server.startswith(('http://', 'https://')):
server = 'https://' + server

# Test connection
api = DirectAdminAPI(server, username, password)
# Test connection with domain if available
print(f"Creating DirectAdminAPI instance...")
api = DirectAdminAPI(server, username, password, domain)

print(f"Calling test_connection()...")
success, message = api.test_connection()
print(f"Test connection result: success={success}, message={message}")

return jsonify({
result = {
'success': success,
'message': message
}
print(f"Sending response: {result}")
return jsonify(result)

except Exception as e:
error_msg = str(e)
print(f"Test connection error: {error_msg}")
print(traceback.format_exc())

# Provide more specific error messages
if 'timeout' in error_msg.lower():
error_msg = 'Connection timed out. Please check your DirectAdmin server URL and network connection.'
elif 'connection' in error_msg.lower():
error_msg = 'Unable to connect to DirectAdmin server. Please verify the server URL is correct.'
elif 'ssl' in error_msg.lower() or 'certificate' in error_msg.lower():
error_msg = 'SSL certificate error. Try using HTTP instead of HTTPS, or check your certificate configuration.'
else:
error_msg = f'Connection test failed: {error_msg}'

return jsonify({'error': error_msg, 'success': False}), 200

@settings_bp.route('/api/domains', methods=['GET'])
@login_required
def get_domains():
"""Get all domains for the current user"""
try:
domains = current_user.get_domains()
return jsonify({
'success': True,
'domains': domains
})
except Exception as e:
print(f"Error getting domains: {str(e)}")
print(traceback.format_exc())
return jsonify({'error': 'An internal error has occurred.'}), 500

@settings_bp.route('/api/domains', methods=['POST'])
@login_required
def add_domain():
"""Add a new domain for the current user"""
try:
data = request.get_json()
if not data or not data.get('domain'):
return jsonify({'error': 'Domain is required'}), 400

domain = data['domain'].strip()
if not domain:
return jsonify({'error': 'Domain cannot be empty'}), 400

# Basic domain validation
if not '.' in domain or ' ' in domain:
return jsonify({'error': 'Invalid domain format'}), 400

success, message = current_user.add_domain(domain)

if success:
# Update da_domain if this is the first domain (backward compatibility)
if not current_user.da_domain:
current_user.da_domain = domain

db.session.commit()
return jsonify({
'success': True,
'message': message,
'domains': current_user.get_domains()
})
else:
return jsonify({'error': message}), 400

except Exception as e:
print(f"Test connection error: {str(e)}")
print(f"Error adding domain: {str(e)}")
print(traceback.format_exc())
return jsonify({'error': 'An internal error has occurred.', 'success': False}), 200
db.session.rollback()
return jsonify({'error': 'An internal error has occurred.'}), 500

@settings_bp.route('/api/domains', methods=['DELETE'])
@login_required
def remove_domain():
"""Remove a domain for the current user"""
try:
data = request.get_json()
if not data or not data.get('domain'):
return jsonify({'error': 'Domain is required'}), 400

domain = data['domain'].strip()
success, message = current_user.remove_domain(domain)

if success:
# Update da_domain if we removed the current one
if current_user.da_domain == domain:
first_domain = current_user.get_first_domain()
current_user.da_domain = first_domain

db.session.commit()
return jsonify({
'success': True,
'message': message,
'domains': current_user.get_domains()
})
else:
return jsonify({'error': message}), 400

except Exception as e:
print(f"Error removing domain: {str(e)}")
print(traceback.format_exc())
db.session.rollback()
return jsonify({'error': 'An internal error has occurred.'}), 500

@settings_bp.route('/api/domains/reorder', methods=['POST'])
@login_required
def reorder_domains():
"""Reorder domains for the current user"""
try:
data = request.get_json()
if not data or not data.get('domains'):
return jsonify({'error': 'Domains list is required'}), 400

domains = data['domains']
if not isinstance(domains, list):
return jsonify({'error': 'Domains must be a list'}), 400

success, message = current_user.reorder_domains(domains)

if success:
# Update da_domain to the first domain (backward compatibility)
if domains:
current_user.da_domain = domains[0]

db.session.commit()
return jsonify({
'success': True,
'message': message,
'domains': current_user.get_domains()
})
else:
return jsonify({'error': message}), 400

except Exception as e:
print(f"Error reordering domains: {str(e)}")
print(traceback.format_exc())
db.session.rollback()
return jsonify({'error': 'An internal error has occurred.'}), 500

@settings_bp.route('/api/theme', methods=['POST'])
@login_required
Expand Down
Loading
Loading