
System Architecture Documentation

Design Philosophy

This system follows Clean Architecture principles with clear separation between:

  • External interfaces (HTTP handlers)
  • Business logic (services, estimators)
  • Data access (adapters)
  • Domain models (shared structures)

Layer Responsibilities

1. Handler Layer (internal/handlers/)

Purpose: HTTP interface to the system

Responsibilities:

  • Parse and validate HTTP requests
  • Convert HTTP data to domain models
  • Handle HTTP-specific concerns (status codes, headers)
  • Route requests to appropriate services
  • Convert domain responses to HTTP responses

Design Decisions:

  • Uses standard net/http (no framework dependency)
  • Validation happens at the boundary
  • Errors are converted to appropriate HTTP status codes
  • No business logic in handlers

Example Flow:

HTTP Request
    ↓
Parse JSON → Validate → Create Domain Model
    ↓
Call Service Layer
    ↓
Convert Response → Send HTTP Response
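A minimal sketch of this flow, using only net/http as stated above. Handler shape, service signature, and field names are illustrative, not the project's actual identifiers:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
)

type assessRequest struct {
	IndustryType string  `json:"industry_type"`
	Latitude     float64 `json:"latitude"`
	Longitude    float64 `json:"longitude"`
}

// validateCoords enforces boundary validation before any business logic runs.
func validateCoords(lat, lon float64) error {
	if lat < -90 || lat > 90 {
		return errors.New("latitude out of range [-90, 90]")
	}
	if lon < -180 || lon > 180 {
		return errors.New("longitude out of range [-180, 180]")
	}
	return nil
}

// assessHandler wires the flow: parse JSON → validate → call service → respond.
// No business logic lives here; svc stands in for the service layer.
func assessHandler(svc func(assessRequest) (map[string]any, error)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var req assessRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, "invalid JSON", http.StatusBadRequest)
			return
		}
		if err := validateCoords(req.Latitude, req.Longitude); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		resp, err := svc(req)
		if err != nil {
			http.Error(w, "assessment failed", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(resp)
	}
}

func main() {
	fmt.Println(validateCoords(91, 0)) // out-of-range latitude is rejected at the boundary
}
```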

2. Service Layer (internal/services/)

Purpose: Business logic orchestration

Assessment Service

  • Role: Main workflow coordinator
  • Orchestrates:
    1. Location data fetching
    2. Risk calculation
    3. Mitigation generation
  • Depends on: LocationService, RiskEstimator, LLMService
  • Error handling: Graceful degradation (partial data acceptable)

Location Service

  • Role: Environmental data aggregation
  • Orchestrates: Multiple adapter calls
  • Future optimization: Parallel fetching with goroutines
  • Caching strategy: Should cache by coordinate grid (not implemented)

LLM Service

  • Role: AI integration abstraction
  • Current: Rule-based mitigation generation
  • Future: Real LLM API calls (OpenAI/Anthropic)
  • Prompt engineering: Structured context → Natural language
  • Response parsing: Extract actionable items from LLM output

3. Estimator Layer (internal/estimators/)

Purpose: Domain-specific calculation logic

Risk Estimator Design:

Industry Profile (static)
    +
Request Parameters (dynamic)
    +
Location Context (fetched)
    ↓
Weighted Factor Calculation
    ↓
Risk Score (0-100)
    +
Contributing Factors (explainability)

Why separate from services?

  • Pure calculation logic (no I/O)
  • Easily testable in isolation
  • Risk model can be swapped/upgraded
  • Can be moved to separate microservice later

Industry Profiles:

type IndustryProfile struct {
    BaseRisk       float64  // Starting risk score
    WaterIntensity float64  // Multiplier for water usage
    PollutantTypes []string // For documentation
}

Profiles based on:

  • Industrial wastewater characteristics (EPA guidelines)
  • Historical contamination incidents
  • Treatment complexity requirements
  • Regulatory scrutiny levels
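A profile table built on those sources might look like this. The textile values (base 65.0, intensity 1.8) match the worked example later in this document; the other rows are placeholders, not the project's actual numbers:

```go
package main

import "fmt"

type IndustryProfile struct {
	BaseRisk       float64  // Starting risk score
	WaterIntensity float64  // Multiplier for water usage
	PollutantTypes []string // For documentation
}

// Illustrative profile table keyed by industry type.
var industryProfiles = map[string]IndustryProfile{
	"textile":  {BaseRisk: 65.0, WaterIntensity: 1.8, PollutantTypes: []string{"dyes", "heavy metals"}},
	"food":     {BaseRisk: 45.0, WaterIntensity: 1.2, PollutantTypes: []string{"organic load", "nutrients"}},
	"chemical": {BaseRisk: 80.0, WaterIntensity: 1.5, PollutantTypes: []string{"solvents", "acids"}},
}

// profileFor falls back to a conservative default for unknown industries,
// so an unrecognized type never crashes the estimator.
func profileFor(industry string) IndustryProfile {
	if p, ok := industryProfiles[industry]; ok {
		return p
	}
	return IndustryProfile{BaseRisk: 50.0, WaterIntensity: 1.0}
}

func main() {
	fmt.Println(profileFor("textile").BaseRisk)
}
```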

4. Adapter Layer (internal/adapters/)

Purpose: External data source integration

Design Pattern: Strategy Pattern

  • Each adapter implements same interface
  • Can be mocked for testing
  • Real implementations can be swapped in
  • Failures don't crash the system

Water Body Adapter

Mock Implementation:

// Simulates proximity patterns
if math.Mod(lat, 0.5) < 0.3 {
    return river data
}

Production Implementation (to be built):

// 1. Build Overpass API query
query := fmt.Sprintf(`
    [out:json];
    (
        way["waterway"](around:%d,%f,%f);
        way["natural"="water"](around:%d,%f,%f);
    );
    out body;
`, radius, lat, lon, radius, lat, lon)

// 2. HTTP POST to Overpass API
resp, err := http.Post(overpassURL, contentType, queryReader)

// 3. Parse GeoJSON response
var result OverpassResponse
json.Unmarshal(body, &result)

// 4. Calculate distances for each water body
for _, element := range result.Elements {
    distance := haversineDistance(lat, lon, element.Lat, element.Lon)
    waterBody := models.WaterBody{
        Name:       element.Tags["name"],
        Type:       element.Tags["waterway"],
        DistanceKm: distance,
    }
}

// 5. Lookup sensitivity from environmental database
// (rivers used for drinking water = HIGH)
sensitivity := lookupSensitivity(waterBody)
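The haversineDistance helper referenced in step 4 is standard great-circle math; a self-contained version:

```go
package main

import (
	"fmt"
	"math"
)

// haversineDistance returns the great-circle distance in kilometers between
// two latitude/longitude points, using a mean Earth radius of 6371 km.
func haversineDistance(lat1, lon1, lat2, lon2 float64) float64 {
	const earthRadiusKm = 6371.0
	toRad := func(deg float64) float64 { return deg * math.Pi / 180 }

	dLat := toRad(lat2 - lat1)
	dLon := toRad(lon2 - lon1)
	a := math.Sin(dLat/2)*math.Sin(dLat/2) +
		math.Cos(toRad(lat1))*math.Cos(toRad(lat2))*math.Sin(dLon/2)*math.Sin(dLon/2)
	return 2 * earthRadiusKm * math.Asin(math.Sqrt(a))
}

func main() {
	// One degree of longitude at the equator is roughly 111 km.
	fmt.Printf("%.1f km\n", haversineDistance(0, 0, 0, 1))
}
```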

API Endpoints:

  • Overpass API: https://overpass-api.de/api/interpreter
  • Backup: https://overpass.kumi.systems/api/interpreter

Population Adapter

Mock Implementation:

// Uses trig functions to simulate urban/rural
urbanness := abs(sin(lat*10)) * abs(cos(lon*10))
return density based on urbanness

Production Implementation:

// 1. Call WorldPop API
url := fmt.Sprintf(
    "https://api.worldpop.org/v1/services/stats?lat=%f&lon=%f",
    lat, lon,
)

// 2. Parse population count
var response WorldPopResponse
json.Unmarshal(body, &response)

// 3. Calculate density
area := response.GridCellArea // km²
density := response.Population / area

return density

Alternative APIs:

  • US Census Bureau: https://geocoding.geo.census.gov/geocoder/
  • GeoNames: http://api.geonames.org/findNearbyPlaceNameJSON
  • LandScan: https://landscan.ornl.gov/

Land Use Adapter

Mock Implementation:

// Pseudo-random selection
seed := int(lat*1000 + lon*1000)
return landUseTypes[seed % len(types)]

Production Implementation:

// 1. Query USGS Land Cover API
url := fmt.Sprintf(
    "https://www.mrlc.gov/downloads/sciweb1/shared/mrlc/data/%d/NLCD_%d_Land_Cover_L48_20190424.img",
    year, year,
)

// 2. Get pixel value at coordinates
// (requires GeoTIFF parsing library)
value := getPixelValue(lat, lon, nlcdRaster)

// 3. Map NLCD code to category
classification := mapNLCDCode(value)
// 21-24: Developed (residential/commercial)
// 41-43: Forest
// 52: Shrubland
// 71-74: Herbaceous
// 81-82: Agricultural
// 90-95: Wetlands
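The mapNLCDCode step can be sketched directly from the code ranges listed above; the category strings are illustrative:

```go
package main

import "fmt"

// mapNLCDCode translates an NLCD land-cover code into the coarse
// categories used by the risk model.
func mapNLCDCode(code int) string {
	switch {
	case code >= 21 && code <= 24:
		return "developed" // residential/commercial
	case code >= 41 && code <= 43:
		return "forest"
	case code == 52:
		return "shrubland"
	case code >= 71 && code <= 74:
		return "herbaceous"
	case code >= 81 && code <= 82:
		return "agricultural"
	case code >= 90 && code <= 95:
		return "wetlands"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(mapNLCDCode(22), mapNLCDCode(82), mapNLCDCode(95))
}
```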

Alternative APIs:

  • ESA WorldCover: Global 10m resolution
  • Copernicus: European coverage
  • Local planning/zoning APIs

5. Model Layer (internal/models/)

Purpose: Shared data structures

Design Principles:

  • JSON tags for HTTP serialization
  • Validation tags for input checking
  • Comments explain field meanings
  • No business logic in models

Key Models:

// Input from client
type AssessmentRequest struct {
    IndustryType   string  `json:"industry_type"`
    Latitude       float64 `json:"latitude"`
    Longitude      float64 `json:"longitude"`
    WaterUsageM3   float64 `json:"water_usage_m3"`
    TreatmentType  string  `json:"treatment_type"`
}

// Output to client
type AssessmentResponse struct {
    RiskScore           string               `json:"risk_score"`  // "low", "medium", "high"
    RiskValue           float64              `json:"risk_value"`  // 0-100
    ContributingFactors []ContributingFactor `json:"contributing_factors"`
    MitigationActions   []string             `json:"mitigation_actions"`
    LocationContext     LocationContext      `json:"location_context"`
}

// Explainability structure
type ContributingFactor struct {
    Factor      string  // e.g., "Water Usage Volume"
    Impact      string  // "increases" or "decreases"
    Weight      float64 // numerical contribution
    Description string  // human-readable explanation
}

Data Flow Diagram

Complete Request Flow

┌─────────────────────────────────────────────────────────────────┐
│ 1. CLIENT SENDS REQUEST                                         │
│    POST /api/v1/assess                                          │
│    {industry_type, lat, lon, water_usage, treatment}            │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│ 2. HTTP HANDLER (assessment_handler.go)                         │
│    - Parse JSON                                                 │
│    - Validate: lat [-90,90], lon [-180,180], usage >= 0        │
│    - Create domain model                                        │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│ 3. ASSESSMENT SERVICE (assessment_service.go)                   │
│    Orchestrates: Fetch data → Calculate risk → Generate actions │
└────────────────────────┬────────────────────────────────────────┘
                         │
                ┌────────┴────────┐
                │                 │
                ▼                 ▼
┌───────────────────────┐   ┌──────────────────────┐
│ 4a. LOCATION SERVICE  │   │ 4b. RISK ESTIMATOR   │
│ (location_service.go) │   │ (risk_estimator.go)  │
│                       │   │                      │
│ Calls 3 adapters:     │   │ Calculations:        │
│ - Water bodies        │   │ - Get industry       │
│ - Population          │   │   profile            │
│ - Land use            │   │ - Calculate water    │
│                       │   │   usage risk         │
│ Returns:              │   │ - Apply treatment    │
│ LocationContext       │   │   reduction          │
└───────────────────────┘   │ - Calculate          │
                            │   proximity risk     │
                            │ - Sum all factors    │
                            │                      │
                            │ Returns:             │
                            │ - Risk value (0-100) │
                            │ - Contributing       │
                            │   factors list       │
                            └──────────────────────┘
                                      │
                                      ▼
                            ┌──────────────────────┐
                            │ 5. LLM SERVICE       │
                            │ (llm_service.go)     │
                            │                      │
                            │ Input: Risk +        │
                            │        Factors +     │
                            │        Location      │
                            │                      │
                            │ Process:             │
                            │ - Build prompt       │
                            │ - [MVP] Rule-based   │
                            │ - [Prod] Call LLM    │
                            │                      │
                            │ Returns:             │
                            │ - Mitigation actions │
                            └──────────────────────┘
                                      │
                                      ▼
┌─────────────────────────────────────────────────────────────────┐
│ 6. ASSEMBLE RESPONSE                                            │
│    {                                                            │
│      "risk_score": "high",                                      │
│      "risk_value": 78.5,                                        │
│      "contributing_factors": [...],                             │
│      "mitigation_actions": [...],                               │
│      "location_context": {...}                                  │
│    }                                                            │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│ 7. HANDLER SENDS HTTP RESPONSE                                  │
│    Status: 200 OK                                               │
│    Content-Type: application/json                               │
└─────────────────────────────────────────────────────────────────┘

Risk Calculation Algorithm

Detailed Formula

FINAL_RISK = BASE_RISK 
           + WATER_USAGE_RISK
           - TREATMENT_REDUCTION
           + PROXIMITY_RISK
           + POPULATION_RISK
           + LANDUSE_RISK

Bounded: max(0, min(100, FINAL_RISK))

Step-by-Step Example

Scenario: Textile factory, 450 m³/day, primary treatment, near river, urban area

1. BASE_RISK (Industry Profile)
   Textile industry base = 65.0
   
2. WATER_USAGE_RISK
   Usage: 450 m³/day
   Baseline: 100 m³/day
   Excess ratio: (450-100)/100 = 3.5
   Water intensity (textile): 1.8
   Risk: min(3.5 * 1.8 * 10, 20) = 20.0 (capped)
   
3. TREATMENT_REDUCTION
   Primary treatment: -15.0
   
4. PROXIMITY_RISK
   River at 2.3 km: base 15.0
   High sensitivity: 15.0 * 1.5 = 22.5
   
5. POPULATION_RISK
   Density: 3850 people/km² (urban)
   Risk: +15.0
   
6. LANDUSE_RISK
   Mixed-use zone: +8.0

FINAL = 65.0 + 20.0 - 15.0 + 22.5 + 15.0 + 8.0
      = 115.5
      = min(115.5, 100) = 100.0 (capped)
      
Category: HIGH (>70)
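The step-by-step example above can be replayed in code; every constant below is taken directly from the example:

```go
package main

import (
	"fmt"
	"math"
)

// clamp bounds the final score to [0, 100].
func clamp(v float64) float64 { return math.Max(0, math.Min(100, v)) }

// exampleFinalRisk reproduces the textile-factory scenario above.
func exampleFinalRisk() float64 {
	baseRisk := 65.0 // textile industry profile

	// Water usage: excess over a 100 m³/day baseline, scaled by
	// industry intensity, capped at 20.
	usage, baseline, intensity := 450.0, 100.0, 1.8
	waterRisk := math.Min((usage-baseline)/baseline*intensity*10, 20)

	treatmentReduction := 15.0  // primary treatment
	proximityRisk := 15.0 * 1.5 // river at 2.3 km, high-sensitivity multiplier
	populationRisk := 15.0      // ~3850 people/km², urban
	landUseRisk := 8.0          // mixed-use zone

	return clamp(baseRisk + waterRisk - treatmentReduction +
		proximityRisk + populationRisk + landUseRisk)
}

func main() {
	fmt.Println(exampleFinalRisk()) // 115.5 before capping, 100 after
}
```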

Explainability System

Why Explainability Matters

Regulatory decisions require transparency. The system tracks:

  • What factors were considered
  • How each factor impacted the score
  • The reasoning behind each contribution

Implementation

Each calculation returns:

factor := ContributingFactor{
    Factor:      "Water Usage Volume",
    Impact:      "increases",
    Weight:      20.0,
    Description: "High water usage (> 500 m³/day)",
}

This allows:

  • Auditing: Trace how a risk score was calculated
  • Appeals: Industry can contest specific factors
  • Improvements: Identify which changes reduce risk most
  • Compliance: Show regulatory compliance

Example Output

{
  "risk_value": 78.5,
  "contributing_factors": [
    {
      "factor": "Water Usage Volume",
      "impact": "increases",
      "weight": 18.9,
      "description": "High water usage (> 500 m³/day)"
    },
    {
      "factor": "Wastewater Treatment",
      "impact": "decreases",
      "weight": -15.0,
      "description": "Primary treatment (physical settling)"
    },
    {
      "factor": "Proximity to Water Bodies",
      "impact": "increases",
      "weight": 28.7,
      "description": "Mock River Basin at 2.3 km"
    }
  ]
}

From this breakdown, a user can see that upgrading from primary to secondary treatment would reduce the risk score by 15 points.

LLM Integration Architecture

Current State (MVP)

func (s *LLMService) GenerateMitigations(...) []string {
    // Rule-based logic
    if treatmentType == "none" {
        return []string{"Install primary treatment"}
    }
    // ... more rules
}

Production Architecture

┌─────────────────────────────────────────────────────────┐
│ 1. STRUCTURE PROMPT                                     │
│                                                         │
│    buildLLMPrompt(request, risk, factors, location)    │
│    ↓                                                    │
│    # Industry Context                                   │
│    Industry: Textile                                    │
│    Water Usage: 450 m³/day                              │
│    Treatment: Primary                                   │
│    Risk Score: 78.5/100 (HIGH)                          │
│                                                         │
│    # Key Risk Factors                                   │
│    - High water usage (+18.9)                           │
│    - Nearby river (+28.7)                               │
│    - Dense urban area (+15.0)                           │
│                                                         │
│    # Task                                               │
│    Generate 4-6 actionable mitigation strategies        │
└────────────────────────┬────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────┐
│ 2. CALL LLM API                                         │
│                                                         │
│    POST https://api.anthropic.com/v1/messages          │
│    {                                                    │
│      "model": "claude-3-sonnet-20240229",               │
│      "max_tokens": 500,                                 │
│      "messages": [{                                     │
│        "role": "user",                                  │
│        "content": structuredPrompt                      │
│      }]                                                 │
│    }                                                    │
└────────────────────────┬────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────┐
│ 3. PARSE RESPONSE                                       │
│                                                         │
│    LLM returns natural language:                        │
│    "Based on the high risk score, I recommend:          │
│     1. Upgrade to secondary biological treatment        │
│     2. Install closed-loop water recycling              │
│     3. Implement real-time discharge monitoring         │
│     4. Install dye recovery systems..."                 │
│                                                         │
│    parseLLMResponse(text) extracts action items         │
└────────────────────────┬────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────┐
│ 4. RETURN STRUCTURED ACTIONS                            │
│                                                         │
│    []string{                                            │
│      "Upgrade to secondary biological treatment...",    │
│      "Install closed-loop water recycling...",          │
│      "Implement real-time discharge monitoring...",     │
│      "Install dye recovery systems..."                  │
│    }                                                    │
└─────────────────────────────────────────────────────────┘
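The parseLLMResponse step in the diagram could be implemented with a simple line scan; the assumption that the LLM returns numbered or bulleted items follows from the prompt's "Generate 4-6 actionable mitigation strategies" instruction:

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// parseLLMResponse extracts numbered ("1. ...") or bulleted ("- ...")
// action items from free-form LLM output, dropping surrounding narration.
func parseLLMResponse(text string) []string {
	itemRe := regexp.MustCompile(`^\s*(?:\d+\.|[-*])\s+(.+)$`)
	var actions []string
	for _, line := range strings.Split(text, "\n") {
		if m := itemRe.FindStringSubmatch(line); m != nil {
			actions = append(actions, strings.TrimSpace(m[1]))
		}
	}
	return actions
}

func main() {
	out := parseLLMResponse("Based on the high risk score, I recommend:\n" +
		" 1. Upgrade to secondary biological treatment\n" +
		" 2. Install closed-loop water recycling")
	fmt.Println(len(out), out[0])
}
```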

Implementation Code

type LLMClient struct {
    apiKey     string
    baseURL    string
    httpClient *http.Client
}

func (c *LLMClient) Complete(ctx context.Context, prompt string) (string, error) {
    reqBody := map[string]interface{}{
        "model":      "claude-3-sonnet-20240229",
        "max_tokens": 500,
        "messages": []map[string]string{
            {"role": "user", "content": prompt},
        },
    }
    
    jsonBody, err := json.Marshal(reqBody)
    if err != nil {
        return "", err
    }
    
    req, err := http.NewRequestWithContext(
        ctx,
        "POST",
        c.baseURL+"/v1/messages",
        bytes.NewReader(jsonBody),
    )
    if err != nil {
        return "", err
    }
    
    req.Header.Set("x-api-key", c.apiKey)
    req.Header.Set("anthropic-version", "2023-06-01")
    req.Header.Set("content-type", "application/json")
    
    resp, err := c.httpClient.Do(req)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    
    var result AnthropicResponse
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        return "", err
    }
    
    return result.Content[0].Text, nil
}

Error Handling Strategy

Adapter Failures

// Graceful degradation: continue with partial data
waterBodies, err := adapter.Fetch(ctx, lat, lon)
if err != nil {
    log.Warn("Water body fetch failed", err)
    waterBodies = []WaterBody{} // Empty, not fatal
}

Rationale: Location data failures shouldn't crash the assessment; fall back to defaults or skip optional factors instead.

Service Failures

// LLM failures fall back to rules
mitigations, err := llmService.Generate(...)
if err != nil {
    log.Error("LLM generation failed", err)
    mitigations = generateBasicMitigations() // Fallback
}

HTTP Errors

// Clear error responses
if err := validate(req); err != nil {
    respondWithError(w, 400, "Validation error: "+err.Error())
    return
}

// Internal errors don't leak details
if err := service.Assess(ctx, req); err != nil {
    log.Error("Assessment failed", err)
    respondWithError(w, 500, "Assessment failed")
    return
}

Scalability Considerations

Current Limitations

  • Single-threaded request handling
  • No connection pooling
  • No caching
  • Sequential adapter calls

Scaling Strategies

1. Horizontal Scaling

Load Balancer
    ├── API Instance 1
    ├── API Instance 2
    └── API Instance 3
         ↓
    Shared Redis Cache
         ↓
    External APIs

2. Caching Layer

type CachedLocationService struct {
    locationService *LocationService
    cache          *redis.Client
}

func (s *CachedLocationService) FetchLocationContext(ctx context.Context, lat, lon float64) (models.LocationContext, error) {
    // Round to grid (0.01° ≈ 1 km)
    gridLat := math.Round(lat*100) / 100
    gridLon := math.Round(lon*100) / 100
    
    cacheKey := fmt.Sprintf("loc:%.2f:%.2f", gridLat, gridLon)
    
    // Try cache first
    if cached, err := s.cache.Get(ctx, cacheKey).Result(); err == nil {
        return parseLocationContext(cached), nil
    }
    
    // Fetch fresh data
    locationCtx, err := s.locationService.FetchLocationContext(ctx, lat, lon)
    if err != nil {
        return locationCtx, err
    }
    
    // Cache for 24 hours
    s.cache.Set(ctx, cacheKey, serialize(locationCtx), 24*time.Hour)
    
    return locationCtx, nil
}

3. Async Processing

// For non-urgent assessments
type AssessmentJob struct {
    ID      string
    Request AssessmentRequest
    Status  string // "pending", "processing", "complete"
}

// Client submits job
POST /api/v1/assess/async
Returns: {"job_id": "abc123", "status": "pending"}

// Client polls for results
GET /api/v1/assess/async/abc123
Returns: {"status": "complete", "result": {...}}
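A minimal in-memory job store backing this submit/poll pattern might look like this; a production system would use a durable queue instead:

```go
package main

import (
	"fmt"
	"sync"
)

type AssessmentJob struct {
	ID     string
	Status string // "pending", "processing", "complete"
}

// jobStore is a concurrency-safe in-memory store for async assessments.
type jobStore struct {
	mu   sync.RWMutex
	jobs map[string]*AssessmentJob
}

func newJobStore() *jobStore { return &jobStore{jobs: map[string]*AssessmentJob{}} }

// Submit registers a new job in the "pending" state.
func (s *jobStore) Submit(id string) *AssessmentJob {
	s.mu.Lock()
	defer s.mu.Unlock()
	j := &AssessmentJob{ID: id, Status: "pending"}
	s.jobs[id] = j
	return j
}

// Get returns the job for polling clients.
func (s *jobStore) Get(id string) (*AssessmentJob, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	j, ok := s.jobs[id]
	return j, ok
}

func main() {
	store := newJobStore()
	store.Submit("abc123")
	j, _ := store.Get("abc123")
	fmt.Println(j.Status)
}
```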

Security Architecture

Current State

  • No authentication
  • No rate limiting
  • No input sanitization beyond validation
  • No HTTPS enforcement

Production Requirements

1. Authentication

type APIKey struct {
    Key       string
    UserID    string
    RateLimit int
    ExpiresAt time.Time
}

func authMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        apiKey := r.Header.Get("X-API-Key")
        
        key, err := validateAPIKey(apiKey)
        if err != nil {
            http.Error(w, "Unauthorized", 401)
            return
        }
        
        // Add user context to request
        ctx := context.WithValue(r.Context(), "userID", key.UserID)
        next.ServeHTTP(w, r.WithContext(ctx))
    })
}

2. Rate Limiting

import "golang.org/x/time/rate"

type RateLimiter struct {
    limiters map[string]*rate.Limiter
    mu       sync.Mutex
}

func (rl *RateLimiter) Allow(userID string) bool {
    rl.mu.Lock()
    defer rl.mu.Unlock()
    
    limiter, exists := rl.limiters[userID]
    if !exists {
        limiter = rate.NewLimiter(10, 100) // 10 req/sec, burst 100
        rl.limiters[userID] = limiter
    }
    
    return limiter.Allow()
}

3. Input Sanitization

func sanitizeIndustryType(input string) string {
    // Remove special characters
    input = strings.ToLower(input)
    input = regexp.MustCompile(`[^a-z0-9-]`).ReplaceAllString(input, "")
    
    // Limit length
    if len(input) > 50 {
        input = input[:50]
    }
    
    return input
}

Monitoring & Observability

Metrics to Track

// Request metrics
requestCount := prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Name: "assessment_requests_total",
        Help: "Total number of assessment requests",
    },
    []string{"industry_type", "risk_level"},
)

requestDuration := prometheus.NewHistogramVec(
    prometheus.HistogramOpts{
        Name:    "assessment_duration_seconds",
        Help:    "Assessment request duration",
        Buckets: prometheus.DefBuckets,
    },
    []string{"endpoint"},
)

// Adapter metrics
adapterFailures := prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Name: "adapter_failures_total",
        Help: "Total adapter fetch failures",
    },
    []string{"adapter_type"},
)

Logging Strategy

import "github.com/sirupsen/logrus"

log.WithFields(logrus.Fields{
    "request_id":    requestID,
    "industry_type": req.IndustryType,
    "latitude":      req.Latitude,
    "longitude":     req.Longitude,
    "risk_score":    response.RiskScore,
    "duration_ms":   duration.Milliseconds(),
}).Info("Assessment completed")

Health Checks

type HealthStatus struct {
    Status     string            `json:"status"`
    Timestamp  time.Time         `json:"timestamp"`
    Components map[string]string `json:"components"`
}

func healthCheck(w http.ResponseWriter, r *http.Request) {
    status := HealthStatus{
        Status:    "healthy",
        Timestamp: time.Now(),
        Components: map[string]string{
            "api":       "up",
            "adapters":  checkAdapters(),
            "estimator": "up",
        },
    }
    
    json.NewEncoder(w).Encode(status)
}

Testing Strategy

Unit Tests

go test ./internal/estimators/... -v
go test ./internal/adapters/... -v

Integration Tests

go test ./internal/services/... -v

End-to-End Tests

# Start server
go run cmd/server/main.go &

# Run tests
curl -X POST http://localhost:8080/api/v1/assess \
  -d @testdata/request1.json

# Cleanup
pkill -f "go run"

Future Enhancements

  1. Machine Learning Risk Model

    • Train on historical contamination incidents
    • Predict risk from larger feature set
    • Continuous learning from new data
  2. Geographic Information System (GIS)

    • Interactive map visualization
    • Watershed analysis
    • Contamination plume modeling
  3. Real-time Monitoring Integration

    • IoT sensor data ingestion
    • Anomaly detection
    • Automated alerting
  4. Multi-tenant Support

    • Separate data per organization
    • Role-based access control
    • Custom risk models per tenant
  5. Regulatory Compliance

    • Jurisdiction-specific rules
    • Automated reporting generation
    • Permit application assistance

This architecture supports evolution from MVP to production-scale system while maintaining clean separation of concerns and testability.