Skip to content

Commit d8838ae

Browse files
committed
Add ACP IO Nats Bridge
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 3107329 commit d8838ae

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+5025
-30
lines changed

rsworkspace/Cargo.lock

Lines changed: 1427 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "acp-nats-stdio"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
acp-nats = { path = "../acp-nats" }
8+
agent-client-protocol = "0.9.3"
9+
async-compat = "0.2.5"
10+
async-nats = "0.45.0"
11+
clap = { version = "4.5", features = ["derive", "env"] }
12+
opentelemetry = "0.31.0"
13+
opentelemetry-appender-tracing = "0.31.0"
14+
opentelemetry-otlp = { version = "0.31.0", features = ["http-json", "logs", "metrics", "reqwest-rustls"] }
15+
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "logs", "metrics"] }
16+
trogon-std = { path = "../trogon-std" }
17+
tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros", "signal"] }
18+
tracing = "0.1.44"
19+
tracing-opentelemetry = "0.32.1"
20+
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "fmt", "json"] }
21+
22+
[dev-dependencies]
23+
trogon-std = { path = "../trogon-std", features = ["test-support"] }
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# ACP NATS Bridge
2+
3+
A Rust bridge that connects ACP (Agent Client Protocol) clients to NATS-based backends.
4+
5+
```mermaid
6+
graph LR
7+
A[IDE] <-->|stdio| B[acp-nats-bridge]
8+
B <-->|NATS| C[Backend]
9+
10+
style A fill:#e1f5ff,stroke:#0288d1
11+
style B fill:#fff3e0,stroke:#f57c00
12+
style C fill:#f3e5f5,stroke:#7b1fa2
13+
```
14+
15+
## Features
16+
17+
- Bidirectional ACP Bridge with request forwarding
18+
- Auto-reconnect with exponential backoff
19+
- OpenTelemetry integration (logs, metrics, traces)
20+
- Graceful shutdown (SIGINT/SIGTERM)
21+
- Custom prefix support for multi-tenancy
22+
23+
## Quick Start
24+
25+
```bash
26+
# Prerequisites: NATS server running
27+
docker run -p 4222:4222 nats:latest
28+
29+
# Build
30+
cargo build --release
31+
32+
# Run
33+
./target/release/acp-nats-stdio
34+
```
35+
36+
## Configuration
37+
38+
Configure via environment variables:
39+
40+
- `NATS_URL` - NATS server URL(s). Single server: `localhost:4222` or multiple for failover: `localhost:4222,localhost:4223,localhost:4224` (default: `localhost:4222`)
41+
- `NATS_USER` - NATS username (optional)
42+
- `NATS_PASSWORD` - NATS password (optional)
43+
- `NATS_TOKEN` - NATS token (optional)
44+
- `CUSTOM_PREFIX` - Custom subject prefix for multi-tenancy (optional)
45+
- `OTEL_EXPORTER_OTLP_ENDPOINT` - OpenTelemetry collector endpoint (optional)
46+
47+
See code documentation for additional configuration options.
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use acp_nats::{Config, NatsConfig};
2+
use clap::Parser;
3+
use trogon_std::env::ReadEnv;
4+
5+
const ENV_ACP_PREFIX: &str = "ACP_PREFIX";
6+
const DEFAULT_ACP_PREFIX: &str = "acp";
7+
8+
#[derive(Parser, Debug)]
9+
#[command(name = "acp-nats-stdio")]
10+
#[command(about = "ACP stdio to NATS bridge for agent-client protocol", long_about = None)]
11+
pub struct Args {
12+
#[arg(long = "acp-prefix", env = ENV_ACP_PREFIX)]
13+
pub acp_prefix: Option<String>,
14+
}
15+
16+
pub fn from_env_with_provider<E: ReadEnv>(env_provider: &E) -> Config {
17+
let args = Args::parse();
18+
Config::new(
19+
args.acp_prefix
20+
.unwrap_or_else(|| DEFAULT_ACP_PREFIX.to_string()),
21+
NatsConfig::from_env(env_provider),
22+
)
23+
}
24+
25+
#[cfg(test)]
26+
mod tests {
27+
use super::*;
28+
use trogon_std::env::InMemoryEnv;
29+
30+
#[test]
31+
fn test_default_config() {
32+
let env = InMemoryEnv::new();
33+
let config = from_env_with_provider(&env);
34+
assert_eq!(config.acp_prefix, DEFAULT_ACP_PREFIX);
35+
assert_eq!(config.nats.servers, vec!["localhost:4222"]);
36+
assert!(matches!(config.nats.auth, acp_nats::NatsAuth::None));
37+
}
38+
39+
#[test]
40+
fn test_nats_config_from_env() {
41+
let env = InMemoryEnv::new();
42+
env.set("NATS_URL", "host1:4222,host2:4222");
43+
env.set("NATS_TOKEN", "my-token");
44+
let config = from_env_with_provider(&env);
45+
assert_eq!(config.nats.servers, vec!["host1:4222", "host2:4222"]);
46+
assert!(matches!(config.nats.auth, acp_nats::NatsAuth::Token(t) if t == "my-token"));
47+
}
48+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
mod config;
2+
mod otel;
3+
mod signal;
4+
5+
use acp_nats::{agent::Bridge, client, nats};
6+
use agent_client_protocol::AgentSideConnection;
7+
use async_nats::Client as NatsAsyncClient;
8+
use std::sync::Arc;
9+
use tracing::{error, info};
10+
use trogon_std::env::SystemEnv;
11+
12+
#[tokio::main]
13+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
14+
let config = config::from_env_with_provider(&SystemEnv);
15+
otel::init_logger(&config)?;
16+
17+
let log_file = otel::get_log_file_path()?;
18+
info!(log_file = %log_file.display(), "ACP bridge starting");
19+
20+
let nats_client = match nats::connect(&config.nats).await {
21+
Ok(client) => Some(client),
22+
Err(e) => {
23+
error!(error = %e, "Failed to connect to NATS");
24+
None
25+
}
26+
};
27+
let local = tokio::task::LocalSet::new();
28+
29+
local
30+
.run_until(async {
31+
let stdin = async_compat::Compat::new(tokio::io::stdin());
32+
let stdout = async_compat::Compat::new(tokio::io::stdout());
33+
34+
let bridge = Arc::new(Bridge::<NatsAsyncClient>::new(
35+
nats_client.clone(),
36+
config.acp_prefix.clone(),
37+
));
38+
39+
let (connection, io_task) =
40+
AgentSideConnection::new(bridge.clone(), stdout, stdin, |fut| {
41+
tokio::task::spawn_local(fut);
42+
});
43+
44+
let connection = Arc::new(connection);
45+
46+
if let Some(nats_instance) = nats_client {
47+
let client_connection = connection.clone();
48+
let bridge_for_client = bridge.clone();
49+
tokio::task::spawn_local(async move {
50+
client::run::<NatsAsyncClient, _>(
51+
nats_instance,
52+
client_connection,
53+
bridge_for_client,
54+
)
55+
.await;
56+
});
57+
info!("ACP bridge running on stdio with NATS client proxy");
58+
} else {
59+
info!("ACP bridge running on stdio (no NATS)");
60+
}
61+
62+
tokio::select! {
63+
result = io_task => {
64+
if let Err(e) = result {
65+
error!(error = %e, "IO task error");
66+
}
67+
info!("ACP bridge shutting down (IO closed)");
68+
}
69+
_ = signal::shutdown_signal() => {
70+
info!("ACP bridge shutting down (signal received)");
71+
}
72+
}
73+
})
74+
.await;
75+
76+
info!("Flushing telemetry...");
77+
otel::shutdown_otel().await;
78+
info!("ACP bridge stopped");
79+
80+
Ok(())
81+
}
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
use acp_nats::Config;
2+
use opentelemetry::KeyValue;
3+
use opentelemetry::trace::TracerProvider;
4+
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
5+
use opentelemetry_otlp::{LogExporter, MetricExporter, SpanExporter};
6+
use opentelemetry_sdk::logs::SdkLoggerProvider;
7+
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
8+
use opentelemetry_sdk::propagation::TraceContextPropagator;
9+
use opentelemetry_sdk::{Resource, trace as sdktrace};
10+
use std::fs::File;
11+
use std::path::PathBuf;
12+
use std::sync::OnceLock;
13+
use std::time::Duration;
14+
use tracing_subscriber::EnvFilter;
15+
use tracing_subscriber::fmt::format::FmtSpan;
16+
use tracing_subscriber::layer::SubscriberExt;
17+
use tracing_subscriber::util::SubscriberInitExt;
18+
19+
static TRACER_PROVIDER: OnceLock<sdktrace::SdkTracerProvider> = OnceLock::new();
20+
static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();
21+
static LOGGER_PROVIDER: OnceLock<SdkLoggerProvider> = OnceLock::new();
22+
23+
pub fn init_logger(config: &Config) -> Result<(), Box<dyn std::error::Error>> {
24+
let log_dir = get_log_dir()?;
25+
let log_file = log_dir.join("acp-io-bridge-nats.log");
26+
27+
let file = File::create(&log_file)?;
28+
29+
let file_layer = tracing_subscriber::fmt::layer()
30+
.with_writer(file)
31+
.with_thread_ids(true)
32+
.with_span_events(FmtSpan::CLOSE)
33+
.json();
34+
35+
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
36+
37+
match try_init_otel(config) {
38+
Ok((tracer_provider, meter_provider, logger_provider)) => {
39+
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
40+
41+
let tracer = tracer_provider.tracer(DEFAULT_SERVICE_NAME);
42+
let otel_trace_layer = tracing_opentelemetry::layer().with_tracer(tracer);
43+
44+
opentelemetry::global::set_meter_provider(meter_provider.clone());
45+
METER_PROVIDER.set(meter_provider).ok();
46+
47+
let otel_logs_layer = OpenTelemetryTracingBridge::new(&logger_provider);
48+
LOGGER_PROVIDER.set(logger_provider).ok();
49+
50+
tracing_subscriber::registry()
51+
.with(env_filter)
52+
.with(file_layer)
53+
.with(otel_trace_layer)
54+
.with(otel_logs_layer)
55+
.init();
56+
57+
tracing::info!(
58+
log_file = %log_file.display(),
59+
"Logger initialized with OpenTelemetry"
60+
);
61+
}
62+
Err(e) => {
63+
tracing_subscriber::registry()
64+
.with(env_filter)
65+
.with(file_layer)
66+
.init();
67+
68+
tracing::warn!(
69+
log_file = %log_file.display(),
70+
error = %e,
71+
"Logger initialized without OpenTelemetry (init failed)"
72+
);
73+
}
74+
}
75+
76+
Ok(())
77+
}
78+
79+
const DEFAULT_SERVICE_NAME: &str = "acp-nats-stdio";
80+
81+
fn try_init_otel(
82+
config: &Config,
83+
) -> Result<
84+
(
85+
sdktrace::SdkTracerProvider,
86+
SdkMeterProvider,
87+
SdkLoggerProvider,
88+
),
89+
Box<dyn std::error::Error>,
90+
> {
91+
let mut builder = Resource::builder()
92+
.with_attributes(vec![KeyValue::new("acp.prefix", config.acp_prefix.clone())]);
93+
94+
if std::env::var("OTEL_SERVICE_NAME").is_err() {
95+
builder = builder.with_service_name(DEFAULT_SERVICE_NAME);
96+
}
97+
98+
let resource = builder.build();
99+
100+
let tracer_provider = init_tracer_provider(&resource)?;
101+
let meter_provider = init_meter_provider(&resource)?;
102+
let logger_provider = init_logger_provider(&resource)?;
103+
104+
Ok((tracer_provider, meter_provider, logger_provider))
105+
}
106+
107+
fn init_tracer_provider(
108+
resource: &Resource,
109+
) -> Result<sdktrace::SdkTracerProvider, Box<dyn std::error::Error>> {
110+
let exporter = SpanExporter::builder().with_http().build()?;
111+
112+
let provider = sdktrace::SdkTracerProvider::builder()
113+
.with_batch_exporter(exporter)
114+
.with_resource(resource.clone())
115+
.build();
116+
117+
opentelemetry::global::set_tracer_provider(provider.clone());
118+
TRACER_PROVIDER.set(provider.clone()).ok();
119+
120+
Ok(provider)
121+
}
122+
123+
fn init_meter_provider(
124+
resource: &Resource,
125+
) -> Result<SdkMeterProvider, Box<dyn std::error::Error>> {
126+
let exporter = MetricExporter::builder().with_http().build()?;
127+
128+
let reader = PeriodicReader::builder(exporter)
129+
.with_interval(Duration::from_secs(30))
130+
.build();
131+
132+
let provider = SdkMeterProvider::builder()
133+
.with_reader(reader)
134+
.with_resource(resource.clone())
135+
.build();
136+
137+
Ok(provider)
138+
}
139+
140+
fn init_logger_provider(
141+
resource: &Resource,
142+
) -> Result<SdkLoggerProvider, Box<dyn std::error::Error>> {
143+
let exporter = LogExporter::builder().with_http().build()?;
144+
145+
let provider = SdkLoggerProvider::builder()
146+
.with_batch_exporter(exporter)
147+
.with_resource(resource.clone())
148+
.build();
149+
150+
Ok(provider)
151+
}
152+
153+
fn get_log_dir() -> Result<PathBuf, Box<dyn std::error::Error>> {
154+
let base = std::env::var("XDG_CACHE_HOME")
155+
.map(PathBuf::from)
156+
.unwrap_or_else(|_| {
157+
std::env::var("HOME")
158+
.map(|h| PathBuf::from(h).join(".cache"))
159+
.unwrap_or_else(|_| std::env::temp_dir())
160+
});
161+
162+
let log_dir = base.join("acp-io-bridge-nats").join("logs");
163+
std::fs::create_dir_all(&log_dir)?;
164+
Ok(log_dir)
165+
}
166+
167+
pub fn get_log_file_path() -> Result<PathBuf, Box<dyn std::error::Error>> {
168+
let log_dir = get_log_dir()?;
169+
Ok(log_dir.join("acp-io-bridge-nats.log"))
170+
}
171+
172+
pub async fn shutdown_otel() {
173+
tracing::info!("Shutting down OpenTelemetry providers");
174+
tokio::time::sleep(Duration::from_millis(500)).await;
175+
176+
if let Some(logger_provider) = LOGGER_PROVIDER.get() {
177+
if let Err(e) = logger_provider.shutdown() {
178+
eprintln!("Failed to shutdown logger provider: {e}");
179+
}
180+
}
181+
182+
if let Some(meter_provider) = METER_PROVIDER.get() {
183+
if let Err(e) = meter_provider.shutdown() {
184+
eprintln!("Failed to shutdown meter provider: {e}");
185+
}
186+
}
187+
188+
if let Some(tracer_provider) = TRACER_PROVIDER.get() {
189+
if let Err(e) = tracer_provider.shutdown() {
190+
eprintln!("Failed to shutdown tracer provider: {e}");
191+
}
192+
}
193+
}

0 commit comments

Comments
 (0)