Skip to content

Commit ccb586f

Browse files
committed
fill a2a gaps
1 parent 509bf73 commit ccb586f

File tree

11 files changed

+2303
-15
lines changed

11 files changed

+2303
-15
lines changed

src/adapters/mcp/a2a_http.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ impl A2AErrorCode {
8989
pub enum A2ATaskState {
9090
Submitted,
9191
Working,
92+
#[serde(rename = "input-required")]
9293
InputRequired,
9394
Completed,
9495
Failed,
@@ -242,6 +243,7 @@ pub struct PushNotificationConfig {
242243
/// Artifact structure.
243244
#[derive(Debug, Clone, Serialize, Deserialize)]
244245
pub struct A2AArtifact {
246+
#[serde(rename = "artifactId")]
245247
pub id: String,
246248
pub name: String,
247249
#[serde(skip_serializing_if = "Option::is_none")]
@@ -522,7 +524,7 @@ pub struct A2AState {
522524
/// Registered agent cards.
523525
pub agent_cards: RwLock<HashMap<String, A2AAgentCard>>,
524526
/// In-memory tasks.
525-
pub tasks: RwLock<HashMap<String, InMemoryTask>>,
527+
pub tasks: Arc<RwLock<HashMap<String, InMemoryTask>>>,
526528
/// Sessions.
527529
pub sessions: RwLock<HashMap<String, A2ASession>>,
528530
/// Messages between agents.
@@ -533,18 +535,22 @@ pub struct A2AState {
533535
pub config: A2AHttpConfig,
534536
/// Optional federation service for inter-swarm communication.
535537
pub federation_service: Option<Arc<crate::services::federation::FederationService>>,
538+
/// Handle for the convergence publisher daemon (lazily spawned).
539+
pub convergence_publisher_handle:
540+
RwLock<Option<crate::services::federation::convergence_publisher::ConvergencePublisherHandle>>,
536541
}
537542

538543
impl A2AState {
539544
pub fn new(config: A2AHttpConfig) -> Self {
540545
Self {
541546
agent_cards: RwLock::new(HashMap::new()),
542-
tasks: RwLock::new(HashMap::new()),
547+
tasks: Arc::new(RwLock::new(HashMap::new())),
543548
sessions: RwLock::new(HashMap::new()),
544549
messages: RwLock::new(HashMap::new()),
545550
delegations: RwLock::new(HashMap::new()),
546551
config,
547552
federation_service: None,
553+
convergence_publisher_handle: RwLock::new(None),
548554
}
549555
}
550556

@@ -1416,6 +1422,21 @@ async fn handle_federation_routing(
14161422
tasks.insert(task_id.clone(), local_task);
14171423
}
14181424

1425+
// Lazily spawn the convergence publisher on first goal_delegate.
1426+
{
1427+
let mut publisher_guard = state.convergence_publisher_handle.write().await;
1428+
if publisher_guard.is_none() {
1429+
use crate::services::federation::convergence_publisher::ConvergencePublisher;
1430+
let publisher = ConvergencePublisher::new(
1431+
state.tasks.clone(),
1432+
std::time::Duration::from_secs(10),
1433+
);
1434+
let handle = publisher.spawn();
1435+
*publisher_guard = Some(handle);
1436+
tracing::info!("Spawned convergence publisher for goal_delegate tasks");
1437+
}
1438+
}
1439+
14191440
let response_task = A2ATask {
14201441
id: task_id.clone(),
14211442
session_id,

0 commit comments

Comments
 (0)