Skip to content

Commit 2cc5c4a

Browse files
committed
feat: add /api/topology endpoint to API
1 parent 4d7c1e0 commit 2cc5c4a

File tree

7 files changed

+209
-104
lines changed

7 files changed

+209
-104
lines changed

config/default.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ simulation:
55
speed_factor: 300
66
window_size_minutes: 5
77
heartbeat_cadence_minutes: 1
8+
experiment_mode: false
89

910
features:
1011
calibration_enabled: false
@@ -33,6 +34,12 @@ kafka:
3334
name: "sys.config"
3435
config:
3536
cleanup.policy: "compact"
37+
sim_topology:
38+
name: "sim.topology"
39+
config:
40+
# Compacted topic - keeps latest simulated topology
41+
cleanup.policy: "compact"
42+
min.compaction.lag.ms: "0"
3643
results:
3744
name: "sim.results"
3845
config:

config/experiments/experiment_1.yaml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ simulation:
55
speed_factor: 300
66
window_size_minutes: 15
77
heartbeat_cadence_minutes: 1
8-
# Experiment mode: write results to parquet instead of Kafka
8+
# Experiment mode: write results to parquet
99
experiment_mode: true
1010

1111
features:
@@ -35,6 +35,12 @@ kafka:
3535
name: "sys.config"
3636
config:
3737
cleanup.policy: "compact"
38+
sim_topology:
39+
name: "sim.topology"
40+
config:
41+
# Compacted topic - keeps latest simulated topology
42+
cleanup.policy: "compact"
43+
min.compaction.lag.ms: "0"
3844
results:
3945
name: "sim.results"
4046
config:

services/opendt-api/opendt_api/main.py

Lines changed: 123 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@
22

33
import logging
44
from contextlib import asynccontextmanager
5-
from fastapi import FastAPI
6-
from fastapi.middleware.cors import CORSMiddleware
5+
from typing import Annotated
76

8-
# Import shared models from common library
9-
from opendt_common import Task, Fragment, Consumption
7+
from fastapi import Body, FastAPI, HTTPException
8+
from fastapi.middleware.cors import CORSMiddleware
9+
from opendt_common import load_config_from_env
10+
from opendt_common.models.topology import CPU, Cluster, CPUPowerModel, Host, Memory, Topology
1011
from opendt_common.utils import get_kafka_producer
12+
from opendt_common.utils.kafka import send_message
1113

1214
logging.basicConfig(
13-
level=logging.INFO,
14-
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
15+
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
1516
)
1617
logger = logging.getLogger(__name__)
1718

@@ -21,20 +22,25 @@ async def lifespan(app: FastAPI):
2122
"""Lifespan context manager for startup and shutdown events."""
2223
# Startup
2324
logger.info("Starting OpenDT API service...")
24-
25+
26+
# Load configuration
27+
try:
28+
app.state.config = load_config_from_env()
29+
logger.info(f"Configuration loaded for workload: {app.state.config.workload}")
30+
except Exception as e:
31+
logger.error(f"Failed to load configuration: {e}")
32+
app.state.config = None
33+
2534
# Initialize Kafka producer (stored in app state for reuse)
2635
try:
2736
app.state.kafka_producer = get_kafka_producer()
2837
logger.info("Kafka producer initialized")
2938
except Exception as e:
3039
logger.error(f"Failed to initialize Kafka producer: {e}")
3140
app.state.kafka_producer = None
32-
33-
# TODO: Initialize database connection
34-
# TODO: Run database migrations
35-
41+
3642
yield
37-
43+
3844
# Shutdown
3945
logger.info("Shutting down OpenDT API service...")
4046
if app.state.kafka_producer:
@@ -62,83 +68,129 @@ async def lifespan(app: FastAPI):
6268
)
6369

6470

65-
@app.get("/")
66-
async def root():
67-
"""Root endpoint."""
68-
return {
69-
"service": "OpenDT API",
70-
"version": "0.1.0",
71-
"status": "running",
72-
"docs": "/docs",
73-
}
74-
75-
7671
@app.get("/health")
7772
async def health_check():
7873
"""Health check endpoint."""
7974
kafka_status = "connected" if app.state.kafka_producer else "disconnected"
80-
75+
config_status = "loaded" if app.state.config else "not loaded"
76+
8177
return {
8278
"status": "healthy",
8379
"kafka": kafka_status,
84-
# TODO: Add database health check
85-
# "database": db_status,
80+
"config": config_status,
8681
}
8782

8883

8984
# ============================================================================
90-
# EXAMPLE ENDPOINTS (Using Shared Models)
85+
# TOPOLOGY MANAGEMENT
9186
# ============================================================================
9287

93-
@app.get("/api/tasks", response_model=list[Task])
94-
async def list_tasks():
95-
"""List all tasks (example endpoint)."""
96-
# TODO: Fetch from database
97-
return []
98-
99-
100-
@app.post("/api/tasks", response_model=Task)
101-
async def create_task(task: Task):
102-
"""Create a new task (example endpoint)."""
103-
logger.info(f"Creating task: {task.id}")
104-
105-
# TODO: Save to database
106-
107-
# Send to Kafka (example)
108-
if app.state.kafka_producer:
109-
from opendt_common.utils.kafka import send_message
110-
send_message(
111-
app.state.kafka_producer,
112-
topic="tasks",
113-
message=task.model_dump(mode="json"),
114-
key=task.id
115-
)
116-
117-
return task
118-
119-
120-
@app.get("/api/fragments", response_model=list[Fragment])
121-
async def list_fragments():
122-
"""List all fragments (example endpoint)."""
123-
# TODO: Fetch from database
124-
return []
12588

89+
# Default topology for Swagger UI (matches SURF data)
90+
DEFAULT_TOPOLOGY = Topology(
91+
clusters=[
92+
Cluster(
93+
name="A01",
94+
hosts=[
95+
Host(
96+
name="A01",
97+
count=277,
98+
cpu=CPU(coreCount=16, coreSpeed=2100.0),
99+
memory=Memory(memorySize=128000000), # ~128 MB
100+
cpuPowerModel=CPUPowerModel(
101+
modelType="asymptotic",
102+
power=400.0,
103+
idlePower=32.0,
104+
maxPower=180.0,
105+
asymUtil=0.3,
106+
dvfs=False,
107+
),
108+
)
109+
],
110+
)
111+
]
112+
)
126113

127-
@app.get("/api/consumption", response_model=list[Consumption])
128-
async def list_consumption():
129-
"""List consumption data (example endpoint)."""
130-
# TODO: Fetch from database
131-
return []
114+
# Example for OpenAPI docs
115+
DEFAULT_TOPOLOGY_EXAMPLE = DEFAULT_TOPOLOGY.model_dump(mode="json")
116+
117+
118+
@app.put("/api/topology")
119+
async def update_topology(
120+
topology: Annotated[
121+
Topology,
122+
Body(
123+
description="Datacenter topology configuration",
124+
openapi_examples={
125+
"default": {
126+
"summary": "SURF datacenter topology",
127+
"description": "Default SURF topology: 277 hosts, 16 cores each @ 2.1 GHz",
128+
"value": DEFAULT_TOPOLOGY_EXAMPLE,
129+
}
130+
},
131+
),
132+
] = DEFAULT_TOPOLOGY,
133+
):
134+
"""Update the simulated datacenter topology.
135+
136+
This endpoint validates the topology structure and publishes it to Kafka.
137+
The sim-worker will pick it up and use it for future simulations.
138+
139+
Args:
140+
topology: Datacenter topology configuration with cluster details
141+
142+
Returns:
143+
Success confirmation with topology details
144+
145+
Raises:
146+
HTTPException: 500 if Kafka producer is not available
147+
HTTPException: 500 if publishing to Kafka fails
148+
"""
149+
# Check if Kafka producer is available
150+
if not app.state.kafka_producer:
151+
logger.error("Kafka producer not initialized")
152+
raise HTTPException(status_code=500, detail="Kafka producer not available")
153+
154+
# Check if config is loaded (to get topic name)
155+
if not app.state.config:
156+
logger.error("Configuration not loaded")
157+
raise HTTPException(status_code=500, detail="Configuration not loaded")
158+
159+
# Topology already validated by Pydantic
160+
logger.info(f"Topology validated: {len(topology.clusters)} cluster(s)")
161+
162+
# Get sim.topology topic name from config
163+
sim_topology_topic = app.state.config.kafka.topics.get("sim_topology")
164+
if not sim_topology_topic:
165+
logger.error("sim.topology topic not configured")
166+
raise HTTPException(status_code=500, detail="sim.topology topic not configured")
167+
168+
topic_name = sim_topology_topic.name
169+
170+
# Publish to sim.topology Kafka topic with compacted key
171+
try:
172+
send_message(
173+
producer=app.state.kafka_producer,
174+
topic=topic_name,
175+
message=topology.model_dump(mode="json"),
176+
key="datacenter",
177+
)
178+
logger.info(f"Topology published to {topic_name}")
179+
except Exception as e:
180+
logger.error(f"Failed to publish topology to Kafka: {e}")
181+
raise HTTPException(status_code=500, detail=f"Failed to publish topology: {e}") from e
132182

183+
return {
184+
"status": "updated",
185+
"message": f"Topology published to {topic_name}",
186+
"clusters": len(topology.clusters),
187+
"total_hosts": topology.total_host_count(),
188+
"total_cores": topology.total_core_count(),
189+
"topic": topic_name,
190+
}
133191

134-
# ============================================================================
135-
# TODO: Add more endpoints for:
136-
# - Simulations management
137-
# - Workload submission
138-
# - Results querying
139-
# - Monitoring & metrics
140-
# ============================================================================
141192

142193
if __name__ == "__main__":
143194
import uvicorn
195+
144196
uvicorn.run(app, host="0.0.0.0", port=8000)

services/sim-worker/sim_worker/experiment_manager.py

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -216,35 +216,24 @@ def archive_opendc_files(
216216
if input_file.exists():
217217
shutil.copy2(input_file, input_dir / input_file.name)
218218

219-
# Copy OpenDC output parquet files from the actual output directory
219+
# Copy entire OpenDC output directory from temp to archive
220220
output_dir_src = (
221221
Path(simulated_results.opendc_output_dir)
222222
if simulated_results.opendc_output_dir
223223
else None
224224
)
225225

226-
copied_count = 0
227226
if output_dir_src and output_dir_src.exists():
228-
output_files = [
229-
"powerSource.parquet",
230-
"host.parquet",
231-
"service.parquet",
232-
]
227+
# Copy all files from OpenDC output directory
228+
for item in output_dir_src.iterdir():
229+
if item.is_file():
230+
shutil.copy2(item, output_dir_dest / item.name)
231+
logger.debug(f"Copied {item.name} to {output_dir_dest}")
233232

234-
for filename in output_files:
235-
output_file = output_dir_src / filename
236-
if output_file.exists():
237-
dest_file = output_dir_dest / filename
238-
shutil.copy2(output_file, dest_file)
239-
copied_count += 1
240-
logger.debug(f"Copied {filename} from {output_file} to {dest_file}")
241-
else:
242-
logger.warning(f"OpenDC output file not found: {output_file}")
233+
logger.debug(f"Copied OpenDC output directory: {output_dir_src} -> {output_dir_dest}")
243234
else:
244235
logger.warning(f"OpenDC output directory not found or not set: {output_dir_src}")
245236

246-
logger.debug(f"Copied {copied_count}/3 OpenDC output parquet files")
247-
248237
# Create output/summary.json with power draw results and summary stats
249238
output_summary = {
250239
"window_id": window.window_id,

0 commit comments

Comments (0)