-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmicroservice_template.py
More file actions
115 lines (97 loc) · 3.33 KB
/
microservice_template.py
File metadata and controls
115 lines (97 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
Microservice template for business applications.
This template gets customized during the build process based on the app name.
"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import asyncio
import os
from typing import Dict, Any
import uvicorn
# Import the shared resource manager
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from shared_resources import ResourceManager
# App name will be set via environment variable
APP_NAME = os.getenv("PLATFORM_APPLICATION_NAME", "microservice")
APP_PORT = int(os.getenv("PORT", 8000))
app = FastAPI(title=f"{APP_NAME} Service", version="1.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize resource manager
resource_manager = ResourceManager(APP_NAME)
# Get API Gateway URL for traffic simulation
def get_api_gateway_url():
"""Get API Gateway URL based on environment"""
if os.getenv("PLATFORM_APPLICATION_NAME"): # Running on Upsun
return os.getenv("PLATFORM_RELATIONSHIPS_API_GATEWAY_0_URL", "http://api-gateway.internal")
else: # Local development
return "http://localhost:8004"
@app.get("/")
async def root():
return {
"message": f"{APP_NAME} Service",
"status": "running",
"app_name": APP_NAME
}
@app.get("/health")
async def health():
"""Health check endpoint"""
return resource_manager.get_health()
@app.get("/metrics")
async def metrics():
"""Get resource metrics"""
return resource_manager.get_metrics()
@app.get("/system")
async def system_info():
"""Get system information"""
return resource_manager.get_system_info()
@app.post("/resources")
async def update_resources(resource_data: Dict[str, Any]):
"""Update resource levels for this service"""
try:
levels = {
"processing": resource_data.get("processing", 0),
"storage": resource_data.get("storage", 0),
"traffic": resource_data.get("traffic", 0),
"orders": resource_data.get("orders", 0),
"completions": resource_data.get("completions", 0)
}
api_gateway_url = get_api_gateway_url()
print(f"[{APP_NAME}] Calling resource_manager.update_resources with levels: {levels}")
await resource_manager.update_resources(levels, api_gateway_url)
print(f"[{APP_NAME}] resource_manager.update_resources completed")
return {
"status": "success",
"app_name": APP_NAME,
"levels": levels
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/resources/reset")
async def reset_resources():
"""Reset all resource levels to zero"""
try:
levels = {
"processing": 0,
"storage": 0,
"traffic": 0,
"orders": 0,
"completions": 0
}
await resource_manager.update_resources(levels)
return {
"status": "success",
"app_name": APP_NAME,
"message": "Resources reset"
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=APP_PORT)