Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions CCS_Detailed_Design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# Central Communication Server (CCS) - Detailed Design

## 1. Introduction

This document outlines the detailed design for the Central Communication Server (CCS). The CCS is a core component responsible for managing real-time communication, user authentication, presence, and message persistence.

## 2. Technology Stack

* **Programming Language:** Python 3.9+
* **Web Framework:** FastAPI
* **Asynchronous Server Gateway Interface (ASGI):** Uvicorn
* **Real-time Communication Protocol:** WebSockets
* **Database:** PostgreSQL
* **Authentication:** JSON Web Tokens (JWT)
* **Caching (Optional, for future scalability):** Redis (e.g., for presence status, session management)

## 3. System Architecture Overview

The CCS will expose WebSocket endpoints for real-time communication and HTTP endpoints for authentication and user management. It will interact with the PostgreSQL database for persistent storage.

```
+-------------------+ +------------------------+ +-------------------+
| Clients |<---->| CCS |<---->| PostgreSQL |
| (Web, Mobile, CLI)| | (FastAPI, WebSockets) | | Database |
+-------------------+ +------------------------+ +-------------------+
|
| (Future Enhancement)
|
v
+-------+
| Redis |
+-------+
```

## 4. Module Structure

The CCS will be organized into the following primary modules:

* **`main.py`**: Entry point of the application. Initializes FastAPI app, database connections, and routes.
* **`auth/`**: Handles user authentication and JWT management.
* `auth_service.py`: Logic for user registration, login, password hashing, JWT generation and validation.
* `auth_routes.py`: HTTP API endpoints for `/register`, `/login`.
* **`users/`**: Manages user profiles and presence.
* `user_models.py`: Pydantic models for user data.
* `user_service.py`: Logic for fetching user details, updating profiles.
* `presence_service.py`: Manages user online/offline status and broadcasts updates.
* **`messaging/`**: Handles real-time message routing and persistence.
* `connection_manager.py`: Manages active WebSocket connections. Stores connections per user or per group.
* `message_router.py`: Routes incoming messages to appropriate recipients (one-to-one, group).
* `message_service.py`: Handles storage and retrieval of messages from the database.
* `websocket_routes.py`: WebSocket endpoints for establishing connections and message exchange.
* **`database/`**: Manages database interactions.
* `db_config.py`: Database connection settings.
* `db_models.py`: SQLAlchemy ORM models for database tables.
* `crud.py`: Create, Read, Update, Delete operations for database models.
* **`core/`**: Core utilities and configurations.
* `config.py`: Application settings (e.g., JWT secret, database URL).
* `security.py`: Password hashing utilities.
* **`tests/`**: Unit and integration tests for all modules.

## 5. Key Classes and Functions

### 5.1. `auth/auth_service.py`

* `class AuthService`:
* `async def register_user(user_data: UserCreateSchema) -> UserSchema`: Registers a new user. Hashes password. Stores in DB.
* `async def authenticate_user(username: str, password: str) -> Optional[UserSchema]`: Authenticates a user.
* `def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str`: Creates a JWT.
* `async def get_current_user(token: str = Depends(oauth2_scheme)) -> UserSchema`: Decodes JWT and retrieves user.

### 5.2. `users/presence_service.py`

* `class PresenceService`:
* `active_users: set[str] = set()`: Stores user IDs of currently online users.
* `async def user_connected(user_id: str)`: Marks user as online, broadcasts presence.
* `async def user_disconnected(user_id: str)`: Marks user as offline, broadcasts presence.
* `async def broadcast_presence_update(user_id: str, status: str)`

### 5.3. `messaging/connection_manager.py`

* `class ConnectionManager`:
* `active_connections: dict[str, WebSocket] = {}`: Maps user_id to WebSocket connection.
* `async def connect(user_id: str, websocket: WebSocket)`: Accepts and stores a new connection.
* `def disconnect(user_id: str)`: Removes a connection.
* `async def send_personal_message(message: str, user_id: str)`
* `async def broadcast(message: str)`: Sends a message to all connected clients (e.g., for system-wide announcements or group chats if not handled separately).
* `async def send_to_group(group_id: str, message: str)`: (If group chat is implemented) Sends message to all members of a group.

### 5.4. `messaging/message_router.py`

* `class MessageRouter`:
* `def __init__(self, connection_manager: ConnectionManager, message_service: MessageService)`
* `async def route_message(sender_id: str, raw_message: dict)`: Parses message type (e.g., one-to-one, group, system), validates, and forwards to `ConnectionManager` or `MessageService`.

### 5.5. `messaging/message_service.py`

* `class MessageService`:
* `async def store_message(sender_id: str, recipient_id: str, content: str, timestamp: datetime) -> MessageSchema`: Stores a message in the database.
* `async def get_message_history(user_id1: str, user_id2: str, limit: int = 100, offset: int = 0) -> list[MessageSchema]`: Retrieves chat history between two users.
* `async def get_group_message_history(group_id: str, limit: int = 100, offset: int = 0) -> list[MessageSchema]`: Retrieves group message history.

## 6. Database Schema (PostgreSQL)

* **`users` table:**
* `id`: SERIAL PRIMARY KEY
* `username`: VARCHAR(50) UNIQUE NOT NULL
* `email`: VARCHAR(100) UNIQUE NOT NULL
* `hashed_password`: VARCHAR(255) NOT NULL
* `full_name`: VARCHAR(100)
* `created_at`: TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
* `last_login_at`: TIMESTAMP WITH TIME ZONE

* **`messages` table:**
* `id`: SERIAL PRIMARY KEY
* `sender_id`: INTEGER REFERENCES `users`(`id`) NOT NULL
* `recipient_id`: INTEGER REFERENCES `users`(`id`) NULL (for one-to-one messages)
* `group_id`: INTEGER REFERENCES `groups`(`id`) NULL (for group messages)
* `content`: TEXT NOT NULL
* `sent_at`: TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
* `is_read`: BOOLEAN DEFAULT FALSE

* **`groups` table:** (For group chat functionality)
* `id`: SERIAL PRIMARY KEY
* `name`: VARCHAR(100) NOT NULL
* `description`: TEXT
* `created_by`: INTEGER REFERENCES `users`(`id`) NOT NULL
* `created_at`: TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP

* **`group_members` table:** (Many-to-many relationship between users and groups)
* `id`: SERIAL PRIMARY KEY
* `user_id`: INTEGER REFERENCES `users`(`id`) NOT NULL
* `group_id`: INTEGER REFERENCES `groups`(`id`) NOT NULL
* `joined_at`: TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
* UNIQUE (`user_id`, `group_id`)

* **Constraints/Indexes:**
* Indexes on `users.username`, `users.email`.
* Indexes on `messages.sender_id`, `messages.recipient_id`, `messages.group_id`, `messages.sent_at`.
* Indexes on `groups.name`.
* Foreign key constraints as defined above.

## 7. Error Handling Strategy

* **HTTP API Endpoints:**
* Use standard HTTP status codes (e.g., 200 OK, 201 Created, 400 Bad Request, 401 Unauthorized, 403 Forbidden, 404 Not Found, 500 Internal Server Error).
* FastAPI's `HTTPException` will be used for standard error responses.
* Response body for errors: `{"detail": "Error message or description"}`.
* **WebSocket Communication:**
* Define a standard message format for errors, e.g., `{"type": "error", "payload": {"code": <error_code>, "message": "<description>"}}`.
* `1xxx` series WebSocket close codes will be used where appropriate.
* Examples of error codes:
* `1001`: Authentication failed
* `1002`: Invalid message format
* `1003`: Target user offline (if not queuing messages)
* `1004`: Rate limit exceeded
* **Server-Side Errors:**
* All unexpected errors will be caught at a global level and logged.
* A generic error message will be sent to the client to avoid exposing sensitive details.

## 8. Logging Strategy

* **Library:** Standard Python `logging` module, configured by FastAPI/Uvicorn.
* **Log Levels:**
* `DEBUG`: Detailed information, typically of interest only when diagnosing problems. (e.g., raw incoming/outgoing messages, connection attempts).
* `INFO`: Confirmation that things are working as expected. (e.g., user login, message sent, server startup).
* `WARNING`: An indication that something unexpected happened, or indicative of some problem in the near future (e.g., 'disk space low'). (e.g., failed login attempt, message delivery retry).
* `ERROR`: Due to a more serious problem, the software has not been able to perform some function. (e.g., database connection failure, unhandled exception in a request).
* `CRITICAL`: A serious error, indicating that the program itself may be unable to continue running.
* **Log Format:**
* `%(asctime)s - %(name)s - %(levelname)s - %(module)s.%(funcName)s:%(lineno)d - %(message)s`
* Example: `2023-10-27 10:00:00,000 - uvicorn.access - INFO - main.handle_request:123 - GET /users/me HTTP/1.1 200 OK`
* **Log Output:**
* Console (stdout/stderr) during development.
* File-based logging in production (e.g., `/var/log/ccs/ccs.log`) with log rotation.
* **Key Information to Log:**
* Application startup and shutdown.
* Incoming requests (HTTP and WebSocket connections) with relevant metadata (IP, user_id if authenticated).
* Authentication successes and failures.
* Message processing details (sender, receiver/group, timestamp) - potentially at DEBUG level for content.
* Database queries (optional, can be verbose, usually enabled at DEBUG level).
* All errors and exceptions with stack traces.
* Presence updates (user connected/disconnected).

## 9. Security Considerations (Initial Thoughts)

* **Input Validation:** All incoming data (HTTP request bodies, WebSocket messages) will be strictly validated using Pydantic models.
* **Password Hashing:** `passlib` library with a strong hashing algorithm (e.g., bcrypt, Argon2).
* **JWT Security:**
* Use HTTPS for all communication.
* Strong, secret key for JWT signing.
* Short-lived access tokens, implement refresh token mechanism if needed.
* **WebSocket Security:**
* `wss://` (WebSocket Secure) in production.
* Authenticate WebSocket connections promptly after establishment.
* **Rate Limiting:** Consider implementing rate limiting on API endpoints and WebSocket messages to prevent abuse.
* **Dependency Management:** Keep dependencies up-to-date to patch known vulnerabilities.

## 10. Scalability Considerations (Initial Thoughts)

* **Statelessness:** Design services to be as stateless as possible to allow horizontal scaling. User session/connection info might need a shared store (e.g., Redis) if scaling beyond one server instance.
* **Asynchronous Operations:** Leverage Python's `asyncio` and FastAPI's async capabilities to handle many concurrent connections efficiently.
* **Database Optimization:** Proper indexing, connection pooling. Consider read replicas for the database in the future.
* **Load Balancing:** A load balancer will be needed if deploying multiple CCS instances.
* **Message Queues (Advanced):** For very high throughput or to decouple services further, a message queue (e.g., RabbitMQ, Kafka) could be introduced between message reception and processing/delivery.

This document provides a foundational design. Further details will be elaborated during the implementation phase of each module.
Binary file added ccs/__pycache__/main.cpython-312.pyc
Binary file not shown.
Binary file added ccs/core/__pycache__/config.cpython-312.pyc
Binary file not shown.
26 changes: 26 additions & 0 deletions ccs/core/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from pydantic_settings import BaseSettings
from typing import Optional

class Settings(BaseSettings):
APP_NAME: str = "Central Communication Server"
DEBUG: bool = True # Set to True for development to enable init_db()
SECRET_KEY: str = "your-secret-key" # CHANGE THIS!
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

# Corrected DATABASE_URL with a placeholder numeric port
DATABASE_URL: str = "postgresql://user:password@host:5432/dbname"

# Optional Redis settings for caching/presence
REDIS_HOST: Optional[str] = None
REDIS_PORT: int = 6379

class Config:
env_file = ".env" # If you want to use a .env file
env_file_encoding = 'utf-8'

settings = Settings()

# Example usage:
# from ccs.core.config import settings
# print(settings.DATABASE_URL)
23 changes: 23 additions & 0 deletions ccs/core/security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from passlib.context import CryptContext

# Use bcrypt for password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class PasswordSecurity:
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verifies a plain password against a hashed password."""
return pwd_context.verify(plain_password, hashed_password)

@staticmethod
def get_password_hash(password: str) -> str:
"""Hashes a plain password."""
return pwd_context.hash(password)

# Example Usage:
# from ccs.core.security import PasswordSecurity
#
# hashed_pw = PasswordSecurity.get_password_hash("mysecretpassword")
# print(f"Hashed: {hashed_pw}")
# print(f"Verification successful: {PasswordSecurity.verify_password('mysecretpassword', hashed_pw)}")
# print(f"Verification failure: {PasswordSecurity.verify_password('wrongpassword', hashed_pw)}")
Binary file not shown.
Binary file not shown.
102 changes: 102 additions & 0 deletions ccs/database/crud.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import update as sqlalchemy_update, delete as sqlalchemy_delete
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar, Union

from ccs.database.db_models import Base

ModelType = TypeVar("ModelType", bound=Base)

class CRUDBase(Generic[ModelType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).

**Parameters**

* `model`: A SQLAlchemy model class
"""
self.model = model

async def get(self, db: AsyncSession, id: Any) -> Optional[ModelType]:
statement = select(self.model).where(self.model.id == id)
result = await db.execute(statement)
return result.scalar_one_or_none()

async def get_multi(
self, db: AsyncSession, *, skip: int = 0, limit: int = 100
) -> List[ModelType]:
statement = select(self.model).offset(skip).limit(limit)
result = await db.execute(statement)
return result.scalars().all()

async def create(self, db: AsyncSession, *, obj_in: Dict[str, Any]) -> ModelType:
db_obj = self.model(**obj_in)
db.add(db_obj)
await db.commit()
await db.refresh(db_obj)
return db_obj

async def update(
self, db: AsyncSession, *, db_obj: ModelType, obj_in: Union[Dict[str, Any]]
) -> ModelType:
if isinstance(obj_in, dict):
update_data = obj_in
else: # Pydantic model
update_data = obj_in.model_dump(exclude_unset=True)

for field, value in update_data.items():
setattr(db_obj, field, value)

db.add(db_obj)
await db.commit()
await db.refresh(db_obj)
return db_obj

async def remove(self, db: AsyncSession, *, id: int) -> Optional[ModelType]:
obj = await self.get(db, id=id)
if obj:
await db.delete(obj)
await db.commit()
return obj

# Example of how to use it for a specific model:
# from .db_models import User
# from .schemas import UserCreate, UserUpdate # Assuming you have Pydantic schemas
#
# class CRUDUser(CRUDBase[User]):
# async def get_by_username(self, db: AsyncSession, *, username: str) -> Optional[User]:
# statement = select(self.model).where(self.model.username == username)
# result = await db.execute(statement)
# return result.scalar_one_or_none()
#
# # Add more specific methods for the User model here
#
# user_crud = CRUDUser(User)

# This file will contain CRUD (Create, Read, Update, Delete) operations
# for your SQLAlchemy models.
# For each model, you might have a class that inherits from CRUDBase
# or implements its own specific CRUD methods.
#
# Example:
# from .db_models import User
# from .schemas import UserCreateSchema, UserUpdateSchema # You'll need Pydantic schemas
#
# async def get_user(db: AsyncSession, user_id: int):
# return await db.get(User, user_id)
#
# async def create_user(db: AsyncSession, user: UserCreateSchema):
# db_user = User(username=user.username, email=user.email, hashed_password=user.hashed_password)
# db.add(db_user)
# await db.commit()
# await db.refresh(db_user)
# return db_user
#
# ... and so on for update, delete, and other specific queries.
#
# The CRUDBase class provides a generic way to handle most common operations.
# Specific CRUD classes for each model can inherit from it and add model-specific methods.
# For example, `class CRUDUser(CRUDBase[User]): ...`
# This helps in keeping the database interaction logic organized and reusable.
Loading