Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions engine/src/modules/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ pub struct WorkerInfo {
pub functions: Vec<String>,
pub active_invocations: usize,
pub latest_metrics: Option<WorkerMetrics>,
#[serde(skip_serializing_if = "Option::is_none")]
pub pid: Option<u32>,
}

#[derive(Debug, Clone, Deserialize)]
Expand All @@ -100,6 +102,8 @@ pub struct RegisterWorkerInput {
pub name: Option<String>,
pub os: Option<String>,
pub telemetry: Option<WorkerTelemetryMeta>,
#[serde(default)]
pub pid: Option<u32>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -203,6 +207,7 @@ impl WorkerModule {
functions,
active_invocations,
latest_metrics,
pid: w.pid,
});
}
worker_infos
Expand All @@ -226,6 +231,7 @@ impl WorkerModule {
input.name,
input.os,
input.telemetry,
input.pid,
);
}
}
Expand Down Expand Up @@ -488,6 +494,27 @@ mod tests {
assert_eq!(telemetry.framework.as_deref(), Some("express"));
}

#[test]
fn register_worker_input_accepts_pid() {
let json = serde_json::json!({
"_caller_worker_id": "550e8400-e29b-41d4-a716-446655440000",
"runtime": "node",
"pid": 9876
});
let input: RegisterWorkerInput = serde_json::from_value(json).expect("deserialize");
assert_eq!(input.pid, Some(9876u32));
}

#[test]
fn register_worker_input_pid_defaults_to_none() {
let json = serde_json::json!({
"_caller_worker_id": "550e8400-e29b-41d4-a716-446655440000",
"runtime": "python"
});
let input: RegisterWorkerInput = serde_json::from_value(json).expect("deserialize");
assert!(input.pid.is_none());
}

fn setup_engine_and_module() -> (Arc<Engine>, WorkerModule) {
ensure_default_meter();
let engine = Arc::new(Engine::new());
Expand Down Expand Up @@ -628,6 +655,7 @@ mod tests {
name: Some("test-worker".to_string()),
os: Some("linux".to_string()),
telemetry: None,
pid: None,
};

// Should not panic, just log an error and return
Expand All @@ -651,6 +679,7 @@ mod tests {
name: Some("my-worker".to_string()),
os: Some("darwin".to_string()),
telemetry: None,
pid: None,
};

module.register_worker_metadata(input).await;
Expand Down Expand Up @@ -1118,6 +1147,7 @@ mod tests {
name: Some("my-worker".to_string()),
os: Some("linux".to_string()),
telemetry: None,
pid: None,
})
.await;
assert!(matches!(result, FunctionResult::Success(Some(_))));
Expand Down
65 changes: 63 additions & 2 deletions engine/src/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ impl WorkerRegistry {
}

pub fn register_worker(&self, worker: Worker) {
tracing::info!(
worker_id = %worker.id,
ip_address = ?worker.ip_address,
"Worker registered"
);
self.workers.insert(worker.id, worker);
let count = self.workers.len() as i64;

Expand All @@ -69,7 +74,19 @@ impl WorkerRegistry {
}

pub fn unregister_worker(&self, worker_id: &Uuid) {
tracing::debug!("Unregistering worker: {}", worker_id);
let (ip_address, pid) = self
.workers
.get(worker_id)
.map(|w| (w.ip_address.clone(), w.pid))
.unwrap_or((None, None));

tracing::info!(
worker_id = %worker_id,
ip_address = ?ip_address,
pid = ?pid,
"Worker unregistered"
);

self.workers.remove(worker_id);
let count = self.workers.len() as i64;

Expand All @@ -91,6 +108,7 @@ impl WorkerRegistry {
.collect()
}

#[allow(clippy::too_many_arguments)]
pub fn update_worker_metadata(
&self,
worker_id: &Uuid,
Expand All @@ -99,6 +117,7 @@ impl WorkerRegistry {
name: Option<String>,
os: Option<String>,
telemetry: Option<WorkerTelemetryMeta>,
pid: Option<u32>,
) {
if let Some(mut worker) = self.workers.get_mut(worker_id) {
worker.runtime = Some(runtime);
Expand All @@ -110,6 +129,9 @@ impl WorkerRegistry {
worker.os = os;
}
worker.telemetry = telemetry;
if pid.is_some() {
worker.pid = pid;
}
}
}

Expand Down Expand Up @@ -181,6 +203,7 @@ pub struct Worker {
pub ip_address: Option<String>,
pub status: WorkerStatus,
pub telemetry: Option<WorkerTelemetryMeta>,
pub pid: Option<u32>,
}

impl Worker {
Expand All @@ -200,6 +223,7 @@ impl Worker {
ip_address: None,
status: WorkerStatus::Connected,
telemetry: None,
pid: None,
}
}

Expand All @@ -219,6 +243,7 @@ impl Worker {
ip_address: Some(ip_address),
status: WorkerStatus::Connected,
telemetry: None,
pid: None,
}
}

Expand Down Expand Up @@ -288,7 +313,6 @@ impl Worker {
self.invocations.write().await.remove(invocation_id);
}
}

#[cfg(test)]
mod tests {
use serde_json::json;
Expand All @@ -300,6 +324,12 @@ mod tests {
Worker::new(tx)
}

#[test]
fn worker_pid_defaults_to_none() {
let worker = make_worker();
assert!(worker.pid.is_none());
}

#[test]
fn worker_telemetry_meta_debug_includes_all_fields() {
let telemetry = WorkerTelemetryMeta {
Expand Down Expand Up @@ -373,6 +403,13 @@ mod tests {
assert_eq!(worker.invocation_count().await, 0);
}

#[test]
fn unregister_worker_does_not_panic_with_unknown_id() {
crate::modules::observability::metrics::ensure_default_meter();
let registry = WorkerRegistry::new();
registry.unregister_worker(&Uuid::new_v4());
}

#[tokio::test]
async fn worker_registry_registers_updates_and_unregisters_workers() {
crate::modules::observability::metrics::ensure_default_meter();
Expand All @@ -395,6 +432,7 @@ mod tests {
Some("worker-a".to_string()),
Some("linux".to_string()),
Some(telemetry.clone()),
None,
);
registry.update_worker_status(&worker_id, WorkerStatus::Busy);
registry.update_worker_status(&Uuid::new_v4(), WorkerStatus::Available);
Expand All @@ -405,6 +443,7 @@ mod tests {
assert_eq!(stored.name.as_deref(), Some("worker-a"));
assert_eq!(stored.os.as_deref(), Some("linux"));
assert_eq!(stored.status, WorkerStatus::Busy);
assert!(stored.pid.is_none());
assert_eq!(
serde_json::to_value(stored.telemetry).expect("serialize telemetry"),
json!(telemetry)
Expand All @@ -415,4 +454,26 @@ mod tests {
assert!(registry.get_worker(&worker_id).is_none());
assert!(registry.list_workers().is_empty());
}

#[test]
fn update_worker_metadata_stores_pid() {
crate::modules::observability::metrics::ensure_default_meter();
let registry = WorkerRegistry::new();
let worker = make_worker();
let worker_id = worker.id;
registry.register_worker(worker);

registry.update_worker_metadata(
&worker_id,
"node".to_string(),
Some("18.0.0".to_string()),
None,
None,
None,
Some(1234u32),
);

let stored = registry.get_worker(&worker_id).expect("worker exists");
assert_eq!(stored.pid, Some(1234u32));
}
}
62 changes: 62 additions & 0 deletions engine/tests/invocation_extensibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,65 @@ async fn test_engine_function_registration() {

assert!(engine.functions.get("another.test").is_some());
}

#[tokio::test]
async fn worker_pid_is_stored_and_listed() {
use iii::{
engine::Outbound,
modules::{
module::Module, observability::metrics::ensure_default_meter, worker::WorkerModule,
},
workers::Worker,
};

ensure_default_meter();
let engine = Arc::new(Engine::new());

let worker_module = WorkerModule::create(engine.clone(), None)
.await
.expect("create WorkerModule");
worker_module
.initialize()
.await
.expect("initialize WorkerModule");
worker_module.register_functions(engine.clone());

// Simulate worker connecting
let (tx, _rx) = tokio::sync::mpsc::channel::<Outbound>(8);
let worker = Worker::new(tx);
let worker_id = worker.id.to_string();
engine.worker_registry.register_worker(worker);

// Register with pid
engine
.call(
"engine::workers::register",
serde_json::json!({
"_caller_worker_id": worker_id,
"runtime": "node",
"version": "20.0.0",
"pid": 42000u32,
}),
)
.await
.expect("register call should succeed");

// List workers and verify pid is present
let list_result = engine
.call("engine::workers::list", serde_json::json!({}))
.await
.expect("list call succeeds")
.expect("result is Some");

let workers = list_result
.get("workers")
.and_then(|v| v.as_array())
.expect("workers array");

let found = workers
.iter()
.find(|w| w.get("id").and_then(|v| v.as_str()) == Some(worker_id.as_str()))
.expect("worker in list");

assert_eq!(found.get("pid").and_then(|v| v.as_u64()), Some(42000u64));
}
1 change: 1 addition & 0 deletions frameworks/motia/motia-py/playground/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ dependencies = [

[tool.uv.sources]
motia = { path = "../packages/motia" }
iii-sdk = { path = "../../../../sdk/packages/python/iii" }
2 changes: 1 addition & 1 deletion frameworks/motia/motia-py/playground/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sdk/packages/node/iii/src/iii.ts
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ class Sdk implements ISdk {
version: SDK_VERSION,
name: this.workerName,
os: getOsInfo(),
pid: process.pid,
telemetry: {
language,
project_name: telemetryOpts?.project_name,
Expand Down
1 change: 1 addition & 0 deletions sdk/packages/python/iii/src/iii/iii.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ def _get_worker_metadata(self) -> dict[str, Any]:
"version": sdk_version,
"name": worker_name,
"os": f"{platform.system()} {platform.release()} ({platform.machine()})",
"pid": os.getpid(),
"telemetry": telemetry,
}

Expand Down
3 changes: 3 additions & 0 deletions sdk/packages/rust/iii/src/iii.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ pub struct WorkerMetadata {
pub name: String,
pub os: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub pid: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub telemetry: Option<WorkerTelemetryMeta>,
}

Expand Down Expand Up @@ -135,6 +137,7 @@ impl Default for WorkerMetadata {
version: SDK_VERSION.to_string(),
name: format!("{}:{}", hostname, pid),
os: os_info,
pid: Some(pid),
telemetry: Some(WorkerTelemetryMeta {
language,
..Default::default()
Expand Down