Skip to content

Commit d45cd1e

Browse files
committed
2 parents 5e75fc9 + 091d270 commit d45cd1e

File tree

7 files changed

+137
-45
lines changed

7 files changed

+137
-45
lines changed

cli/src/daemon/server.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@ use axum::{
1919
Json, Router,
2020
};
2121

22-
/// Maximum number of retries when attempting to establish the Temporal connection.
23-
/// Previously this was an inline magic number (`30`) in the connection retry loop.
24-
const TEMPORAL_CONNECTION_MAX_RETRIES: i32 = 30;
2522
use std::sync::Arc;
2623

2724
// Type alias for repository tuple to avoid clippy "very complex type" lint
@@ -80,11 +77,7 @@ fn default_local_host_mount_point() -> String {
8077
return path;
8178
}
8279

83-
let default_path = PathBuf::from("/")
84-
.join("var")
85-
.join("lib")
86-
.join("aegis")
87-
.join("local-host-volumes");
80+
let default_path = PathBuf::from("/var/lib/aegis/local-host-volumes");
8881
default_path.to_string_lossy().into_owned()
8982
}
9083

@@ -618,6 +611,10 @@ pub async fn start_daemon(config_path: Option<PathBuf>, port: u16) -> Result<()>
618611
let temporal_address_clone = temporal_address.clone();
619612
let worker_http_endpoint_clone = worker_http_endpoint.clone();
620613

614+
/// Maximum number of retries when attempting to establish the Temporal connection.
615+
/// Previously this was an inline magic number (`30`) in the connection retry loop.
616+
const TEMPORAL_CONNECTION_MAX_RETRIES: i32 = 30;
617+
621618
// Spawn background task to connect
622619
tokio::spawn(async move {
623620
let mut retries: i32 = 0;
@@ -959,7 +956,7 @@ pub async fn start_daemon(config_path: Option<PathBuf>, port: u16) -> Result<()>
959956
Arc::new(manager)
960957
}
961958
Err(e) => {
962-
return Err(anyhow::anyhow!("Failed to initialise OpenBao secrets manager: {}", e));
959+
return Err(anyhow::anyhow!("Failed to initialize OpenBao secrets manager: {}", e));
963960
}
964961
}
965962
}
@@ -1651,16 +1648,26 @@ async fn stream_events_handler(
16511648

16521649
// Execution Terminal State
16531650
if let Some(ended_at) = execution.ended_at {
1651+
/// Return an owned copy of the output of the last iteration of `execution`.
1652+
///
1653+
/// Returns `None` if the execution has no iterations or if the last iteration's
1654+
/// `output` is `None`; the value is cloned, so `execution` is only borrowed.
1655+
fn final_output_from_execution(
1656+
execution: &aegis_orchestrator_core::domain::execution::Execution,
1657+
) -> Option<String> {
1658+
execution
1659+
.iterations()
1660+
.last()
1661+
.and_then(|i| i.output.clone())
1662+
}
1663+
16541664
match execution.status {
16551665
aegis_orchestrator_core::domain::execution::ExecutionStatus::Completed => {
16561666
// NOTE: The Execution struct does not expose an explicit `final_output` field.
16571667
// By convention, we treat the last iteration's `output` (if any) as the final result.
16581668
// If no such output exists, we surface `null` and keep the semantics explicit instead of
16591669
// silently defaulting to an empty string.
1660-
let final_output = execution
1661-
.iterations()
1662-
.last()
1663-
.and_then(|i| i.output.clone());
1670+
let final_output = final_output_from_execution(&execution);
16641671

16651672
let exec_end = serde_json::json!({
16661673
"event_type": "ExecutionCompleted",
@@ -1932,7 +1939,7 @@ async fn stream_agent_events_handler(
19321939
if let Some(ended_at) = execution.ended_at {
19331940
match execution.status {
19341941
aegis_orchestrator_core::domain::execution::ExecutionStatus::Completed => {
1935-
let result = execution.iterations().last().and_then(|i| i.output.clone()).unwrap_or_default();
1942+
let result = execution.iterations().last().and_then(|i| i.output.clone());
19361943
let exec_end = serde_json::json!({
19371944
"event_type": "ExecutionCompleted",
19381945
"execution_id": execution.id.0,

docker/runtime-entrypoint.sh

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,17 @@ else
1111
fi
1212
if [ -n "$SOCK_GID" ]; then
1313
if ! getent group "$SOCK_GID" >/dev/null 2>&1; then
14-
groupadd -f -g "$SOCK_GID" hostdocker >/dev/null 2>&1 || true
14+
if ! groupadd -f -g "$SOCK_GID" hostdocker >/dev/null 2>&1; then
15+
# If group creation failed, re-check whether a group with this GID now exists.
16+
if ! getent group "$SOCK_GID" >/dev/null 2>&1; then
17+
echo "Failed to create group with GID $SOCK_GID for Docker socket access" >&2
18+
exit 1
19+
fi
20+
fi
21+
fi
22+
if ! usermod -aG "$SOCK_GID" aegis >/dev/null 2>&1; then
23+
echo "Warning: failed to add user 'aegis' to group GID ${SOCK_GID}; Docker socket access may not work." >&2
1524
fi
16-
usermod -aG "$SOCK_GID" aegis >/dev/null 2>&1 || true
1725
fi
1826

1927
if [ "$#" -eq 0 ]; then

orchestrator/core/src/application/tool_invocation_service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl ToolInvocationService {
118118
envelope: &impl EnvelopeVerifier,
119119
) -> Result<Value, SmcpSessionError> {
120120
// 1. Get active session for agent
121-
let session = self
121+
let mut session = self
122122
.smcp_session_repo
123123
.find_active_by_agent(agent_id)
124124
.await
@@ -130,7 +130,7 @@ impl ToolInvocationService {
130130
))?;
131131

132132
// 2. Middleware verifies signature and evaluates against SecurityContext
133-
let args = self.smcp_middleware.verify_and_unwrap(&session, envelope)?;
133+
let args = self.smcp_middleware.verify_and_unwrap(&mut session, envelope)?;
134134
let tool_name = envelope
135135
.extract_tool_name()
136136
.ok_or(SmcpSessionError::MalformedPayload(

orchestrator/core/src/domain/smcp_session.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
// SPDX-License-Identifier: AGPL-3.0
33
//! # SMCP Session Aggregate (BC-12, ADR-035)
44
//!
5+
//! `BC-12` refers to the bounded context that owns SMCP session lifecycle logic, and
6+
//! `ADR-035` is the architecture decision record that defines the design of this
7+
//! aggregate and its invariants (see the project's architecture decision records).
8+
//!
59
//! Domain model for the **Secure Model Context Protocol** session lifecycle.
610
//! Each agent execution that uses MCP tools goes through an attestation handshake
711
//! (see [`crate::application::attestation_service`]) to receive a [`SmcpSession`],
@@ -53,6 +57,11 @@ pub struct SessionId(pub Uuid);
5357

5458
impl SessionId {
5559
/// Generate a new random session ID.
60+
///
61+
/// This uses a UUID v4 and relies on its statistical uniqueness; the probability of
62+
/// collision is negligible for realistic volumes of sessions. Any additional collision
63+
/// handling (for example, enforcing a unique constraint at the persistence layer) is
64+
/// expected to be performed outside this constructor.
5665
pub fn new() -> Self {
5766
Self(Uuid::new_v4())
5867
}
@@ -206,7 +215,7 @@ impl SmcpSession {
206215
security_context,
207216
status: SessionStatus::Active,
208217
created_at: now,
209-
expires_at: now + chrono::Duration::hours(SESSION_TTL_HOURS),
218+
expires_at: now + chrono::TimeDelta::hours(SESSION_TTL_HOURS),
210219
}
211220
}
212221

@@ -232,7 +241,7 @@ impl SmcpSession {
232241
/// This is the **single enforcement point** for all SMCP policy checks. Every
233242
/// tool call from any agent must pass through this method before being forwarded
234243
/// to the MCP server. See ADR-035 §4 (Enforcement Architecture).
235-
pub fn evaluate_call(&self, envelope: &impl EnvelopeVerifier) -> Result<(), SmcpSessionError> {
244+
pub fn evaluate_call(&mut self, envelope: &impl EnvelopeVerifier) -> Result<(), SmcpSessionError> {
236245
let now = Utc::now();
237246

238247
// 1. Check session is active
@@ -242,6 +251,9 @@ impl SmcpSession {
242251

243252
// 2. Check not expired
244253
if now > self.expires_at {
254+
// Once the session is past its expiry time, transition it to a terminal
255+
// `Expired` state so that future calls observe a consistent status.
256+
self.status = SessionStatus::Expired;
245257
return Err(SmcpSessionError::SessionExpired);
246258
}
247259

orchestrator/core/src/infrastructure/repositories/postgres_execution.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -206,10 +206,13 @@ impl ExecutionRepository for PostgresExecutionRepository {
206206
// NOTE: Only `parent_execution_id` is persisted, so we cannot
207207
// reliably reconstruct full multi-level ancestry here without
208208
// additional queries. We therefore preserve the direct parent link
209-
// but treat depth/path as local to this execution.
209+
// and use a minimal non-zero depth to indicate that this execution
210+
// is at least one level below some root. The true depth and full
211+
// path may be greater and cannot be reconstructed without further
212+
// queries.
210213
ExecutionHierarchy {
211214
parent_execution_id: Some(ExecutionId(parent_id)),
212-
depth: 0,
215+
depth: 1,
213216
path: vec![ExecutionId(id)],
214217
}
215218
}
@@ -301,7 +304,12 @@ impl ExecutionRepository for PostgresExecutionRepository {
301304
}?;
302305

303306
let input: ExecutionInput =
304-
serde_json::from_value(input_val).map_err(RepositoryError::from)?;
307+
serde_json::from_value(input_val).map_err(|e| {
308+
RepositoryError::Serialization(format!(
309+
"Failed to deserialize execution input: {}",
310+
e
311+
))
312+
})?;
305313
let iterations: Vec<Iteration> =
306314
serde_json::from_value(iterations_val).map_err(|e| {
307315
RepositoryError::Serialization(format!(
@@ -320,7 +328,7 @@ impl ExecutionRepository for PostgresExecutionRepository {
320328
let hierarchy = match parent_execution_id {
321329
Some(parent_id) => ExecutionHierarchy {
322330
parent_execution_id: Some(ExecutionId(parent_id)),
323-
depth: 0,
331+
depth: 1,
324332
path: vec![ExecutionId(id)],
325333
},
326334
None => ExecutionHierarchy::root(ExecutionId(id)),
@@ -404,7 +412,12 @@ impl ExecutionRepository for PostgresExecutionRepository {
404412
}?;
405413

406414
let input: ExecutionInput =
407-
serde_json::from_value(input_val).map_err(RepositoryError::from)?;
415+
serde_json::from_value(input_val).map_err(|e| {
416+
RepositoryError::Serialization(format!(
417+
"Failed to deserialize execution input: {}",
418+
e
419+
))
420+
})?;
408421
let iterations: Vec<Iteration> =
409422
serde_json::from_value(iterations_val).map_err(|e| {
410423
RepositoryError::Serialization(format!(
@@ -423,7 +436,7 @@ impl ExecutionRepository for PostgresExecutionRepository {
423436
let hierarchy = match parent_execution_id {
424437
Some(parent_id) => ExecutionHierarchy {
425438
parent_execution_id: Some(ExecutionId(parent_id)),
426-
depth: 0,
439+
depth: 1,
427440
path: vec![ExecutionId(id)],
428441
},
429442
None => ExecutionHierarchy::root(ExecutionId(id)),

orchestrator/core/src/infrastructure/smcp/middleware.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl SmcpMiddleware {
5757
/// the tool server, preserving credential isolation (ADR-033).
5858
pub fn verify_and_unwrap(
5959
&self,
60-
session: &SmcpSession,
60+
session: &mut SmcpSession,
6161
envelope: &impl EnvelopeVerifier,
6262
) -> Result<Value, SmcpSessionError> {
6363
info!("Verifying SMCP envelope for session {}", session.id);

orchestrator/core/src/infrastructure/temporal_event_listener.rs

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ use serde::{Deserialize, Serialize};
6666
use std::sync::Arc;
6767
use uuid::Uuid;
6868

69+
/// Canonical file name used to store validation feedback artifacts produced by refinement
70+
/// workflows. This is consumed by the `RefinementApplied` event handler, which expects any
71+
/// validation feedback generated during a refinement iteration to be written under this
72+
/// name within the associated artifact set or storage location.
6973
const VALIDATION_FEEDBACK_FILE_NAME: &str = "validation_feedback";
7074

7175
/// External event payload from Temporal worker
@@ -294,6 +298,47 @@ impl TemporalEventListener {
294298
}
295299
}
296300

301+
/// Persist an execution-scoped event to the repository, then publish it to the event bus.
302+
///
303+
/// This helper encapsulates the two-step pattern used when handling execution events:
304+
/// 1. Serialize the raw payload to JSON and append it to the event log via the repository.
305+
/// 2. Publish the mapped domain event to the event bus; skipped if step 1 fails, in which case the error is returned.
306+
///
307+
/// # Arguments
308+
///
309+
/// * `execution_id` - The execution this event belongs to.
310+
/// * `temporal_sequence_number` - The Temporal sequence number, forwarded to the repository unchanged.
311+
/// * `event_type` - The string event type name.
312+
/// * `raw_payload` - The original Temporal payload to persist as JSON.
313+
/// * `iteration_number` - Optional iteration number associated with this event.
314+
/// * `domain_event` - The mapped domain event to publish once persistence has succeeded.
315+
async fn persist_and_publish_execution_event(
316+
&self,
317+
execution_id: ExecutionId,
318+
temporal_sequence_number: i64,
319+
event_type: String,
320+
raw_payload: &TemporalEventPayload,
321+
iteration_number: Option<u8>,
322+
domain_event: crate::domain::events::ExecutionEvent,
323+
) -> Result<()> {
324+
let serialized_payload = serde_json::to_value(raw_payload)
325+
.context("Failed to serialize TemporalEventPayload for persistence")?;
326+
327+
self.execution_repository
328+
.append_event(
329+
execution_id,
330+
temporal_sequence_number,
331+
event_type,
332+
serialized_payload,
333+
iteration_number,
334+
)
335+
.await
336+
.context("Failed to persist execution event")?;
337+
338+
self.event_bus.publish_execution_event(domain_event);
339+
Ok(())
340+
}
341+
297342
/// Process incoming event from Temporal worker
298343
///
299344
/// # Arguments
@@ -331,18 +376,22 @@ impl TemporalEventListener {
331376
.ok_or_else(|| anyhow!("Missing code_diff for RefinementApplied event"))?;
332377
let diff_str = match diff_val {
333378
serde_json::Value::String(s) => s,
334-
_ => diff_val.to_string(),
379+
other => {
380+
return Err(anyhow!(
381+
"Invalid code_diff format for RefinementApplied event: expected string, got {}",
382+
other
383+
));
384+
}
335385
};
336386

337387
let code_diff = crate::domain::execution::CodeDiff {
338388
file_path: VALIDATION_FEEDBACK_FILE_NAME.to_string(),
339389
diff: diff_str,
340390
};
341391

342-
let applied_at = match DateTime::parse_from_rfc3339(&payload.timestamp) {
343-
Ok(dt) => dt.with_timezone(&Utc),
344-
Err(_) => Utc::now(),
345-
};
392+
let applied_at = DateTime::parse_from_rfc3339(&payload.timestamp)
393+
.context("Failed to parse timestamp as RFC3339 for RefinementApplied event")?
394+
.with_timezone(&Utc);
346395

347396
let domain_event = crate::domain::events::ExecutionEvent::RefinementApplied {
348397
execution_id,
@@ -352,18 +401,16 @@ impl TemporalEventListener {
352401
applied_at,
353402
};
354403

355-
self.execution_repository
356-
.append_event(
357-
execution_id,
358-
payload.temporal_sequence_number,
359-
payload.event_type.clone(),
360-
serde_json::to_value(&payload)?,
361-
Some(iteration_number),
362-
)
363-
.await
364-
.context("Failed to persist execution event")?;
365-
366-
self.event_bus.publish_execution_event(domain_event);
404+
self.persist_and_publish_execution_event(
405+
execution_id,
406+
payload.temporal_sequence_number,
407+
payload.event_type.clone(),
408+
&payload,
409+
Some(iteration_number),
410+
domain_event,
411+
)
412+
.await?;
413+
367414
return Ok(payload.execution_id.clone());
368415
}
369416

@@ -389,7 +436,12 @@ impl TemporalEventListener {
389436
| WorkflowEvent::WorkflowExecutionCompleted { execution_id, .. }
390437
| WorkflowEvent::WorkflowExecutionFailed { execution_id, .. }
391438
| WorkflowEvent::WorkflowExecutionCancelled { execution_id, .. } => *execution_id,
392-
WorkflowEvent::WorkflowRegistered { .. } => unreachable!("handled above"),
439+
WorkflowEvent::WorkflowRegistered { .. } => {
440+
return Err(anyhow!(
441+
"WorkflowRegistered event unexpectedly reached execution-scoped handling; \
442+
this variant should be handled before deriving an execution_id"
443+
));
444+
}
393445
};
394446

395447
let execution_id_str = execution_id_obj.0.to_string();

0 commit comments

Comments
 (0)