Skip to content

Commit 5989e2f

Browse files
committed
Refactor initialization into modular submodules
Split initialization.rs into multiple focused modules under src/initialization, improving maintainability and clarity. Updated main.rs to use new initialization API and removed direct Rocket server construction. Cargo.toml dependency section was reorganized for clarity, grouping dependencies by category. Refactor server initialization and modularize API Split main.rs into modular components for CORS, endpoints, logging, initialization, and server setup. Added new modules: api_models, cors, endpoints, initialization, logging, and server. Refactored cluster management and peer discovery logic, improved documentation, and updated dependency versions in Cargo.toml. Replaced scale..rs with scale.rs and updated control API module structure. Enhanced SharedState struct documentation and usage. Refactor apps API into modular route files Split the monolithic apps.rs file into multiple focused modules for CRUD, control, listing, release, and instance management. Updated mod.rs and main API routing to use the new modular structure for improved maintainability and clarity. Refactor alerts API into modular route files Split the monolithic alerts.rs file into multiple focused modules: actions, app_alerts, auto_resolve, bulk, create, escalation, get, list, org_alerts, search, types, and update. This improves maintainability and clarity by grouping related API endpoints and types into separate files. Also applied similar modularization to audit_log.rs. No functional changes to endpoint logic. Refactor cost API module and update alerts imports Removed the monolithic cost.rs API module and split its functionality into multiple files for allocation tags, analysis, budgets, metrics, pricing, projections, resource types, and types. Updated all alert API modules to use the new import path for db::queries, reflecting the deeper module nesting. Refactor API schemas into modular route files Split monolithic API route files into modular submodules for builds, deployments, instances, metrics, notifications, and permissions. Each resource now has dedicated files for CRUD operations and route registration, improving maintainability and clarity of the codebase.
1 parent b4540b4 commit 5989e2f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+6138
-5426
lines changed

Cargo.toml

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,45 +3,50 @@ name = "omni-orchestrator"
33
version = "0.1.4"
44
edition = "2021"
55

6+
67
[dependencies]
7-
rocket = { version = "0.5.1", features = ["json", "uuid"] }
8-
uuid = { version = "1.17.0", features = ["v4", "serde"] }
9-
tokio = { version = "1.45.1", features = ["full"] }
10-
serde = { version = "1.0.219", features = ["derive", "rc"] }
11-
serde_json5 = "0.2.1"
12-
serde_json = "1.0.140"
8+
# Core async/runtime
9+
tokio = { version = "1.45", features = ["full"] }
1310
async-trait = "0.1.88"
14-
reqwest = { version = "0.12.18", features = ["json", "native-tls-vendored", "rustls-tls"] }
15-
log = "0.4.27"
16-
env_logger = "0.11.8"
17-
thiserror = "2.0.12"
18-
parking_lot = "0.12.4"
19-
chrono = { version = "0.4.41", features = ["serde"] }
2011
futures = "0.3.31"
21-
colored = "3.0.0"
22-
anyhow = "1.0.98"
23-
lazy_static = "1.5.0"
12+
13+
# Web framework & API
14+
rocket = { version = "0.5.1", features = ["json", "uuid"] }
2415
rocket-multipart-form-data = "0.10.7"
16+
reqwest = { version = "0.12", features = ["json", "native-tls-vendored", "rustls-tls"] }
17+
18+
# Serialization
19+
serde = { version = "1.0", features = ["derive", "rc"] }
20+
serde_json = "1.0"
21+
serde_json5 = "0.2.1"
22+
23+
# Database
24+
mysql = "26.0.0"
25+
sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "mysql", "chrono", "uuid"] }
26+
clickhouse = { version = "0.13.3"}
27+
libomni = { path = "../LibOmni" }
28+
lighthouse = { git = "https://github.com/OmniCloudOrg/Lighthouse", features = ["metrics-persistence", "predictive-scaling"], rev = "de72b5365bdc1a28e89ffbf06b3c92957ad3403d"}
29+
30+
# Crypto & Security
2531
sha2 = "0.10.9"
2632
rand = "0.9.1"
2733
hex = "0.4.3"
28-
mysql = "26.0.0"
34+
jsonwebtoken = "9.3.1"
35+
36+
# Utilities
37+
anyhow = "1.0.98"
38+
thiserror = "2.0.12"
39+
colored = "3.0.0"
40+
lazy_static = "1.5.0"
2941
once_cell = "1.21.3"
30-
sqlx = { version = "0.8.6", features = ["runtime-tokio-rustls", "mysql", "chrono", "uuid"] }
42+
dotenv = { version = "0.15.0" }
43+
parking_lot = "0.12.4"
44+
chrono = { version = "0.4.41", features = ["serde"] }
45+
uuid = { version = "1.17.0", features = ["v4", "serde"] }
3146
tracing = { version = "0.1.41", features = ["log"] }
47+
env_logger = "0.11.8"
48+
log = "0.4.27"
3249
hadris-iso = "0.0.2"
3350
ssh2 = { version = "0.9.5" }
34-
libomni = { path = "../LibOmni" }
35-
dotenv = { version = "0.15.0"}
3651
paxakos = "0.13.0"
37-
jsonwebtoken = "9.3.1"
3852
rust_decimal = "1.37.1"
39-
clickhouse = { version = "0.13.3"}
40-
41-
[profile.dev]
42-
codegen-units = 32
43-
incremental = true
44-
debug-assertions = false
45-
overflow-checks = false
46-
opt-level = 0
47-
lto = "thin"

src/api_models.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
//! API models for the OmniOrchestrator
2+
//!
3+
//! These are the general API models used by the OmniOrchestrator
4+
//! service outside the primary platform routes located in
5+
//! /scr/schema/VERSION/api
6+
7+
use serde::{Deserialize, Serialize};
8+
use crate::cluster::NodeInfo;
9+
10+
#[derive(Debug, Serialize, Deserialize)]
11+
pub struct ClusterStatusMessage {
12+
pub node_roles: String,
13+
pub cluster_nodes: Vec<NodeInfo>,
14+
}
15+
16+
#[derive(Serialize, Deserialize)]
17+
pub struct ApiResponse {
18+
pub status: String,
19+
pub message: ClusterStatusMessage,
20+
}

src/cluster.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1+
use anyhow::{anyhow, Result};
12
use colored::Colorize;
23
use log::debug;
4+
use reqwest::Client;
35
use serde::{Deserialize, Serialize};
46
use std::collections::HashMap;
57
use std::sync::Arc;
8+
use std::time::Duration;
69
use tokio::sync::RwLock;
710

11+
use crate::config::ServerConfig;
812
use crate::state::SharedState;
913

1014
/// Represents a node in the OmniOrchestrator cluster.
@@ -239,4 +243,70 @@ impl ClusterManager {
239243
let nodes = self.nodes.read().await;
240244
nodes.contains_key(&node_uid)
241245
}
246+
247+
pub async fn discover_peers(&self, config: &ServerConfig, my_port: u16) -> Result<()> {
248+
let client = Client::new();
249+
log::info!("{}", "Starting peer discovery...".cyan());
250+
251+
for instance in &config.instances {
252+
let string = format!("{:#?}", instance);
253+
log::info!("{}", format!("Discovered: {}", string).blue().bold());
254+
if instance.port == my_port {
255+
log::debug!("Skipping self-connection at port {}", my_port);
256+
continue;
257+
}
258+
259+
let node_address: Arc<str> = format!("{}:{}", instance.address, instance.port).into();
260+
let node_uri = format!("{}", node_address);
261+
262+
match self.connect_to_peer(&client, &node_uri.clone()).await {
263+
Ok(_) => log::info!(
264+
"{}",
265+
format!("Successfully connected to peer: {}", node_uri).green()
266+
),
267+
Err(e) => {
268+
log::warn!(
269+
"{}",
270+
format!("Failed to connect to peer: {} {}", node_uri, e).yellow()
271+
);
272+
self.remove_node(node_uri.into()).await;
273+
}
274+
}
275+
}
276+
277+
log::info!("{}", "Peer discovery completed".cyan());
278+
Ok(())
279+
}
280+
281+
async fn connect_to_peer(&self, client: &Client, node_address: &str) -> Result<()> {
282+
let health_url = format!("{}/health", node_address);
283+
log::debug!("Checking health at: {}", health_url);
284+
285+
let response = client
286+
.get(&health_url)
287+
.timeout(Duration::from_secs(5))
288+
.send()
289+
.await?;
290+
291+
if response.status().is_success() {
292+
let port = node_address
293+
.split(':')
294+
.next_back()
295+
.unwrap_or("80")
296+
.parse::<u16>()
297+
.unwrap_or(80);
298+
299+
let node_info = NodeInfo {
300+
id: node_address.into(),
301+
address: node_address.into(),
302+
port,
303+
};
304+
305+
self.register_node(node_info).await;
306+
log::debug!("Node registered: {}", node_address);
307+
Ok(())
308+
} else {
309+
Err(anyhow!("Node health check failed"))
310+
}
311+
}
242312
}

src/cors.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use rocket::{Request, Response, fairing::{Fairing, Info, Kind}, http::Header};
2+
3+
pub struct CORS;
4+
5+
#[rocket::async_trait]
6+
impl Fairing for CORS {
7+
fn info(&self) -> Info {
8+
Info {
9+
name: "Add comprehensive CORS headers to responses",
10+
kind: Kind::Response,
11+
}
12+
}
13+
14+
async fn on_response<'r>(&self, _request: &'r Request<'_>, response: &mut Response<'r>) {
15+
response.set_header(Header::new("Access-Control-Allow-Origin", "*"));
16+
response.set_header(Header::new(
17+
"Access-Control-Allow-Methods",
18+
"GET, POST, PUT, PATCH, DELETE, OPTIONS, HEAD",
19+
));
20+
response.set_header(Header::new(
21+
"Access-Control-Allow-Headers",
22+
"Authorization, Content-Type, Accept, Origin, X-Requested-With",
23+
));
24+
response.set_header(Header::new("Access-Control-Allow-Credentials", "true"));
25+
response.set_header(Header::new("Access-Control-Max-Age", "86400"));
26+
}
27+
}
28+
29+
#[options("/<_..>")]
30+
pub fn cors_preflight() -> &'static str {
31+
""
32+
}

src/db_manager/manager.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@ impl DatabaseManager {
5353
.platform_pool(platform_id, &platform_name)
5454
.await?;
5555

56-
// TODO: @Caznix @tristanpoland We need to find a new home for this
57-
// Initialize the schema
56+
// TODO: Platform schema initialization needs to be relocated
57+
// Currently commented out pending architectural decisions about where
58+
// platform-specific schema initialization should be handled.
59+
// Consider moving to a dedicated platform management service.
5860
// MigrationManager::initialize_platform_schema(&pool, platform).await?;
5961

6062
Ok(pool)

src/endpoints.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
//! Core API endpoints for the OmniOrchestrator server.
2+
//!
3+
//! This module provides essential HTTP endpoints for monitoring and managing
4+
//! the OmniOrchestrator cluster. These endpoints serve as the primary interface
5+
//! for external systems to query the health and status of the cluster.
6+
7+
use rocket;
8+
use std::sync::Arc;
9+
use tokio::sync::RwLock;
10+
use crate::cluster::ClusterManager;
11+
use crate::state::SharedState;
12+
use crate::api_models::{ApiResponse, ClusterStatusMessage};
13+
14+
/// Health check endpoint that provides basic service availability status.
15+
///
16+
/// This endpoint is used by load balancers, monitoring systems, and other external
17+
/// services to determine if the OmniOrchestrator service is running and responding
18+
/// to requests. It returns a simple JSON response indicating service availability.
19+
///
20+
/// # Returns
21+
///
22+
/// A JSON response with status "ok" and basic cluster information.
23+
#[get("/health")]
24+
pub async fn health_check() -> rocket::serde::json::Json<ApiResponse> {
25+
log::debug!("Health check endpoint called");
26+
rocket::serde::json::Json(ApiResponse {
27+
status: "ok".to_string(),
28+
message: ClusterStatusMessage {
29+
node_roles: "unknown".to_string(),
30+
cluster_nodes: vec![],
31+
},
32+
})
33+
}
34+
35+
/// Provides detailed cluster status information including node roles and membership.
36+
///
37+
/// This endpoint returns comprehensive information about the current state of the
38+
/// OmniOrchestrator cluster, including which node is the leader, cluster membership,
39+
/// and the role of the current node. This information is crucial for cluster
40+
/// monitoring and debugging distributed system issues.
41+
///
42+
/// # Arguments
43+
///
44+
/// * `state` - Shared state containing node role and cluster information
45+
/// * `cluster` - Cluster manager with information about all known nodes
46+
///
47+
/// # Returns
48+
///
49+
/// A JSON response containing:
50+
/// - Overall cluster status
51+
/// - Current node's role (leader/follower)
52+
/// - List of all known cluster nodes
53+
#[get("/cluster/status")]
54+
pub async fn cluster_status(
55+
state: &rocket::State<Arc<RwLock<SharedState>>>,
56+
cluster: &rocket::State<Arc<RwLock<ClusterManager>>>,
57+
) -> rocket::serde::json::Json<ApiResponse> {
58+
log::debug!("Cluster status endpoint called");
59+
let state = state.read().await;
60+
let nodes = cluster.read().await;
61+
62+
let role = if state.is_leader {
63+
"leader".to_string()
64+
} else {
65+
"follower".to_string()
66+
};
67+
68+
log::info!("{}", format!("Current node role: {}", role));
69+
70+
let response = ApiResponse {
71+
status: "ok".to_string(),
72+
message: ClusterStatusMessage {
73+
node_roles: role,
74+
cluster_nodes: nodes.get_nodes().await,
75+
},
76+
};
77+
78+
rocket::serde::json::Json(response)
79+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use libomni::types::db::auth::AuthConfig;
2+
3+
/// Constructs the authentication configuration from environment variables.
4+
///
5+
/// - Loads the JWT secret and token expiry hours from environment variables.
6+
/// - Panics if `JWT_SECRET` is not set or if `TOKEN_EXPIRY_HOURS` is invalid.
7+
///
8+
/// # Returns
9+
/// Returns an `AuthConfig` struct with the loaded values.
10+
pub fn create_auth_config() -> AuthConfig {
11+
AuthConfig {
12+
jwt_secret: std::env::var("JWT_SECRET")
13+
.expect("Environment variable JWT_SECRET must be set for secure operation."),
14+
token_expiry_hours: std::env::var("TOKEN_EXPIRY_HOURS")
15+
.unwrap_or_else(|_| "24".to_string())
16+
.parse()
17+
.expect("Invalid value for TOKEN_EXPIRY_HOURS"),
18+
}
19+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use crate::server::build_rocket;
2+
// use crate::{CLUSTER_MANAGER}; // removed unused import
3+
use crate::db_manager::DatabaseManager;
4+
use crate::state::SharedState;
5+
// use libomni::types::db::auth::AuthConfig; // removed unused import
6+
use std::sync::Arc;
7+
use tokio::sync::RwLock;
8+
use colored::Colorize;
9+
10+
/// Builds and launches the Rocket server with the provided configuration and dependencies.
11+
///
12+
/// # Arguments
13+
/// * `port` - The port to bind the server to.
14+
/// * `db_manager` - Shared database manager instance.
15+
/// * `pool` - Main database pool.
16+
/// * `cluster_manager` - Shared cluster manager instance.
17+
/// * `clickhouse_client` - ClickHouse client instance.
18+
/// * `shared_state_for_server` - Shared state for the server.
19+
///
20+
/// # Errors
21+
/// Returns an error if the Rocket server fails to launch.
22+
pub async fn launch_server(
23+
port: u16,
24+
db_manager: Arc<DatabaseManager>,
25+
pool: sqlx::MySqlPool,
26+
cluster_manager: Arc<RwLock<crate::cluster::ClusterManager>>,
27+
clickhouse_client: clickhouse::Client,
28+
shared_state_for_server: Arc<RwLock<SharedState>>,
29+
) -> Result<(), Box<dyn std::error::Error>> {
30+
let auth_config = super::create_auth_config();
31+
let rocket_with_routes = build_rocket(
32+
port,
33+
db_manager,
34+
pool,
35+
cluster_manager,
36+
clickhouse_client,
37+
shared_state_for_server,
38+
auth_config,
39+
);
40+
log::info!("{}", "🚀 LAUNCHING SERVER...".bright_cyan().bold());
41+
rocket_with_routes.launch().await?;
42+
Ok(())
43+
}

0 commit comments

Comments
 (0)