Skip to content

Commit 5f2ba1e

Browse files
committed
Add ACP IO Nats Bridge
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 049b087 commit 5f2ba1e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+5637
-508
lines changed

rsworkspace/Cargo.lock

Lines changed: 1422 additions & 28 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
[package]
2+
name = "acp-nats-stdio"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
acp-nats = { path = "../acp-nats" }
8+
agent-client-protocol = "0.9.3"
9+
async-compat = "0.2.5"
10+
async-nats = "0.45.0"
11+
clap = { version = "4.5", features = ["derive", "env"] }
12+
trogon-std = { path = "../trogon-std" }
13+
tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros"] }
14+
tracing = "0.1.44"
15+
16+
[dev-dependencies]
17+
trogon-std = { path = "../trogon-std", features = ["test-support"] }
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# ACP NATS Bridge
2+
3+
A Rust bridge that connects ACP (Agent Client Protocol) clients to NATS-based backends.
4+
5+
```mermaid
6+
graph LR
7+
A[IDE] <-->|stdio| B[acp-nats-bridge]
8+
B <-->|NATS| C[Backend]
9+
10+
style A fill:#e1f5ff,stroke:#0288d1
11+
style B fill:#fff3e0,stroke:#f57c00
12+
style C fill:#f3e5f5,stroke:#7b1fa2
13+
```
14+
15+
## Features
16+
17+
- Bidirectional ACP Bridge with request forwarding
18+
- Auto-reconnect with exponential backoff
19+
- OpenTelemetry integration (logs, metrics, traces)
20+
- Graceful shutdown (SIGINT/SIGTERM)
21+
- Custom prefix support for multi-tenancy
22+
23+
## Quick Start
24+
25+
```bash
26+
# Prerequisites: NATS server running
27+
docker run -p 4222:4222 nats:latest
28+
29+
# Build
30+
cargo build --release
31+
32+
# Run
33+
./target/release/acp-nats-stdio
34+
```
35+
36+
## Configuration
37+
38+
Configure via environment variables:
39+
40+
- `NATS_URL` - NATS server URL(s). Single server: `localhost:4222` or multiple for failover: `localhost:4222,localhost:4223,localhost:4224` (default: `localhost:4222`)
41+
- `NATS_USER` - NATS username (optional)
42+
- `NATS_PASSWORD` - NATS password (optional)
43+
- `NATS_TOKEN` - NATS token (optional)
44+
- `CUSTOM_PREFIX` - Custom subject prefix for multi-tenancy (optional)
45+
- `OTEL_EXPORTER_OTLP_ENDPOINT` - OpenTelemetry collector endpoint (optional)
46+
47+
See code documentation for additional configuration options.
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
use acp_nats::{Config, NatsAuth};
2+
use clap::Parser;
3+
use std::path::PathBuf;
4+
use trogon_std::env::ReadEnv;
5+
6+
#[cfg(test)]
7+
use trogon_std::env::InMemoryEnv;
8+
9+
const ENV_NATS_URL: &str = "NATS_URL";
10+
const ENV_NATS_CREDS: &str = "NATS_CREDS";
11+
const ENV_NATS_NKEY: &str = "NATS_NKEY";
12+
const ENV_NATS_USER: &str = "NATS_USER";
13+
const ENV_NATS_PASSWORD: &str = "NATS_PASSWORD";
14+
const ENV_NATS_TOKEN: &str = "NATS_TOKEN";
15+
const ENV_OTEL_SERVICE_NAME: &str = "OTEL_SERVICE_NAME";
16+
const ENV_ACP_PREFIX: &str = "ACP_PREFIX";
17+
18+
const DEFAULT_NATS_URL: &str = "localhost:4222";
19+
const DEFAULT_SERVICE_NAME: &str = "acp-nats-stdio";
20+
const DEFAULT_ACP_PREFIX: &str = "acp";
21+
22+
#[derive(Parser, Debug)]
23+
#[command(name = "acp-nats-stdio")]
24+
#[command(about = "ACP stdio to NATS bridge for agent-client protocol", long_about = None)]
25+
pub struct Args {
26+
/// Prefix for NATS subjects. Replaces "acp" as the root token to support multi-tenancy.
27+
#[arg(long = "acp-prefix", env = ENV_ACP_PREFIX)]
28+
pub acp_prefix: Option<String>,
29+
}
30+
31+
pub fn from_env_with_provider<E: ReadEnv>(env_provider: &E) -> Config {
32+
let args = Args::parse();
33+
Config::new(
34+
env_provider
35+
.var(ENV_OTEL_SERVICE_NAME)
36+
.unwrap_or_else(|_| DEFAULT_SERVICE_NAME.to_string()),
37+
args.acp_prefix
38+
.unwrap_or_else(|| DEFAULT_ACP_PREFIX.to_string()),
39+
nats_servers_from_provider(env_provider),
40+
nats_auth_from_provider(env_provider),
41+
)
42+
}
43+
44+
fn nats_servers_from_provider<E: ReadEnv>(env_provider: &E) -> Vec<String> {
45+
let servers_str = env_provider
46+
.var(ENV_NATS_URL)
47+
.unwrap_or_else(|_| DEFAULT_NATS_URL.to_string());
48+
servers_str
49+
.split(',')
50+
.map(|s| s.trim().to_string())
51+
.filter(|s| !s.is_empty())
52+
.collect()
53+
}
54+
55+
fn nats_auth_from_provider<E: ReadEnv>(env_provider: &E) -> NatsAuth {
56+
if let Ok(creds_path) = env_provider.var(ENV_NATS_CREDS) {
57+
return NatsAuth::Credentials(PathBuf::from(creds_path));
58+
}
59+
if let Ok(nkey) = env_provider.var(ENV_NATS_NKEY) {
60+
return NatsAuth::NKey(nkey);
61+
}
62+
if let (Ok(user), Ok(password)) = (
63+
env_provider.var(ENV_NATS_USER),
64+
env_provider.var(ENV_NATS_PASSWORD),
65+
) {
66+
return NatsAuth::UserPassword { user, password };
67+
}
68+
if let Ok(token) = env_provider.var(ENV_NATS_TOKEN) {
69+
return NatsAuth::Token(token);
70+
}
71+
NatsAuth::None
72+
}
73+
74+
#[cfg(test)]
75+
mod tests {
76+
use super::*;
77+
78+
#[test]
79+
fn test_parse_single_server() {
80+
let env = InMemoryEnv::new();
81+
env.set(ENV_NATS_URL, "localhost:4222");
82+
83+
let servers = nats_servers_from_provider(&env);
84+
assert_eq!(servers, vec!["localhost:4222"]);
85+
}
86+
87+
#[test]
88+
fn test_parse_multiple_servers() {
89+
let env = InMemoryEnv::new();
90+
env.set(ENV_NATS_URL, "localhost:4222,localhost:4223,localhost:4224");
91+
92+
let servers = nats_servers_from_provider(&env);
93+
assert_eq!(
94+
servers,
95+
vec!["localhost:4222", "localhost:4223", "localhost:4224"]
96+
);
97+
}
98+
99+
#[test]
100+
fn test_parse_servers_with_spaces() {
101+
let env = InMemoryEnv::new();
102+
env.set(
103+
ENV_NATS_URL,
104+
"localhost:4222 , localhost:4223 , localhost:4224",
105+
);
106+
107+
let servers = nats_servers_from_provider(&env);
108+
assert_eq!(
109+
servers,
110+
vec!["localhost:4222", "localhost:4223", "localhost:4224"]
111+
);
112+
}
113+
114+
#[test]
115+
fn test_default_server() {
116+
let env = InMemoryEnv::new();
117+
118+
let servers = nats_servers_from_provider(&env);
119+
assert_eq!(servers, vec!["localhost:4222"]);
120+
}
121+
122+
#[test]
123+
fn test_nats_auth_credentials() {
124+
let env = InMemoryEnv::new();
125+
env.set(ENV_NATS_CREDS, "/path/to/creds.file");
126+
127+
let auth = nats_auth_from_provider(&env);
128+
assert!(matches!(auth, NatsAuth::Credentials(_)));
129+
}
130+
131+
#[test]
132+
fn test_nats_auth_nkey() {
133+
let env = InMemoryEnv::new();
134+
env.set(
135+
ENV_NATS_NKEY,
136+
"SUAIBDPBAUTWCWBKIO6XHQNINK5FWJW4OHLXC3HQ2KFE4PEJUA44CNHTC4",
137+
);
138+
139+
let auth = nats_auth_from_provider(&env);
140+
assert!(matches!(auth, NatsAuth::NKey(_)));
141+
}
142+
143+
#[test]
144+
fn test_nats_auth_user_password() {
145+
let env = InMemoryEnv::new();
146+
env.set(ENV_NATS_USER, "testuser");
147+
env.set(ENV_NATS_PASSWORD, "testpass");
148+
149+
let auth = nats_auth_from_provider(&env);
150+
assert!(matches!(auth, NatsAuth::UserPassword { .. }));
151+
}
152+
153+
#[test]
154+
fn test_nats_auth_token() {
155+
let env = InMemoryEnv::new();
156+
env.set(ENV_NATS_TOKEN, "test-token");
157+
158+
let auth = nats_auth_from_provider(&env);
159+
assert!(matches!(auth, NatsAuth::Token(_)));
160+
}
161+
162+
#[test]
163+
fn test_nats_auth_none() {
164+
let env = InMemoryEnv::new();
165+
166+
let auth = nats_auth_from_provider(&env);
167+
assert!(matches!(auth, NatsAuth::None));
168+
}
169+
170+
#[test]
171+
fn test_nats_auth_priority_credentials_over_nkey() {
172+
let env = InMemoryEnv::new();
173+
env.set(ENV_NATS_CREDS, "/path/to/creds.file");
174+
env.set(
175+
ENV_NATS_NKEY,
176+
"SUAIBDPBAUTWCWBKIO6XHQNINK5FWJW4OHLXC3HQ2KFE4PEJUA44CNHTC4",
177+
);
178+
179+
let auth = nats_auth_from_provider(&env);
180+
assert!(matches!(auth, NatsAuth::Credentials(_)));
181+
}
182+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
mod config;
2+
mod runtime;
3+
4+
use acp_nats::{agent::Bridge, client, nats, otel};
5+
use agent_client_protocol::AgentSideConnection;
6+
use async_nats::Client as NatsAsyncClient;
7+
use std::sync::Arc;
8+
use tracing::{error, info};
9+
use trogon_std::env::SystemEnv;
10+
11+
#[tokio::main]
12+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
13+
let config = config::from_env_with_provider(&SystemEnv);
14+
otel::init_logger(&config)?;
15+
16+
let log_file = otel::get_log_file_path()?;
17+
info!(log_file = %log_file.display(), "ACP bridge starting");
18+
19+
let nats_client = match nats::connect(&config.nats).await {
20+
Ok(client) => Some(client),
21+
Err(e) => {
22+
error!(error = %e, "Failed to connect to NATS");
23+
None
24+
}
25+
};
26+
let local = tokio::task::LocalSet::new();
27+
28+
local
29+
.run_until(async {
30+
let stdin = async_compat::Compat::new(tokio::io::stdin());
31+
let stdout = async_compat::Compat::new(tokio::io::stdout());
32+
33+
let bridge = Arc::new(Bridge::<NatsAsyncClient>::new(
34+
nats_client.clone(),
35+
config.acp_prefix.clone(),
36+
));
37+
38+
let (connection, io_task) =
39+
AgentSideConnection::new(bridge.clone(), stdout, stdin, |fut| {
40+
tokio::task::spawn_local(fut);
41+
});
42+
43+
let connection = Arc::new(connection);
44+
45+
if let Some(nats_instance) = nats_client {
46+
let client_connection = connection.clone();
47+
let bridge_for_client = bridge.clone();
48+
tokio::task::spawn_local(async move {
49+
client::run::<NatsAsyncClient, _>(
50+
nats_instance,
51+
client_connection,
52+
bridge_for_client,
53+
)
54+
.await;
55+
});
56+
info!("ACP bridge running on stdio with NATS client proxy");
57+
} else {
58+
info!("ACP bridge running on stdio (no NATS)");
59+
}
60+
61+
tokio::select! {
62+
result = io_task => {
63+
if let Err(e) = result {
64+
error!(error = %e, "IO task error");
65+
}
66+
info!("ACP bridge shutting down (IO closed)");
67+
}
68+
_ = runtime::shutdown_signal() => {
69+
info!("ACP bridge shutting down (signal received)");
70+
}
71+
}
72+
})
73+
.await;
74+
75+
info!("Flushing telemetry...");
76+
otel::shutdown_otel().await;
77+
info!("ACP bridge stopped");
78+
79+
Ok(())
80+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use tokio::signal;
2+
use tracing::warn;
3+
4+
pub async fn shutdown_signal() {
5+
let ctrl_c = async {
6+
signal::ctrl_c()
7+
.await
8+
.expect("Failed to install Ctrl+C handler");
9+
};
10+
11+
#[cfg(unix)]
12+
let terminate = async {
13+
signal::unix::signal(signal::unix::SignalKind::terminate())
14+
.expect("Failed to install SIGTERM handler")
15+
.recv()
16+
.await;
17+
};
18+
19+
#[cfg(not(unix))]
20+
let terminate = std::future::pending::<()>();
21+
22+
tokio::select! {
23+
_ = ctrl_c => {
24+
warn!("Received SIGINT (Ctrl+C)");
25+
}
26+
_ = terminate => {
27+
warn!("Received SIGTERM");
28+
}
29+
}
30+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
[package]
2+
name = "acp-nats"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
agent-client-protocol = "0.9.3"
8+
async-nats = "0.45.0"
9+
async-trait = "0.1.89"
10+
bytes = "1"
11+
futures = "0.3.31"
12+
opentelemetry = "0.31.0"
13+
opentelemetry-appender-tracing = "0.31.0"
14+
opentelemetry-otlp = { version = "0.31.0", features = ["http-json", "logs", "metrics", "reqwest-rustls"] }
15+
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "logs", "metrics", "testing"] }
16+
serde = { version = "1.0.228", features = ["derive"] }
17+
serde_json = "1.0.149"
18+
tokio = { version = "1.49.0", features = ["process", "io-util", "io-std", "rt-multi-thread", "macros", "sync", "signal"] }
19+
tracing = "0.1.44"
20+
tracing-opentelemetry = "0.32.1"
21+
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "fmt", "json"] }
22+
uuid = { version = "1.16", features = ["v4"] }
23+
24+
# Shared NATS infrastructure
25+
trogon-nats = { path = "../trogon-nats" }
26+
27+
[dev-dependencies]
28+
trogon-nats = { path = "../trogon-nats", features = ["test-support"] }

0 commit comments

Comments
 (0)