Skip to content

Commit ad3bbe7

Browse files
committed
feat: kubernetes discovery
1 parent 6f46a1c commit ad3bbe7

File tree

5 files changed

+160
-12
lines changed

5 files changed

+160
-12
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ url = "2.5"
116116
# Validation
117117
validator = { version = "0.20", features = ["derive"] }
118118

119+
# Kubernetes client
120+
kube = { version = "0.97", features = ["runtime", "client", "derive"] }
121+
k8s-openapi = { version = "0.23", features = ["v1_31"] }
122+
119123
[profile.release]
120124
codegen-units = 1
121125
lto = "thin"

crates/infera-management-core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ validator = { workspace = true }
5353
reqwest = { workspace = true }
5454
url = { workspace = true }
5555

56+
# Kubernetes client (for service discovery)
57+
kube = { workspace = true }
58+
k8s-openapi = { workspace = true }
59+
5660
# Metrics
5761
metrics = { workspace = true }
5862

crates/infera-management-core/src/config.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,52 @@ pub struct CacheInvalidationConfig {
255255
/// Number of retry attempts on webhook failure
256256
#[serde(default = "default_webhook_retry_attempts")]
257257
pub retry_attempts: u8,
258+
259+
/// Service discovery configuration
260+
#[serde(default)]
261+
pub discovery: DiscoveryConfig,
262+
}
263+
264+
/// Service discovery configuration
265+
#[derive(Debug, Clone, Serialize, Deserialize)]
266+
pub struct DiscoveryConfig {
267+
/// Discovery mode (none or kubernetes)
268+
#[serde(default)]
269+
pub mode: DiscoveryMode,
270+
271+
/// Cache TTL for discovered endpoints (in seconds)
272+
#[serde(default = "default_discovery_cache_ttl")]
273+
pub cache_ttl_seconds: u64,
274+
275+
/// Whether to enable health checking of endpoints
276+
#[serde(default = "default_discovery_health_check")]
277+
pub enable_health_check: bool,
278+
279+
/// Health check interval (in seconds)
280+
#[serde(default = "default_discovery_health_check_interval")]
281+
pub health_check_interval_seconds: u64,
282+
}
283+
284+
impl Default for DiscoveryConfig {
285+
fn default() -> Self {
286+
Self {
287+
mode: DiscoveryMode::None,
288+
cache_ttl_seconds: default_discovery_cache_ttl(),
289+
enable_health_check: default_discovery_health_check(),
290+
health_check_interval_seconds: default_discovery_health_check_interval(),
291+
}
292+
}
293+
}
294+
295+
/// Service discovery mode
296+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
297+
#[serde(rename_all = "lowercase")]
298+
pub enum DiscoveryMode {
299+
/// No service discovery - use service URL directly
300+
#[default]
301+
None,
302+
/// Kubernetes service discovery - resolve to pod IPs
303+
Kubernetes,
258304
}
259305

260306
// Default value functions
@@ -379,6 +425,7 @@ fn default_cache_invalidation() -> CacheInvalidationConfig {
379425
http_endpoints: vec![], // No webhooks by default
380426
timeout_ms: 5000, // 5 seconds
381427
retry_attempts: 0, // Fire-and-forget (no retries)
428+
discovery: DiscoveryConfig::default(),
382429
}
383430
}
384431

@@ -402,6 +449,18 @@ fn default_webhook_retry_attempts() -> u8 {
402449
0 // Fire-and-forget
403450
}
404451

452+
fn default_discovery_cache_ttl() -> u64 {
453+
300 // 5 minutes
454+
}
455+
456+
fn default_discovery_health_check() -> bool {
457+
false
458+
}
459+
460+
fn default_discovery_health_check_interval() -> u64 {
461+
30 // 30 seconds
462+
}
463+
405464
impl ManagementConfig {
406465
/// Load configuration from a file with environment variable overrides
407466
pub fn load<P: AsRef<Path>>(path: P) -> Result<Self> {

crates/infera-management-core/src/webhook_client.rs

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,9 @@ fn is_kubernetes_service(endpoint: &str) -> bool {
369369
///
370370
/// List of pod endpoints (e.g., ["http://10.0.1.2:8080", "http://10.0.1.3:8080"])
371371
async fn discover_k8s_service_endpoints(service_endpoint: &str) -> Result<Vec<String>, String> {
372+
use k8s_openapi::api::core::v1::Endpoints as K8sEndpoints;
373+
use kube::{Api, Client};
374+
372375
// Parse the service URL
373376
let url =
374377
url::Url::parse(service_endpoint).map_err(|e| format!("Invalid service URL: {}", e))?;
@@ -379,7 +382,7 @@ async fn discover_k8s_service_endpoints(service_endpoint: &str) -> Result<Vec<St
379382
let service_port = url
380383
.port_or_known_default()
381384
.ok_or_else(|| "No port in service URL".to_string())?;
382-
let _scheme = url.scheme();
385+
let scheme = url.scheme();
383386

384387
// Extract service name and namespace from hostname
385388
// Formats supported:
@@ -402,23 +405,58 @@ async fn discover_k8s_service_endpoints(service_endpoint: &str) -> Result<Vec<St
402405
"Discovering Kubernetes service endpoints"
403406
);
404407

405-
// In a real implementation, you would use the kube-rs library to query the Kubernetes API
406-
// For now, we'll return a placeholder that falls back to the service URL
407-
// TODO: Implement actual Kubernetes API integration using kube-rs
408+
// Create Kubernetes client
409+
let client = Client::try_default()
410+
.await
411+
.map_err(|e| format!("Failed to create Kubernetes client: {}", e))?;
412+
413+
// Get the Endpoints resource for this service
414+
let endpoints_api: Api<K8sEndpoints> = Api::namespaced(client, namespace);
415+
416+
let endpoints = endpoints_api.get(service_name).await.map_err(|e| {
417+
if e.to_string().contains("404") {
418+
format!("Service {}.{} not found", service_name, namespace)
419+
} else {
420+
format!(
421+
"Failed to get endpoints for {}.{}: {}",
422+
service_name, namespace, e
423+
)
424+
}
425+
})?;
426+
427+
// Extract pod IPs from the Endpoints resource
428+
let mut pod_endpoints = Vec::new();
429+
430+
if let Some(subsets) = endpoints.subsets {
431+
for subset in subsets {
432+
// Only use ready addresses (healthy pods)
433+
if let Some(addresses) = subset.addresses {
434+
for address in addresses {
435+
let pod_ip = &address.ip;
436+
let endpoint_url = format!("{}://{}:{}", scheme, pod_ip, service_port);
437+
pod_endpoints.push(endpoint_url);
438+
}
439+
}
440+
}
441+
}
408442

409-
// Placeholder: In production, this would use kube-rs to:
410-
// 1. Get the Endpoints resource for the service
411-
// 2. Extract all pod IPs
412-
// 3. Return list of "http://pod-ip:port" URLs
443+
if pod_endpoints.is_empty() {
444+
warn!(
445+
service_name = %service_name,
446+
namespace = %namespace,
447+
"No ready endpoints found for service - falling back to service URL"
448+
);
449+
return Ok(vec![service_endpoint.to_string()]);
450+
}
413451

414-
// For now, just return the original service endpoint as fallback
415-
warn!(
452+
info!(
416453
service_name = %service_name,
417454
namespace = %namespace,
418-
"Kubernetes service discovery not yet fully implemented - using service URL as fallback"
455+
endpoint_count = pod_endpoints.len(),
456+
"Discovered Kubernetes service endpoints"
419457
);
420458

421-
Ok(vec![service_endpoint.to_string()])
459+
Ok(pod_endpoints)
422460
}
423461

424462
#[cfg(test)]

k8s/rbac.yaml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
---
2+
# ServiceAccount for InferaDB Management API
3+
apiVersion: v1
4+
kind: ServiceAccount
5+
metadata:
6+
name: inferadb-management
7+
namespace: inferadb
8+
labels:
9+
app.kubernetes.io/name: inferadb-management
10+
app.kubernetes.io/component: control-plane
11+
12+
---
13+
# ClusterRole for service discovery
14+
# Allows reading Endpoints resources across all namespaces
15+
apiVersion: rbac.authorization.k8s.io/v1
16+
kind: ClusterRole
17+
metadata:
18+
name: inferadb-management-discovery
19+
labels:
20+
app.kubernetes.io/name: inferadb-management
21+
app.kubernetes.io/component: control-plane
22+
rules:
23+
- apiGroups: [""]
24+
resources: ["endpoints"]
25+
verbs: ["get", "list", "watch"]
26+
27+
---
28+
# ClusterRoleBinding for service discovery
29+
apiVersion: rbac.authorization.k8s.io/v1
30+
kind: ClusterRoleBinding
31+
metadata:
32+
name: inferadb-management-discovery
33+
labels:
34+
app.kubernetes.io/name: inferadb-management
35+
app.kubernetes.io/component: control-plane
36+
roleRef:
37+
apiGroup: rbac.authorization.k8s.io
38+
kind: ClusterRole
39+
name: inferadb-management-discovery
40+
subjects:
41+
- kind: ServiceAccount
42+
name: inferadb-management
43+
namespace: inferadb

0 commit comments

Comments
 (0)