-
Notifications
You must be signed in to change notification settings - Fork 12
API‐Reference
TheRealPourya edited this page Jul 22, 2025
·
1 revision
Complete API documentation for developers
Data class representing a Telegram account
@dataclass
class Account:
"""Represents a Telegram account configuration."""
name: str
api_id: int
api_hash: str
phone: str
enabled: bool = True
proxy_type: Optional[str] = None
proxy_addr: Optional[str] = None
client: Optional[TelegramClient] = None
is_flood_waited: bool = False
flood_wait_until: Optional[datetime] = None
last_used: Optional[datetime] = None
total_checked: int = 0
session_file: Optional[str] = None| Attribute | Type | Description |
|---|---|---|
name |
str |
Human-readable account name |
api_id |
int |
Telegram API ID |
api_hash |
str |
Telegram API hash |
phone |
str |
Phone number associated with account |
enabled |
bool |
Whether account is enabled for use |
proxy_type |
Optional[str] |
Proxy type (socks5, socks4, http) |
proxy_addr |
Optional[str] |
Proxy address (host:port) |
client |
Optional[TelegramClient] |
Telethon client instance |
is_flood_waited |
bool |
Current FloodWait status |
flood_wait_until |
Optional[datetime] |
When FloodWait expires |
last_used |
Optional[datetime] |
Last usage timestamp |
total_checked |
int |
Total usernames checked |
session_file |
Optional[str] |
Session file path |
def is_available(self) -> bool:
"""Check if account is available for use.
Returns:
bool: True if account is enabled and not flood waited
"""
if not self.enabled:
return False
if self.is_flood_waited and self.flood_wait_until:
return datetime.now() >= self.flood_wait_until
return True
def set_flood_wait(self, seconds: int) -> None:
"""Set FloodWait status for specified duration.
Args:
seconds: Duration of FloodWait in seconds
"""
self.is_flood_waited = True
self.flood_wait_until = datetime.now() + timedelta(seconds=seconds + 30) # 30s buffer
def clear_flood_wait(self) -> None:
"""Clear FloodWait status."""
self.is_flood_waited = False
self.flood_wait_until = None
def update_usage(self) -> None:
"""Update usage statistics."""
self.last_used = datetime.now()
self.total_checked += 1Manages username distribution across multiple accounts
class UsernameDistributor:
"""Distributes usernames across multiple accounts for parallel processing."""
def __init__(self, usernames: List[str]):
"""Initialize distributor with username list.
Args:
usernames: List of usernames to check
"""
self.username_queue = asyncio.Queue()
self.results = {
'available': [],
'taken': [],
'errors': []
}
self.total_usernames = len(usernames)
self.processed_count = 0
self.lock = asyncio.Lock()
# Populate queue
for username in usernames:
self.username_queue.put_nowait(username)async def get_username(self) -> Optional[str]:
"""Get next username from queue.
Returns:
Optional[str]: Next username or None if queue is empty
"""
try:
return await asyncio.wait_for(self.username_queue.get(), timeout=1.0)
except asyncio.TimeoutError:
return None
async def return_username(self, username: str) -> None:
"""Return username to queue for retry.
Args:
username: Username to return to queue
"""
await self.username_queue.put(username)
async def add_result(self, username: str, status: str, account_name: str = "") -> None:
"""Add result to appropriate list.
Args:
username: Checked username
status: Result status ('available', 'taken', 'error')
account_name: Name of account that checked username
"""
async with self.lock:
if status == 'available':
self.results['available'].append({
'username': username,
'account': account_name,
'timestamp': datetime.now().isoformat()
})
elif status == 'taken':
self.results['taken'].append({
'username': username,
'account': account_name,
'timestamp': datetime.now().isoformat()
})
else:
self.results['errors'].append({
'username': username,
'account': account_name,
'error': status,
'timestamp': datetime.now().isoformat()
})
self.processed_count += 1
def get_progress(self) -> Dict[str, Any]:
"""Get current progress statistics.
Returns:
Dict containing progress information
"""
return {
'total': self.total_usernames,
'processed': self.processed_count,
'remaining': self.total_usernames - self.processed_count,
'percentage': (self.processed_count / self.total_usernames) * 100 if self.total_usernames > 0 else 0,
'available_count': len(self.results['available']),
'taken_count': len(self.results['taken']),
'error_count': len(self.results['errors'])
}Manages multiple Telegram accounts
class AccountManager:
"""Manages multiple Telegram accounts for username checking."""
def __init__(self, accounts: List[Account]):
"""Initialize account manager.
Args:
accounts: List of Account objects to manage
"""
self.accounts = accounts
self.active_accounts: List[Account] = []
self.stats = {
'total_checked': 0,
'start_time': datetime.now(),
'accounts_used': set()
}async def initialize_accounts(self) -> None:
"""Initialize all enabled accounts."""
for account in self.accounts:
if account.enabled:
try:
await self._setup_account(account)
self.active_accounts.append(account)
print_success(f"✅ Account '{account.name}' initialized successfully")
except Exception as e:
print_error(f"❌ Failed to initialize account '{account.name}': {e}")
async def get_available_account(self) -> Optional[Account]:
"""Get an available account for username checking.
Returns:
Optional[Account]: Available account or None if all are busy
"""
available_accounts = [acc for acc in self.active_accounts if acc.is_available()]
if not available_accounts:
return None
# Return least recently used account
return min(available_accounts, key=lambda acc: acc.last_used or datetime.min)
async def handle_flood_wait(self, account: Account, seconds: int) -> None:
"""Handle FloodWait for specific account.
Args:
account: Account that received FloodWait
seconds: FloodWait duration in seconds
"""
account.set_flood_wait(seconds)
print_warning(f"⏳ Account '{account.name}' flood waited for {seconds} seconds")
# Log FloodWait event
logging.warning(f"Account {account.name} flood waited for {seconds} seconds")
def get_account_status(self) -> List[Dict[str, Any]]:
"""Get status of all accounts.
Returns:
List of dictionaries containing account status information
"""
status_list = []
for account in self.accounts:
status = {
'name': account.name,
'enabled': account.enabled,
'is_flood_waited': account.is_flood_waited,
'flood_wait_until': account.flood_wait_until.isoformat() if account.flood_wait_until else None,
'last_used': account.last_used.isoformat() if account.last_used else None,
'total_checked': account.total_checked,
'is_available': account.is_available(),
'has_proxy': bool(account.proxy_type and account.proxy_addr)
}
status_list.append(status)
return status_list
def get_performance_stats(self) -> Dict[str, Any]:
"""Get performance statistics.
Returns:
Dictionary containing performance metrics
"""
runtime = datetime.now() - self.stats['start_time']
runtime_hours = runtime.total_seconds() / 3600
return {
'total_checked': self.stats['total_checked'],
'runtime_seconds': runtime.total_seconds(),
'runtime_hours': runtime_hours,
'usernames_per_hour': self.stats['total_checked'] / runtime_hours if runtime_hours > 0 else 0,
'accounts_used': len(self.stats['accounts_used']),
'active_accounts': len(self.active_accounts),
'total_accounts': len(self.accounts)
}def print_success(message: str) -> None:
"""Print success message in green.
Args:
message: Message to display
"""
print(f"\033[92m{message}\033[0m")
def print_error(message: str) -> None:
"""Print error message in red.
Args:
message: Error message to display
"""
print(f"\033[91m{message}\033[0m")
def print_warning(message: str) -> None:
"""Print warning message in yellow.
Args:
message: Warning message to display
"""
print(f"\033[93m{message}\033[0m")
def print_info(message: str) -> None:
"""Print info message in blue.
Args:
message: Info message to display
"""
print(f"\033[94m{message}\033[0m")
def clear_screen() -> None:
"""Clear the terminal screen."""
os.system('cls' if os.name == 'nt' else 'clear')
def print_banner() -> None:
"""Display the application banner."""
banner = """
╔══════════════════════════════════════════════════════════════╗
║ ║
║ TELEGRAM USERNAME CHECKER PLUS v2.0 ║
║ ║
║ 🚀 Multi-Account Support ║
║ ⚡ Smart FloodWait Management ║
║ 🎯 High Performance Checking ║
║ ║
╚══════════════════════════════════════════════════════════════╝
"""
print_info(banner)def load_accounts() -> List[Account]:
"""Load accounts from configuration file.
Returns:
List[Account]: List of configured accounts
Raises:
FileNotFoundError: If config.ini is not found
ValueError: If configuration is invalid
"""
config = configparser.ConfigParser()
if not os.path.exists('config.ini'):
raise FileNotFoundError("config.ini not found. Please create it first.")
config.read('config.ini')
accounts = []
for section_name in config.sections():
if section_name.startswith('account'):
try:
account = Account(
name=config.get(section_name, 'name'),
api_id=config.getint(section_name, 'api_id'),
api_hash=config.get(section_name, 'api_hash'),
phone=config.get(section_name, 'phone'),
enabled=config.getboolean(section_name, 'enabled', fallback=True),
proxy_type=config.get(section_name, 'proxy_type', fallback=None),
proxy_addr=config.get(section_name, 'proxy_addr', fallback=None)
)
# Set session file path
account.session_file = f"sessions/{account.name}"
accounts.append(account)
except Exception as e:
print_error(f"Error loading account {section_name}: {e}")
continue
if not accounts:
raise ValueError("No valid accounts found in configuration")
return accounts
def load_config() -> Dict[str, Any]:
"""Load general configuration settings.
Returns:
Dict[str, Any]: Configuration dictionary
"""
config = configparser.ConfigParser()
config.read('config.ini')
settings = {
'max_workers': 5,
'request_delay': 3,
'retry_attempts': 3,
'timeout': 30,
'save_sessions': True,
'log_level': 'INFO'
}
if 'settings' in config:
settings.update({
'max_workers': config.getint('settings', 'max_workers', fallback=5),
'request_delay': config.getint('settings', 'request_delay', fallback=3),
'retry_attempts': config.getint('settings', 'retry_attempts', fallback=3),
'timeout': config.getint('settings', 'timeout', fallback=30),
'save_sessions': config.getboolean('settings', 'save_sessions', fallback=True),
'log_level': config.get('settings', 'log_level', fallback='INFO')
})
return settingsdef load_usernames(filename: str = 'usernames.txt') -> List[str]:
"""Load usernames from file.
Args:
filename: Path to usernames file
Returns:
List[str]: List of usernames to check
Raises:
FileNotFoundError: If usernames file is not found
"""
if not os.path.exists(filename):
raise FileNotFoundError(f"Usernames file '{filename}' not found")
with open(filename, 'r', encoding='utf-8') as f:
usernames = [line.strip() for line in f if line.strip()]
# Remove duplicates while preserving order
seen = set()
unique_usernames = []
for username in usernames:
if username not in seen:
seen.add(username)
unique_usernames.append(username)
return unique_usernames
def save_results(results: Dict[str, List], output_dir: str = '.') -> None:
"""Save checking results to files.
Args:
results: Dictionary containing available, taken, and error lists
output_dir: Directory to save output files
"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
# Save available usernames
if results['available']:
available_file = os.path.join(output_dir, f'available_{timestamp}.txt')
with open(available_file, 'w', encoding='utf-8') as f:
for item in results['available']:
if isinstance(item, dict):
f.write(f"{item['username']}\n")
else:
f.write(f"{item}\n")
print_success(f"✅ Available usernames saved to {available_file}")
# Save taken usernames
if results['taken']:
taken_file = os.path.join(output_dir, f'taken_{timestamp}.txt')
with open(taken_file, 'w', encoding='utf-8') as f:
for item in results['taken']:
if isinstance(item, dict):
f.write(f"{item['username']}\n")
else:
f.write(f"{item}\n")
print_info(f"📝 Taken usernames saved to {taken_file}")
# Save errors
if results['errors']:
error_file = os.path.join(output_dir, f'errors_{timestamp}.txt')
with open(error_file, 'w', encoding='utf-8') as f:
for item in results['errors']:
if isinstance(item, dict):
f.write(f"{item['username']}: {item.get('error', 'Unknown error')}\n")
else:
f.write(f"{item}\n")
print_warning(f"⚠️ Errors saved to {error_file}")[settings]
max_workers = 5
request_delay = 3
retry_attempts = 3
timeout = 30
save_sessions = true
log_level = INFO
[account1]
name = Main Account
api_id = 12345678
api_hash = abcdef1234567890abcdef1234567890
phone = +1234567890
enabled = true
proxy_type = socks5
proxy_addr = 127.0.0.1:1080
[account2]
name = Secondary Account
api_id = 87654321
api_hash = 0987654321fedcba0987654321fedcba
phone = +0987654321
enabled = true| Parameter | Type | Default | Description |
|---|---|---|---|
max_workers |
int |
5 |
Maximum concurrent workers |
request_delay |
int |
3 |
Delay between requests (seconds) |
retry_attempts |
int |
3 |
Number of retry attempts |
timeout |
int |
30 |
Request timeout (seconds) |
save_sessions |
bool |
true |
Save Telegram sessions |
log_level |
str |
INFO |
Logging level (DEBUG, INFO, WARNING, ERROR) |
| Parameter | Type | Required | Description |
|---|---|---|---|
name |
str |
✅ | Human-readable account name |
api_id |
int |
✅ | Telegram API ID |
api_hash |
str |
✅ | Telegram API hash |
phone |
str |
✅ | Phone number |
enabled |
bool |
❌ | Enable/disable account (default: true) |
proxy_type |
str |
❌ | Proxy type (socks5, socks4, http) |
proxy_addr |
str |
❌ | Proxy address (host:port) |
class TelegramUsernameCheckerError(Exception):
"""Base exception for Telegram Username Checker."""
pass
class AccountError(TelegramUsernameCheckerError):
"""Account-related errors."""
pass
class ConfigurationError(TelegramUsernameCheckerError):
"""Configuration-related errors."""
pass
class NetworkError(TelegramUsernameCheckerError):
"""Network-related errors."""
pass
class FloodWaitError(TelegramUsernameCheckerError):
"""FloodWait-related errors."""
def __init__(self, seconds: int, account_name: str = ""):
self.seconds = seconds
self.account_name = account_name
super().__init__(f"FloodWait for {seconds} seconds on account {account_name}")async def check_username_safe(account: Account, username: str) -> Dict[str, Any]:
"""Safely check username with comprehensive error handling.
Args:
account: Account to use for checking
username: Username to check
Returns:
Dict containing result and status information
"""
try:
result = await check_username(account, username)
return {
'username': username,
'status': 'available' if result else 'taken',
'account': account.name,
'error': None
}
except FloodWaitError as e:
account.set_flood_wait(e.seconds)
return {
'username': username,
'status': 'flood_wait',
'account': account.name,
'error': f"FloodWait: {e.seconds} seconds"
}
except NetworkError as e:
return {
'username': username,
'status': 'network_error',
'account': account.name,
'error': str(e)
}
except Exception as e:
logging.error(f"Unexpected error checking {username} with {account.name}: {e}")
return {
'username': username,
'status': 'error',
'account': account.name,
'error': str(e)
}import asyncio
from checker import load_accounts, AccountManager, UsernameDistributor
async def main():
# Load accounts from configuration
accounts = load_accounts()
# Initialize account manager
manager = AccountManager(accounts)
await manager.initialize_accounts()
# Load usernames to check
usernames = ['test1', 'test2', 'test3']
distributor = UsernameDistributor(usernames)
# Check usernames
tasks = []
for _ in range(min(len(accounts), 5)): # Max 5 workers
task = asyncio.create_task(username_worker(manager, distributor))
tasks.append(task)
# Wait for completion
await asyncio.gather(*tasks)
# Get results
results = distributor.results
print(f"Available: {len(results['available'])}")
print(f"Taken: {len(results['taken'])}")
print(f"Errors: {len(results['errors'])}")
if __name__ == "__main__":
asyncio.run(main())from checker import Account, AccountManager
# Create accounts programmatically
accounts = [
Account(
name="Account 1",
api_id=12345678,
api_hash="your_api_hash",
phone="+1234567890",
enabled=True
),
Account(
name="Account 2",
api_id=87654321,
api_hash="your_api_hash_2",
phone="+0987654321",
enabled=True,
proxy_type="socks5",
proxy_addr="127.0.0.1:1080"
)
]
# Initialize manager
manager = AccountManager(accounts)
# Get account status
status = manager.get_account_status()
for account_status in status:
print(f"Account: {account_status['name']}")
print(f"Enabled: {account_status['enabled']}")
print(f"Available: {account_status['is_available']}")
print(f"Total Checked: {account_status['total_checked']}")
print("---")import time
from checker import AccountManager
async def monitor_performance(manager: AccountManager):
"""Monitor and display performance metrics."""
while True:
stats = manager.get_performance_stats()
print(f"\rTotal Checked: {stats['total_checked']} | "
f"Rate: {stats['usernames_per_hour']:.1f}/hour | "
f"Active Accounts: {stats['active_accounts']}", end="")
await asyncio.sleep(5) # Update every 5 seconds# Install as a package (if packaged)
pip install telegram-username-checker-plus
# Or import directly
import sys
sys.path.append('/path/to/telegram-username-checker-plus')
from checker import (
Account,
AccountManager,
UsernameDistributor,
load_accounts,
check_username
)from flask import Flask, jsonify, request
from checker import AccountManager, load_accounts
app = Flask(__name__)
manager = None
@app.route('/api/accounts', methods=['GET'])
def get_accounts():
"""Get account status."""
if not manager:
return jsonify({'error': 'Manager not initialized'}), 500
status = manager.get_account_status()
return jsonify({'accounts': status})
@app.route('/api/check', methods=['POST'])
def check_usernames():
"""Check usernames via API."""
data = request.get_json()
usernames = data.get('usernames', [])
if not usernames:
return jsonify({'error': 'No usernames provided'}), 400
# Implement async checking logic here
# Return results
return jsonify({
'status': 'success',
'total': len(usernames),
'results': results
})
if __name__ == '__main__':
# Initialize manager
accounts = load_accounts()
manager = AccountManager(accounts)
app.run(debug=True)import argparse
import asyncio
from checker import main_check_usernames
def cli():
parser = argparse.ArgumentParser(description='Telegram Username Checker Plus')
parser.add_argument('--input', '-i', default='usernames.txt', help='Input file')
parser.add_argument('--output', '-o', default='.', help='Output directory')
parser.add_argument('--workers', '-w', type=int, default=5, help='Number of workers')
parser.add_argument('--config', '-c', default='config.ini', help='Config file')
args = parser.parse_args()
# Run checking with CLI arguments
asyncio.run(main_check_usernames(
input_file=args.input,
output_dir=args.output,
max_workers=args.workers,
config_file=args.config
))
if __name__ == '__main__':
cli()-
Account Management
- Use 3-5 accounts for optimal performance
- Enable accounts with good API limits
- Distribute load evenly across accounts
-
Concurrency
- Start with 5 workers, adjust based on performance
- Monitor FloodWait frequency
- Use appropriate delays between requests
-
Memory Usage
- Process usernames in batches for large lists
- Clear session data periodically
- Monitor memory consumption
-
Network Optimization
- Use reliable proxy servers
- Implement connection pooling
- Handle network timeouts gracefully
class PerformanceMonitor:
"""Monitor application performance."""
def __init__(self):
self.start_time = time.time()
self.request_count = 0
self.error_count = 0
self.flood_wait_count = 0
def record_request(self):
self.request_count += 1
def record_error(self):
self.error_count += 1
def record_flood_wait(self):
self.flood_wait_count += 1
def get_stats(self):
runtime = time.time() - self.start_time
return {
'runtime_seconds': runtime,
'requests_per_second': self.request_count / runtime if runtime > 0 else 0,
'error_rate': self.error_count / self.request_count if self.request_count > 0 else 0,
'flood_wait_rate': self.flood_wait_count / self.request_count if self.request_count > 0 else 0
}📚 Complete API documentation for building amazing integrations!