Skip to content

Commit 17424fd

Browse files
committed
Add ACP IO Nats Bridge
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 3107329 commit 17424fd

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+5461
-31
lines changed

rsworkspace/Cargo.lock

Lines changed: 1427 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "acp-nats-stdio"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
acp-nats = { path = "../acp-nats" }
8+
agent-client-protocol = "0.9.3"
9+
async-compat = "0.2.5"
10+
async-nats = "0.45.0"
11+
clap = { version = "4.5", features = ["derive", "env"] }
12+
opentelemetry = "0.31.0"
13+
opentelemetry-appender-tracing = "0.31.0"
14+
opentelemetry-otlp = { version = "0.31.0", features = ["http-json", "logs", "metrics", "reqwest-rustls"] }
15+
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "logs", "metrics"] }
16+
trogon-std = { path = "../trogon-std" }
17+
tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros", "signal"] }
18+
tracing = "0.1.44"
19+
tracing-opentelemetry = "0.32.1"
20+
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "fmt", "json"] }
21+
22+
[dev-dependencies]
23+
trogon-std = { path = "../trogon-std", features = ["test-support"] }
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# ACP NATS Bridge
2+
3+
A Rust bridge that connects ACP (Agent Client Protocol) clients to NATS-based backends.
4+
5+
```mermaid
6+
graph LR
7+
A[IDE] <-->|stdio| B[acp-nats-bridge]
8+
B <-->|NATS| C[Backend]
9+
10+
style A fill:#e1f5ff,stroke:#0288d1
11+
style B fill:#fff3e0,stroke:#f57c00
12+
style C fill:#f3e5f5,stroke:#7b1fa2
13+
```
14+
15+
## Features
16+
17+
- Bidirectional ACP Bridge with request forwarding
18+
- Auto-reconnect with exponential backoff
19+
- OpenTelemetry integration (logs, metrics, traces)
20+
- Graceful shutdown (SIGINT/SIGTERM)
21+
- Custom prefix support for multi-tenancy
22+
23+
## Quick Start
24+
25+
```bash
26+
# Prerequisites: NATS server running
27+
docker run -p 4222:4222 nats:latest
28+
29+
# Build
30+
cargo build --release
31+
32+
# Run
33+
./target/release/acp-nats-stdio
34+
```
35+
36+
## Configuration
37+
38+
Configure via environment variables:
39+
40+
- `NATS_URL` - NATS server URL(s). Single server: `localhost:4222` or multiple for failover: `localhost:4222,localhost:4223,localhost:4224` (default: `localhost:4222`)
41+
- `NATS_USER` - NATS username (optional)
42+
- `NATS_PASSWORD` - NATS password (optional)
43+
- `NATS_TOKEN` - NATS token (optional)
44+
- `CUSTOM_PREFIX` - Custom subject prefix for multi-tenancy (optional)
45+
- `OTEL_EXPORTER_OTLP_ENDPOINT` - OpenTelemetry collector endpoint (optional)
46+
47+
See code documentation for additional configuration options.
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use acp_nats::{Config, NatsConfig};
2+
use clap::Parser;
3+
use trogon_std::env::ReadEnv;
4+
5+
const ENV_ACP_PREFIX: &str = "ACP_PREFIX";
6+
const DEFAULT_ACP_PREFIX: &str = "acp";
7+
8+
#[derive(Parser, Debug)]
9+
#[command(name = "acp-nats-stdio")]
10+
#[command(about = "ACP stdio to NATS bridge for agent-client protocol", long_about = None)]
11+
pub struct Args {
12+
#[arg(long = "acp-prefix", env = ENV_ACP_PREFIX)]
13+
pub acp_prefix: Option<String>,
14+
}
15+
16+
pub fn from_env_with_provider<E: ReadEnv>(env_provider: &E) -> Config {
17+
let args = Args::parse();
18+
Config::new(
19+
args.acp_prefix
20+
.unwrap_or_else(|| DEFAULT_ACP_PREFIX.to_string()),
21+
NatsConfig::from_env(env_provider),
22+
)
23+
}
24+
25+
#[cfg(test)]
26+
mod tests {
27+
use super::*;
28+
use trogon_std::env::InMemoryEnv;
29+
30+
#[test]
31+
fn test_default_config() {
32+
let env = InMemoryEnv::new();
33+
let config = from_env_with_provider(&env);
34+
assert_eq!(config.acp_prefix, DEFAULT_ACP_PREFIX);
35+
assert_eq!(config.nats.servers, vec!["localhost:4222"]);
36+
assert!(matches!(config.nats.auth, acp_nats::NatsAuth::None));
37+
}
38+
39+
#[test]
40+
fn test_nats_config_from_env() {
41+
let env = InMemoryEnv::new();
42+
env.set("NATS_URL", "host1:4222,host2:4222");
43+
env.set("NATS_TOKEN", "my-token");
44+
let config = from_env_with_provider(&env);
45+
assert_eq!(config.nats.servers, vec!["host1:4222", "host2:4222"]);
46+
assert!(matches!(config.nats.auth, acp_nats::NatsAuth::Token(t) if t == "my-token"));
47+
}
48+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
mod config;
2+
mod telemetry;
3+
mod signal;
4+
5+
use acp_nats::{agent::Bridge, client, nats};
6+
use agent_client_protocol::AgentSideConnection;
7+
use async_nats::Client as NatsAsyncClient;
8+
use std::sync::Arc;
9+
use tracing::{error, info};
10+
use trogon_std::env::SystemEnv;
11+
12+
#[tokio::main]
13+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
14+
let config = config::from_env_with_provider(&SystemEnv);
15+
telemetry::init_logger(&config)?;
16+
17+
info!("ACP bridge starting");
18+
19+
let nats_client = match nats::connect(&config.nats).await {
20+
Ok(client) => Some(client),
21+
Err(e) => {
22+
error!(error = %e, "Failed to connect to NATS");
23+
None
24+
}
25+
};
26+
let local = tokio::task::LocalSet::new();
27+
28+
local.run_until(run_bridge(nats_client, &config)).await;
29+
30+
info!("Flushing telemetry...");
31+
telemetry::shutdown_otel().await;
32+
info!("ACP bridge stopped");
33+
34+
Ok(())
35+
}
36+
37+
async fn run_bridge(nats_client: Option<NatsAsyncClient>, config: &acp_nats::Config) {
38+
let stdin = async_compat::Compat::new(tokio::io::stdin());
39+
let stdout = async_compat::Compat::new(tokio::io::stdout());
40+
41+
let bridge = Arc::new(Bridge::<NatsAsyncClient>::new(
42+
nats_client.clone(),
43+
config.acp_prefix.clone(),
44+
));
45+
46+
let (connection, io_task) =
47+
AgentSideConnection::new(bridge.clone(), stdout, stdin, |fut| {
48+
tokio::task::spawn_local(fut);
49+
});
50+
51+
let connection = Arc::new(connection);
52+
53+
if let Some(nats_instance) = nats_client {
54+
let client_connection = connection.clone();
55+
let bridge_for_client = bridge.clone();
56+
tokio::task::spawn_local(async move {
57+
client::run::<NatsAsyncClient, _>(
58+
nats_instance,
59+
client_connection,
60+
bridge_for_client,
61+
)
62+
.await;
63+
});
64+
info!("ACP bridge running on stdio with NATS client proxy");
65+
} else {
66+
info!("ACP bridge running on stdio (no NATS)");
67+
}
68+
69+
tokio::select! {
70+
result = io_task => {
71+
if let Err(e) = result {
72+
error!(error = %e, "IO task error");
73+
}
74+
info!("ACP bridge shutting down (IO closed)");
75+
}
76+
_ = signal::shutdown_signal() => {
77+
info!("ACP bridge shutting down (signal received)");
78+
}
79+
}
80+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use tokio::signal;
2+
use tracing::warn;
3+
4+
pub async fn shutdown_signal() {
5+
let ctrl_c = async {
6+
signal::ctrl_c()
7+
.await
8+
.expect("Failed to install Ctrl+C handler");
9+
};
10+
11+
#[cfg(unix)]
12+
let terminate = async {
13+
signal::unix::signal(signal::unix::SignalKind::terminate())
14+
.expect("Failed to install SIGTERM handler")
15+
.recv()
16+
.await;
17+
};
18+
19+
#[cfg(not(unix))]
20+
let terminate = std::future::pending::<()>();
21+
22+
tokio::select! {
23+
_ = ctrl_c => {
24+
warn!("Received SIGINT (Ctrl+C)");
25+
}
26+
_ = terminate => {
27+
warn!("Received SIGTERM");
28+
}
29+
}
30+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use opentelemetry_otlp::LogExporter;
2+
use opentelemetry_sdk::Resource;
3+
use opentelemetry_sdk::logs::SdkLoggerProvider;
4+
use std::path::PathBuf;
5+
use std::sync::OnceLock;
6+
7+
pub(super) static LOGGER_PROVIDER: OnceLock<SdkLoggerProvider> = OnceLock::new();
8+
9+
pub(super) fn init_provider(
10+
resource: &Resource,
11+
) -> Result<SdkLoggerProvider, Box<dyn std::error::Error>> {
12+
let exporter = LogExporter::builder().with_http().build()?;
13+
14+
let provider = SdkLoggerProvider::builder()
15+
.with_batch_exporter(exporter)
16+
.with_resource(resource.clone())
17+
.build();
18+
19+
Ok(provider)
20+
}
21+
22+
pub(super) fn shutdown() {
23+
if let Some(provider) = LOGGER_PROVIDER.get() {
24+
if let Err(e) = provider.shutdown() {
25+
eprintln!("Failed to shutdown logger provider: {e}");
26+
}
27+
}
28+
}
29+
30+
pub(super) fn get_log_dir() -> Result<PathBuf, Box<dyn std::error::Error>> {
31+
if let Ok(dir) = std::env::var("ACP_LOG_DIR") {
32+
let path = PathBuf::from(dir);
33+
std::fs::create_dir_all(&path)?;
34+
return Ok(path);
35+
}
36+
37+
let log_dir = platform_log_dir()?;
38+
std::fs::create_dir_all(&log_dir)?;
39+
Ok(log_dir)
40+
}
41+
42+
fn platform_log_dir() -> Result<PathBuf, Box<dyn std::error::Error>> {
43+
use trogon_std::dirs::{HomeDir, StateDir, SystemDirs};
44+
45+
if cfg!(target_os = "macos") {
46+
let home = SystemDirs
47+
.home_dir()
48+
.ok_or("HOME not set")?;
49+
Ok(home.join("Library").join("Logs").join("acp-nats-stdio"))
50+
} else {
51+
let base = SystemDirs
52+
.state_dir()
53+
.ok_or("could not determine state directory")?;
54+
Ok(base.join("acp-nats-stdio"))
55+
}
56+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use opentelemetry_otlp::MetricExporter;
2+
use opentelemetry_sdk::Resource;
3+
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
4+
use std::sync::OnceLock;
5+
use std::time::Duration;
6+
7+
pub(super) static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
8+
9+
pub(super) fn init_provider(
10+
resource: &Resource,
11+
) -> Result<SdkMeterProvider, Box<dyn std::error::Error>> {
12+
let exporter = MetricExporter::builder().with_http().build()?;
13+
14+
let reader = PeriodicReader::builder(exporter)
15+
.with_interval(Duration::from_secs(30))
16+
.build();
17+
18+
let provider = SdkMeterProvider::builder()
19+
.with_reader(reader)
20+
.with_resource(resource.clone())
21+
.build();
22+
23+
Ok(provider)
24+
}
25+
26+
pub(super) fn shutdown() {
27+
if let Some(provider) = METER_PROVIDER.get() {
28+
if let Err(e) = provider.shutdown() {
29+
eprintln!("Failed to shutdown meter provider: {e}");
30+
}
31+
}
32+
}

0 commit comments

Comments
 (0)