A scalable, production-ready backend service for processing orders using an event-driven architecture. This service demonstrates expertise in Python/FastAPI, REST APIs, Apache Kafka, PostgreSQL, Redis, Docker, Kubernetes, and DevOps practices.
This service provides:
- REST API for creating and retrieving orders
- Event-driven architecture with Kafka for order processing
- Inventory management with distributed locking
- Caching layer with Redis
- Full containerization with Docker and Kubernetes
- Comprehensive monitoring and observability
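The distributed locking mentioned above is commonly built on Redis `SET NX EX` for acquisition plus an atomic compare-and-delete for release. A minimal sketch of that pattern follows; `FakeRedis` is an in-memory stand-in for the real client, and every name here is illustrative rather than the service's actual code:

```python
import time
import uuid


class FakeRedis:
    """In-memory stand-in for the two Redis commands the lock needs."""

    def __init__(self):
        self._data = {}  # key -> (value, expires_at)

    def set_nx_ex(self, key, value, ttl):
        # SET key value NX EX ttl: succeed only if key is absent or expired
        entry = self._data.get(key)
        if entry and entry[1] > time.monotonic():
            return False
        self._data[key] = (value, time.monotonic() + ttl)
        return True

    def delete_if_equal(self, key, value):
        # Atomic compare-and-delete (a Lua script on real Redis)
        entry = self._data.get(key)
        if entry and entry[0] == value:
            del self._data[key]
            return True
        return False


def acquire_lock(client, resource, ttl=10.0):
    """Try to take the lock; return a release token, or None if held."""
    token = uuid.uuid4().hex
    return token if client.set_nx_ex(f"lock:{resource}", token, ttl) else None


def release_lock(client, resource, token):
    """Release only if we still own the lock (token matches)."""
    return client.delete_if_equal(f"lock:{resource}", token)
```

The unique token prevents one worker from releasing a lock that has already expired and been re-acquired by another worker.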
```
[Client]
   |
   v
+------------------+
|    Order API     |  REST; Store in DB; Publish to Kafka
+--------+---------+
   |            |
   |            v
   |      +----------+
   |      |  Kafka   |  topics: orders.created, orders.updated
   |      +-----+----+
   |            |
   v            v
+--------+   +------------------+
| Redis  |   |  Order Consumer  |  consume -> update inventory, send notification
+--------+   +--------+---------+
   ^                  |
   |                  v
   |         +------------------+
   +---------|    PostgreSQL    |  orders, order_items, products
             +------------------+
```
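The "publish to Kafka" step in the diagram might produce a message like the one below. The payload fields and the `build_order_created_event` helper are assumptions for illustration, not the service's actual schema; keying the message by `user_id` is what provides the per-user ordering guarantee described later:

```python
import json
import uuid
from datetime import datetime, timezone


def build_order_created_event(order_id, user_id, items):
    """Build an orders.created payload. Keying by user_id routes all of a
    user's events to the same partition, preserving their order."""
    event = {
        "event_type": "orders.created",
        "event_id": str(uuid.uuid4()),
        "occurred_at": datetime.now(timezone.utc).isoformat(),
        "order": {"id": order_id, "user_id": user_id, "items": items},
    }
    # Kafka takes bytes: the key routes the partition, the value carries data
    return user_id.encode(), json.dumps(event).encode()


key, value = build_order_created_event(
    "550e8400-e29b-41d4-a716-446655440000",
    "660e8400-e29b-41d4-a716-446655440001",
    [{"product_id": "770e8400-e29b-41d4-a716-446655440002", "quantity": 2}],
)
```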
- Python 3.11+
- Docker and Docker Compose
- PostgreSQL 14+
- Apache Kafka 2.8+
- Redis 7+
- Kubernetes (minikube/kind for local development)
- Clone the repository:

  ```shell
  git clone <repository-url>
  cd 01-event-driven-order-service
  ```

- Start all services:

  ```shell
  docker-compose up -d
  ```

- Create Kafka topics:

  ```shell
  docker-compose exec kafka kafka-topics --create --topic orders.created --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  docker-compose exec kafka kafka-topics --create --topic orders.updated --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
  ```

- Run database migrations:

  ```shell
  docker-compose exec api alembic upgrade head
  ```

- Seed sample products:

  ```shell
  docker-compose exec api python scripts/seed_products.py
  ```

- Test the API:
  ```shell
  # Create an order
  curl -X POST http://localhost:8000/api/v1/orders \
    -H "Content-Type: application/json" \
    -d '{
      "user_id": "660e8400-e29b-41d4-a716-446655440001",
      "items": [
        {"product_id": "770e8400-e29b-41d4-a716-446655440002", "quantity": 2}
      ]
    }'

  # Get order by ID
  curl http://localhost:8000/api/v1/orders/{order_id}

  # List orders
  curl "http://localhost:8000/api/v1/orders?user_id=660e8400-e29b-41d4-a716-446655440001&limit=10"
  ```

- Install dependencies:
  ```shell
  cd api
  pip install -r requirements.txt
  ```

- Set up environment variables:

  ```shell
  export DATABASE_URL=postgresql://postgres:postgres@localhost:5432/orders_db
  export REDIS_URL=redis://localhost:6379
  export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
  ```

- Run migrations:

  ```shell
  alembic upgrade head
  ```

- Start the API:

  ```shell
  uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
  ```

- Start the consumer (in a separate terminal):

  ```shell
  cd consumer
  python -m app.main
  ```

```
project-root/
├── README.md
├── docker-compose.yml
├── .github/
│   └── workflows/
│       └── ci.yml
├── api/                    # Order API service
│   ├── Dockerfile
│   ├── requirements.txt
│   ├── alembic.ini
│   ├── app/
│   │   ├── main.py
│   │   ├── config.py
│   │   ├── api/
│   │   │   ├── routes/
│   │   │   │   └── orders.py
│   │   │   └── deps.py
│   │   ├── core/
│   │   │   ├── kafka_producer.py
│   │   │   └── redis_client.py
│   │   ├── db/
│   │   │   ├── models.py
│   │   │   └── repository.py
│   │   └── schemas/
│   │       └── order.py
│   ├── migrations/         # Alembic migrations
│   ├── scripts/
│   │   └── seed_products.py
│   └── tests/
├── consumer/
│   ├── Dockerfile
│   ├── requirements.txt
│   ├── app/
│   │   ├── main.py
│   │   ├── handlers/
│   │   │   └── order_events.py
│   │   └── services/
│   │       └── inventory.py
│   └── tests/
├── k8s/
│   ├── api-deployment.yaml
│   ├── api-service.yaml
│   ├── consumer-deployment.yaml
│   └── configmap.yaml
└── terraform/              # Optional
    └── main.tf
```
Once the service is running, access the interactive API documentation at:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
- OpenAPI JSON: http://localhost:8000/openapi.json
Create a new order.

Request Body:

```json
{
  "user_id": "660e8400-e29b-41d4-a716-446655440001",
  "items": [
    {
      "product_id": "770e8400-e29b-41d4-a716-446655440002",
      "quantity": 2
    }
  ]
}
```

Response:

```json
{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "user_id": "660e8400-e29b-41d4-a716-446655440001",
  "status": "CREATED",
  "total": 39.98,
  "items": [
    {
      "product_id": "770e8400-e29b-41d4-a716-446655440002",
      "quantity": 2,
      "unit_price": 19.99
    }
  ],
  "created_at": "2024-01-15T10:30:00Z"
}
```

Get order details by ID. Results are cached in Redis for 5 minutes.
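The 5-minute caching described above is the classic cache-aside pattern: check the cache, fall back to the database, then cache the result with a TTL. A minimal sketch, using an in-memory `TTLCache` as a stand-in for Redis `SETEX`/`GET` (all names here are illustrative):

```python
import json
import time


class TTLCache:
    """Tiny in-memory stand-in for Redis SETEX/GET."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        entry = self._data.get(key)
        if entry and entry[1] > time.monotonic():
            return entry[0]
        return None

    def setex(self, key, ttl, value):
        self._data[key] = (value, time.monotonic() + ttl)


def get_order(cache, load_from_db, order_id, ttl=300):
    """Cache-aside read: try the cache first, fall back to the DB, then
    cache the serialized result for ttl seconds (300 s = 5 minutes)."""
    key = f"order:{order_id}"
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)
    order = load_from_db(order_id)
    cache.setex(key, ttl, json.dumps(order))
    return order
```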
List orders with optional filters:
- user_id: Filter by user ID
- status: Filter by order status (CREATED, PROCESSING, COMPLETED, FAILED)
- limit: Number of results (default: 10, max: 100)
- offset: Pagination offset (default: 0)
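The filtering and pagination semantics above can be sketched as a pure function; this is an illustrative in-memory version, not the service's SQL-backed implementation:

```python
def list_orders(orders, user_id=None, status=None, limit=10, offset=0):
    """Apply the optional filters, clamp limit to 1..100, then slice."""
    limit = max(1, min(limit, 100))
    offset = max(0, offset)
    rows = [
        o for o in orders
        if (user_id is None or o["user_id"] == user_id)
        and (status is None or o["status"] == status)
    ]
    return rows[offset:offset + limit]
```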
Configuration is managed through environment variables:
API service:
- DATABASE_URL: PostgreSQL connection string
- REDIS_URL: Redis connection string
- KAFKA_BOOTSTRAP_SERVERS: Kafka broker addresses (comma-separated)
- LOG_LEVEL: Logging level (DEBUG, INFO, WARNING, ERROR)
- API_PORT: API server port (default: 8000)
Consumer service:
- DATABASE_URL: PostgreSQL connection string
- REDIS_URL: Redis connection string
- KAFKA_BOOTSTRAP_SERVERS: Kafka broker addresses
- KAFKA_CONSUMER_GROUP: Consumer group name (default: order-processor)
- DLQ_TOPIC: Dead letter queue topic (default: orders.failed)
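Environment-variable-driven configuration like this is often wrapped in a small settings object. A sketch for the consumer side, using the defaults listed above (the `localhost:9092` fallback and the class name are assumptions):

```python
import os
from dataclasses import dataclass, field


@dataclass
class ConsumerSettings:
    """Consumer configuration read from the environment, with defaults."""

    kafka_bootstrap_servers: str = field(default_factory=lambda: os.environ.get(
        "KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"))
    consumer_group: str = field(default_factory=lambda: os.environ.get(
        "KAFKA_CONSUMER_GROUP", "order-processor"))
    dlq_topic: str = field(default_factory=lambda: os.environ.get(
        "DLQ_TOPIC", "orders.failed"))
```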
Run tests:

```shell
# API tests
cd api
pytest tests/ -v

# Consumer tests
cd consumer
pytest tests/ -v

# Integration tests
pytest tests/integration/ -v
```

- Health endpoint: GET /health
- Readiness endpoint: GET /ready
Prometheus metrics are exposed at /metrics:
- http_requests_total: Total HTTP requests by method, endpoint, and status
- http_request_duration_seconds: HTTP request duration histogram
- kafka_producer_messages_total: Total messages published to Kafka
- kafka_consumer_messages_total: Total messages consumed from Kafka
- redis_cache_hits_total: Redis cache hits
- redis_cache_misses_total: Redis cache misses
Structured JSON logging with correlation IDs for distributed tracing.
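A minimal version of that structured logging might look like the following; the field names in the JSON line are assumptions, not the service's actual log schema:

```python
import json
import logging
import uuid


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON line with a correlation_id field."""

    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": getattr(record, "correlation_id", None),
        })


def log_with_correlation(logger, message, correlation_id=None):
    # Attach the correlation ID so every service hop can echo it back
    logger.info(message, extra={"correlation_id": correlation_id or uuid.uuid4().hex})
```

In practice the correlation ID would come from an incoming request header (or be generated at the edge) and be propagated into the Kafka message headers so producer and consumer logs can be joined.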
- Connection Pooling: SQLAlchemy connection pool configured for optimal performance
- Query Optimization: Indexes on frequently queried columns
- Caching: Multi-level caching strategy with Redis
- Kafka Partitioning: Orders partitioned by user_id hash for ordering guarantees
- Rate Limiting: API rate limiting to prevent abuse
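The user_id-hash partitioning above can be sketched as below. Note this is illustrative only: real Kafka clients hash keys with murmur2, and md5 is used here just to show the stable key-to-partition mapping (3 partitions matches the topic creation commands in the quick start):

```python
import hashlib


def partition_for(user_id: str, num_partitions: int = 3) -> int:
    """Deterministically map a user_id to a partition so all of one
    user's events stay in order on a single partition."""
    digest = hashlib.md5(user_id.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions
```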
- Authentication: JWT-based authentication (configured via middleware)
- Input Validation: Pydantic models for request validation
- SQL Injection Prevention: Parameterized queries
- Secrets Management: Environment variables for sensitive data
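The parameterized-queries point can be demonstrated with any DB-API driver; here is a sketch using sqlite3 for self-containedness (the production code uses PostgreSQL via SQLAlchemy, and the table layout below is simplified for illustration):

```python
import sqlite3


def find_orders_by_user(conn, user_id):
    """Placeholders (?) make the driver bind user_id as data, never SQL."""
    cur = conn.execute(
        "SELECT id, status FROM orders WHERE user_id = ?", (user_id,))
    return cur.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id TEXT, user_id TEXT, status TEXT)")
conn.execute("INSERT INTO orders VALUES ('o1', 'u1', 'CREATED')")

# A hostile value stays inert data instead of rewriting the query
hostile = "u1' OR '1'='1"
```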
Deploy to Kubernetes:
```shell
kubectl apply -f k8s/
```

The project includes a GitHub Actions workflow for:
- Linting (ruff, black)
- Unit tests
- Integration tests
- Docker image build
- Deployment to staging (optional)
- Kafka connection errors: Ensure Kafka is running and accessible
- Database connection errors: Check DATABASE_URL and PostgreSQL is running
- Redis connection errors: Verify Redis is running and REDIS_URL is correct
- Consumer not processing: Check consumer logs and Kafka topic configuration
MIT License
Mehdi Jahani