Event orchestration system for a smart city that receives events from different services, normalizes data, applies business rules, and generates derived events for other services.
The system is based on an event-oriented architecture with the following main components:
-
API (FastAPI)
- Receives events via REST endpoint
/ingest/{service} - Validates and processes events through the application layer
- Exposes endpoint for querying stored events
- Receives events via REST endpoint
-
Application Layer (
app/application/)ingest.py: Orchestrates the event ingestion flow- Manages event deduplication
- Persists base and derived events
-
Domain Layer (
app/domain/)- Event Normalization (
events/normalization/):- Normalizes received payloads using Pydantic schemas
- Supports different payload types (Health, Energy, Transport, Security)
- Business Rules (
events/rules/):- Evaluates normalized events and generates derived events
- Implements orchestration logic between services
- Orchestration (
orchestration/):- Registry of factories per service
- Factories that combine normalizers and rule evaluators
- Event Normalization (
-
Infrastructure Layer (
app/infra/)- Persistence (
persistence/):- SQLAlchemy models for Event and OutboxMessage
- Repositories for data access
- Outbox Pattern (
outbox/):- Worker that processes pending messages
- Ensures delivery of derived events to other services
- Implements retry and attempt control
- Persistence (
-
Database (PostgreSQL)
- Stores base and derived events
- Outbox table for asynchronous processing
- Support for deduplication via unique key
1. Event received via API → /ingest/{service}
2. Factory Registry identifies the service
3. Normalizer validates and normalizes the payload
4. Rule evaluator processes the normalized event
5. Derived events are created and persisted
6. Notifications are enqueued in the outbox
7. Worker processes outbox and publishes events
- Factory Pattern: Each service has a factory that provides normalizer and rule evaluator
- Strategy Pattern: Different normalization and evaluation strategies per service
- Outbox Pattern: Guarantees delivery of derived events
- Repository Pattern: Data access abstraction
- Docker and Docker Compose installed
- Make (optional, but recommended)
- Clone the repository:
git clone <repository-url>
cd smartcity-orchestrator- Create a
.envfile in the project root with the following variables:
DATABASE_URL=postgresql://postgres:postgres@db:5432/postgresmake setupThis command will:
- Build Docker images
- Start all services
- Wait for the database to be ready
- Run database migrations
Build images:
make buildStart services:
make upStart and view logs:
make up-logsStop services:
make downView logs:
make logs # All services
make logs-api # API only
make logs-db # Database only
make logs-worker # Worker onlyRun migrations:
make migrate # Apply all migrations
make migrate-status # View current status
make migrate-history # View migration historyRun tests:
make test # All tests
make test-cov # With coverage
make test-file FILE=tests/api/test_routes.py # Specific fileOpen shell in container:
make shellClean volumes and containers:
make cleanAfter starting the services, the API will be available at:
- API: http://localhost:8000
- Swagger Documentation: http://localhost:8000/docs
- ReDoc Documentation: http://localhost:8000/redoc
GET /health- Health checkPOST /ingest/{service}- Event ingestionGET /events- List events (with pagination)
{
"patient_id": 12345,
"alert": "emergency",
"location": "Rua das Flores, 100"
}class HealthPayload(BasePayload):
patient_id: Optional[int] = None
alert: Optional[str] = None
location: Optional[str] = NoneWhen alert == "emergency", the system generates two derived events:
- Event for Transport:
{
"action": "dispatch_nearest_vehicle",
"reason": "health_emergency",
"location": "Rua das Flores, 100",
"patient_id": 12345
}- Deduplication Key:
health_emergency_{patient_id}
- Event for Security:
{
"priority": "high",
"action": "escort_and_clear_traffic",
"reason": "health_emergency",
"location": "Rua das Flores, 100",
"patient_id": 12345
}- Deduplication Key:
health_emergency_{patient_id}
curl -X POST "http://localhost:8000/ingest/health" \
-H "Content-Type: application/json" \
-d '{
"patient_id": 12345,
"alert": "emergency",
"location": "Rua das Flores, 100"
}'{
"stored_event_id": "550e8400-e29b-41d4-a716-446655440000",
"derived_events": [
"550e8400-e29b-41d4-a716-446655440001",
"550e8400-e29b-41d4-a716-446655440002"
]
}{
"energy": 650.0,
"neighborhood": "downtown"
}class EnergyPayload(BasePayload):
energy: Optional[float] = None
neighborhood: Optional[str] = NoneWhen energy > 500.0 kWh, the system generates an event for Security:
Event for Security:
{
"alert": "possible_risk",
"reason": "critical_energy_usage",
"neighborhood": "downtown",
"energy": 650.0
}- Deduplication Key:
critical_energy_usage_{neighborhood} - Configurable threshold:
THRESHOLD_KWH = 500.0
curl -X POST "http://localhost:8000/ingest/energy" \
-H "Content-Type: application/json" \
-d '{
"energy": 650.0,
"neighborhood": "downtown"
}'If energy <= 500.0, no derived events are generated:
curl -X POST "http://localhost:8000/ingest/energy" \
-H "Content-Type: application/json" \
-d '{
"energy": 400.0,
"neighborhood": "suburbs"
}'Response:
{
"stored_event_id": "550e8400-e29b-41d4-a716-446655440003",
"derived_events": []
}{
"bus_id": 42,
"lat": -23.5505,
"lon": -46.6333
}class TransportPayload(BasePayload):
bus_id: Optional[int] = None
lat: Optional[float] = None
lon: Optional[float] = NoneThis service currently does not have rules that generate derived events. Events are only normalized and stored.
curl -X POST "http://localhost:8000/ingest/transport" \
-H "Content-Type: application/json" \
-d '{
"bus_id": 42,
"lat": -23.5505,
"lon": -46.6333
}'{
"alert": true,
"camera_trigger": "motion_detected"
}class SecurityPayload(BasePayload):
alert: Optional[bool] = None
camera_trigger: Optional[str] = NoneThis service currently does not have rules that generate derived events. Events are only normalized and stored.
curl -X POST "http://localhost:8000/ingest/security" \
-H "Content-Type: application/json" \
-d '{
"alert": true,
"camera_trigger": "motion_detected"
}'The system supports deduplication through the dedupe_key parameter:
curl -X POST "http://localhost:8000/ingest/energy?dedupe_key=unique_key_123" \
-H "Content-Type: application/json" \
-d '{
"energy": 600.0,
"neighborhood": "downtown"
}'If the same dedupe_key is used again, the system returns the existing event without creating duplicates.
To list stored events:
curl "http://localhost:8000/events?limit=10&offset=0"Response:
[
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"service": "health",
"timestamp": "2024-01-15T10:30:00Z",
"payload": {
"patient_id": 12345,
"alert": "emergency",
"location": "Rua das Flores, 100"
},
"normalized_payload": {
"patient_id": 12345,
"alert": "emergency",
"location": "Rua das Flores, 100"
},
"deduplication_key": null,
"source_event_id": null,
"created_at": "2024-01-15T10:30:00Z"
},
{
"id": "550e8400-e29b-41d4-a716-446655440001",
"service": "transport",
"timestamp": "2024-01-15T10:30:00Z",
"payload": {
"action": "dispatch_nearest_vehicle",
"reason": "health_emergency",
"location": "Rua das Flores, 100",
"patient_id": 12345
},
"normalized_payload": null,
"deduplication_key": "health_emergency_12345",
"source_event_id": "550e8400-e29b-41d4-a716-446655440000",
"created_at": "2024-01-15T10:30:00Z"
}
]smartcity-orchestrator/
├── alembic/ # Database migrations
├── app/
│ ├── api/ # REST endpoints (FastAPI)
│ ├── application/ # Application logic
│ ├── core/ # Configuration and DB connection
│ ├── domain/ # Domain logic
│ │ ├── events/ # Normalization and rules
│ │ └── orchestration/ # Factories and registry
│ ├── infra/ # Infrastructure
│ │ ├── outbox/ # Worker and enqueue
│ │ └── persistence/ # Models and repositories
│ └── main.py # FastAPI application
├── tests/ # Unit and integration tests
├── docker-compose.yml # Docker configuration
├── Dockerfile # Docker image
├── Makefile # Helper commands
└── requirements.txt # Python dependencies
- FastAPI: Web framework for REST API
- PostgreSQL: Relational database
- SQLAlchemy: ORM for Python
- Alembic: Database migrations
- Pydantic: Data validation and schemas
- Docker & Docker Compose: Containerization
- pytest: Testing framework
make migrate-create MESSAGE="migration description"make test # All tests
make test-cov # With coverage report
make test-pattern PATTERN="test_ingest" # Specific tests- Create payload schema in
app/domain/events/normalization/payloads.py - Create normalizer (or use
PydanticEventNormalizer) - Create rule evaluator in
app/domain/events/rules/ - Create factory in
app/domain/orchestration/factories/ - Register in
FactoryRegistryinapp/domain/orchestration/registry.py
[Specify license]