Skip to content

Commit f3e3b9e

Browse files
committed
Add ACP IO Nats Bridge
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 3107329 commit f3e3b9e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+5023
-30
lines changed

rsworkspace/Cargo.lock

Lines changed: 1424 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
[package]
2+
name = "acp-nats-stdio"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
acp-nats = { path = "../acp-nats" }
8+
agent-client-protocol = "0.9.3"
9+
async-compat = "0.2.5"
10+
async-nats = "0.45.0"
11+
clap = { version = "4.5", features = ["derive", "env"] }
12+
trogon-std = { path = "../trogon-std" }
13+
tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros"] }
14+
tracing = "0.1.44"
15+
16+
[dev-dependencies]
17+
trogon-std = { path = "../trogon-std", features = ["test-support"] }
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# ACP NATS Bridge
2+
3+
A Rust bridge that connects ACP (Agent Client Protocol) clients to NATS-based backends.
4+
5+
```mermaid
6+
graph LR
7+
A[IDE] <-->|stdio| B[acp-nats-bridge]
8+
B <-->|NATS| C[Backend]
9+
10+
style A fill:#e1f5ff,stroke:#0288d1
11+
style B fill:#fff3e0,stroke:#f57c00
12+
style C fill:#f3e5f5,stroke:#7b1fa2
13+
```
14+
15+
## Features
16+
17+
- Bidirectional ACP Bridge with request forwarding
18+
- Auto-reconnect with exponential backoff
19+
- OpenTelemetry integration (logs, metrics, traces)
20+
- Graceful shutdown (SIGINT/SIGTERM)
21+
- Custom prefix support for multi-tenancy
22+
23+
## Quick Start
24+
25+
```bash
26+
# Prerequisites: NATS server running
27+
docker run -p 4222:4222 nats:latest
28+
29+
# Build
30+
cargo build --release
31+
32+
# Run
33+
./target/release/acp-nats-stdio
34+
```
35+
36+
## Configuration
37+
38+
Configure via environment variables:
39+
40+
- `NATS_URL` - NATS server URL(s). Single server: `localhost:4222` or multiple for failover: `localhost:4222,localhost:4223,localhost:4224` (default: `localhost:4222`)
41+
- `NATS_USER` - NATS username (optional)
42+
- `NATS_PASSWORD` - NATS password (optional)
43+
- `NATS_TOKEN` - NATS token (optional)
44+
- `CUSTOM_PREFIX` - Custom subject prefix for multi-tenancy (optional)
45+
- `OTEL_EXPORTER_OTLP_ENDPOINT` - OpenTelemetry collector endpoint (optional)
46+
47+
See code documentation for additional configuration options.
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use acp_nats::{Config, NatsConfig};
2+
use clap::Parser;
3+
use trogon_std::env::ReadEnv;
4+
5+
const ENV_ACP_PREFIX: &str = "ACP_PREFIX";
6+
const DEFAULT_ACP_PREFIX: &str = "acp";
7+
8+
#[derive(Parser, Debug)]
9+
#[command(name = "acp-nats-stdio")]
10+
#[command(about = "ACP stdio to NATS bridge for agent-client protocol", long_about = None)]
11+
pub struct Args {
12+
#[arg(long = "acp-prefix", env = ENV_ACP_PREFIX)]
13+
pub acp_prefix: Option<String>,
14+
}
15+
16+
pub fn from_env_with_provider<E: ReadEnv>(env_provider: &E) -> Config {
17+
let args = Args::parse();
18+
Config::new(
19+
args.acp_prefix
20+
.unwrap_or_else(|| DEFAULT_ACP_PREFIX.to_string()),
21+
NatsConfig::from_env(env_provider),
22+
)
23+
}
24+
25+
#[cfg(test)]
26+
mod tests {
27+
use super::*;
28+
use trogon_std::env::InMemoryEnv;
29+
30+
#[test]
31+
fn test_default_config() {
32+
let env = InMemoryEnv::new();
33+
let config = from_env_with_provider(&env);
34+
assert_eq!(config.acp_prefix, DEFAULT_ACP_PREFIX);
35+
assert_eq!(config.nats.servers, vec!["localhost:4222"]);
36+
assert!(matches!(config.nats.auth, acp_nats::NatsAuth::None));
37+
}
38+
39+
#[test]
40+
fn test_nats_config_from_env() {
41+
let env = InMemoryEnv::new();
42+
env.set("NATS_URL", "host1:4222,host2:4222");
43+
env.set("NATS_TOKEN", "my-token");
44+
let config = from_env_with_provider(&env);
45+
assert_eq!(config.nats.servers, vec!["host1:4222", "host2:4222"]);
46+
assert!(matches!(config.nats.auth, acp_nats::NatsAuth::Token(t) if t == "my-token"));
47+
}
48+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
mod config;
2+
mod signal;
3+
4+
use acp_nats::{agent::Bridge, client, nats, otel};
5+
use agent_client_protocol::AgentSideConnection;
6+
use async_nats::Client as NatsAsyncClient;
7+
use std::sync::Arc;
8+
use tracing::{error, info};
9+
use trogon_std::env::SystemEnv;
10+
11+
#[tokio::main]
12+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
13+
let config = config::from_env_with_provider(&SystemEnv);
14+
otel::init_logger(&config)?;
15+
16+
let log_file = otel::get_log_file_path()?;
17+
info!(log_file = %log_file.display(), "ACP bridge starting");
18+
19+
let nats_client = match nats::connect(&config.nats).await {
20+
Ok(client) => Some(client),
21+
Err(e) => {
22+
error!(error = %e, "Failed to connect to NATS");
23+
None
24+
}
25+
};
26+
let local = tokio::task::LocalSet::new();
27+
28+
local
29+
.run_until(async {
30+
let stdin = async_compat::Compat::new(tokio::io::stdin());
31+
let stdout = async_compat::Compat::new(tokio::io::stdout());
32+
33+
let bridge = Arc::new(Bridge::<NatsAsyncClient>::new(
34+
nats_client.clone(),
35+
config.acp_prefix.clone(),
36+
));
37+
38+
let (connection, io_task) =
39+
AgentSideConnection::new(bridge.clone(), stdout, stdin, |fut| {
40+
tokio::task::spawn_local(fut);
41+
});
42+
43+
let connection = Arc::new(connection);
44+
45+
if let Some(nats_instance) = nats_client {
46+
let client_connection = connection.clone();
47+
let bridge_for_client = bridge.clone();
48+
tokio::task::spawn_local(async move {
49+
client::run::<NatsAsyncClient, _>(
50+
nats_instance,
51+
client_connection,
52+
bridge_for_client,
53+
)
54+
.await;
55+
});
56+
info!("ACP bridge running on stdio with NATS client proxy");
57+
} else {
58+
info!("ACP bridge running on stdio (no NATS)");
59+
}
60+
61+
tokio::select! {
62+
result = io_task => {
63+
if let Err(e) = result {
64+
error!(error = %e, "IO task error");
65+
}
66+
info!("ACP bridge shutting down (IO closed)");
67+
}
68+
_ = signal::shutdown_signal() => {
69+
info!("ACP bridge shutting down (signal received)");
70+
}
71+
}
72+
})
73+
.await;
74+
75+
info!("Flushing telemetry...");
76+
otel::shutdown_otel().await;
77+
info!("ACP bridge stopped");
78+
79+
Ok(())
80+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use tokio::signal;
2+
use tracing::warn;
3+
4+
pub async fn shutdown_signal() {
5+
let ctrl_c = async {
6+
signal::ctrl_c()
7+
.await
8+
.expect("Failed to install Ctrl+C handler");
9+
};
10+
11+
#[cfg(unix)]
12+
let terminate = async {
13+
signal::unix::signal(signal::unix::SignalKind::terminate())
14+
.expect("Failed to install SIGTERM handler")
15+
.recv()
16+
.await;
17+
};
18+
19+
#[cfg(not(unix))]
20+
let terminate = std::future::pending::<()>();
21+
22+
tokio::select! {
23+
_ = ctrl_c => {
24+
warn!("Received SIGINT (Ctrl+C)");
25+
}
26+
_ = terminate => {
27+
warn!("Received SIGTERM");
28+
}
29+
}
30+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
[package]
2+
name = "acp-nats"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
agent-client-protocol = "0.9.3"
8+
async-nats = "0.45.0"
9+
async-trait = "0.1.89"
10+
bytes = "1"
11+
futures = "0.3.31"
12+
opentelemetry = "0.31.0"
13+
opentelemetry-appender-tracing = "0.31.0"
14+
opentelemetry-otlp = { version = "0.31.0", features = ["http-json", "logs", "metrics", "reqwest-rustls"] }
15+
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "logs", "metrics", "testing"] }
16+
serde = { version = "1.0.228", features = ["derive"] }
17+
serde_json = "1.0.149"
18+
tokio = { version = "1.49.0", features = ["process", "io-util", "io-std", "rt-multi-thread", "macros", "sync", "signal"] }
19+
tracing = "0.1.44"
20+
tracing-opentelemetry = "0.32.1"
21+
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "fmt", "json"] }
22+
uuid = { version = "1.16", features = ["v4"] }
23+
24+
# Shared NATS infrastructure
25+
trogon-nats = { path = "../trogon-nats" }
26+
27+
[dev-dependencies]
28+
trogon-nats = { path = "../trogon-nats", features = ["test-support"] }
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use super::Bridge;
2+
use crate::nats::{self, agent, FlushClient, PublishClient, RequestClient, SubscribeClient};
3+
use agent_client_protocol::{AuthenticateRequest, AuthenticateResponse, Error, Result};
4+
use std::time::Instant;
5+
use tracing::{info, instrument};
6+
7+
#[instrument(
8+
name = "acp.authenticate",
9+
skip(bridge, args),
10+
fields(method_id = %args.method_id)
11+
)]
12+
pub async fn handle<N: SubscribeClient + RequestClient + PublishClient + FlushClient>(
13+
bridge: &Bridge<N>,
14+
args: AuthenticateRequest,
15+
) -> Result<AuthenticateResponse> {
16+
let start = Instant::now();
17+
18+
info!(method_id = %args.method_id, "Authenticate request");
19+
20+
let nats = bridge.require_nats()?;
21+
22+
let result = nats::request::<N, AuthenticateRequest, AuthenticateResponse>(
23+
nats,
24+
&agent::authenticate(&bridge.acp_prefix),
25+
&args,
26+
)
27+
.await
28+
.map_err(|e| Error::new(-32603, e.to_string()));
29+
30+
bridge.metrics.record_request(
31+
"authenticate",
32+
start.elapsed().as_secs_f64(),
33+
result.is_ok(),
34+
);
35+
36+
result
37+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use super::Bridge;
2+
use crate::nats::{self, agent, FlushClient, PublishClient, RequestClient, SubscribeClient};
3+
use agent_client_protocol::{CancelNotification, Result};
4+
use std::time::Instant;
5+
use tracing::{info, instrument, warn};
6+
7+
#[instrument(
8+
name = "acp.session.cancel",
9+
skip(bridge, args),
10+
fields(session_id = %args.session_id)
11+
)]
12+
pub async fn handle<N: SubscribeClient + RequestClient + PublishClient + FlushClient>(bridge: &Bridge<N>, args: CancelNotification) -> Result<()> {
13+
let start = Instant::now();
14+
15+
info!(session_id = %args.session_id, "Cancel notification");
16+
17+
bridge
18+
.cancelled_sessions
19+
.mark_cancelled(args.session_id.clone());
20+
21+
if let Some(nats) = &bridge.nats {
22+
let subject = agent::session_cancel(&bridge.acp_prefix, &args.session_id.to_string());
23+
24+
if let Err(e) = nats::publish(nats, &subject, &args, nats::PublishOptions::simple()).await {
25+
warn!(error = %e, session_id = %args.session_id, "Failed to publish cancel notification");
26+
}
27+
}
28+
29+
bridge
30+
.metrics
31+
.record_request("cancel", start.elapsed().as_secs_f64(), true);
32+
33+
Ok(())
34+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use super::Bridge;
2+
use crate::nats::{self, agent, FlushClient, PublishClient, RequestClient, SubscribeClient};
3+
use agent_client_protocol::{Error, ExtRequest, ExtResponse, Result};
4+
use std::time::Instant;
5+
use tracing::{info, instrument};
6+
7+
#[instrument(
8+
name = "acp.ext",
9+
skip(bridge, args),
10+
fields(method = %args.method)
11+
)]
12+
pub async fn handle<N: SubscribeClient + RequestClient + PublishClient + FlushClient>(bridge: &Bridge<N>, args: ExtRequest) -> Result<ExtResponse> {
13+
let start = Instant::now();
14+
15+
info!(method = %args.method, "Extension method request");
16+
17+
let nats = bridge.require_nats()?;
18+
let subject = agent::ext(&bridge.acp_prefix, &args.method);
19+
20+
let result = nats::request::<N, ExtRequest, ExtResponse>(nats, &subject, &args)
21+
.await
22+
.map_err(|e| Error::new(-32603, e.to_string()));
23+
24+
bridge
25+
.metrics
26+
.record_request("ext_method", start.elapsed().as_secs_f64(), result.is_ok());
27+
28+
result
29+
}

0 commit comments

Comments
 (0)