Skip to content
Draft
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,457 changes: 1,427 additions & 30 deletions rsworkspace/Cargo.lock

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions rsworkspace/crates/acp-nats-stdio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "acp-nats-stdio"
version = "0.1.0"
edition = "2024"

[dependencies]
acp-nats = { path = "../acp-nats" }
agent-client-protocol = "0.9.3"
async-compat = "0.2.5"
async-nats = "0.45.0"
clap = { version = "4.5", features = ["derive", "env"] }
opentelemetry = "0.31.0"
opentelemetry-appender-tracing = "0.31.0"
opentelemetry-otlp = { version = "0.31.0", features = ["http-json", "logs", "metrics", "reqwest-rustls"] }
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "logs", "metrics"] }
trogon-std = { path = "../trogon-std" }
tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros", "signal"] }
tracing = "0.1.44"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = { version = "0.3.22", features = ["env-filter", "fmt", "json"] }

[dev-dependencies]
trogon-std = { path = "../trogon-std", features = ["test-support"] }
47 changes: 47 additions & 0 deletions rsworkspace/crates/acp-nats-stdio/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# ACP NATS Bridge

A Rust bridge that connects ACP (Agent Client Protocol) clients to NATS-based backends.

```mermaid
graph LR
A[IDE] <-->|stdio| B[acp-nats-bridge]
B <-->|NATS| C[Backend]
style A fill:#e1f5ff,stroke:#0288d1
style B fill:#fff3e0,stroke:#f57c00
style C fill:#f3e5f5,stroke:#7b1fa2
```

## Features

- Bidirectional ACP Bridge with request forwarding
- Auto-reconnect with exponential backoff
- OpenTelemetry integration (logs, metrics, traces)
- Graceful shutdown (SIGINT/SIGTERM)
- Custom prefix support for multi-tenancy

## Quick Start

```bash
# Prerequisites: NATS server running
docker run -p 4222:4222 nats:latest

# Build
cargo build --release

# Run
./target/release/acp-nats-stdio
```

## Configuration

Configure via environment variables:

- `NATS_URL` - NATS server URL(s). Single server: `localhost:4222` or multiple for failover: `localhost:4222,localhost:4223,localhost:4224` (default: `localhost:4222`)
- `NATS_USER` - NATS username (optional)
- `NATS_PASSWORD` - NATS password (optional)
- `NATS_TOKEN` - NATS token (optional)
- `CUSTOM_PREFIX` - Custom subject prefix for multi-tenancy (optional)
- `OTEL_EXPORTER_OTLP_ENDPOINT` - OpenTelemetry collector endpoint (optional)

See code documentation for additional configuration options.
56 changes: 56 additions & 0 deletions rsworkspace/crates/acp-nats-stdio/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use acp_nats::{Config, NatsConfig};
use clap::Parser;
use trogon_std::env::ReadEnv;

const ENV_ACP_PREFIX: &str = "ACP_PREFIX";
const DEFAULT_ACP_PREFIX: &str = "acp";

#[derive(Parser, Debug)]
#[command(name = "acp-nats-stdio")]
#[command(about = "ACP stdio to NATS bridge for agent-client protocol", long_about = None)]
pub struct Args {
#[arg(long = "acp-prefix")]
pub acp_prefix: Option<String>,
}

pub fn from_env_with_provider<E: ReadEnv>(env_provider: &E) -> Config {
let args = Args::parse();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Args::parse() in tests reads real process arguments

Medium Severity

from_env_with_provider hardcodes Args::parse(), which reads from std::env::args(). In test binaries, those args include cargo-test flags (e.g., --nocapture, test name filters). Clap will reject unrecognized args and call process::exit(2), killing the entire test suite. The tests only pass when cargo test is invoked with zero extra arguments — running cargo test test_default_config or cargo test -- --nocapture will abort the process before any test runs.

Additional Locations (1)

Fix in Cursor Fix in Web

let acp_prefix = args
.acp_prefix
.or_else(|| env_provider.var(ENV_ACP_PREFIX).ok())
.unwrap_or_else(|| DEFAULT_ACP_PREFIX.to_string());
Config::new(acp_prefix, NatsConfig::from_env(env_provider))
}

#[cfg(test)]
mod tests {
use super::*;
use trogon_std::env::InMemoryEnv;

#[test]
fn test_default_config() {
let env = InMemoryEnv::new();
let config = from_env_with_provider(&env);
assert_eq!(config.acp_prefix, DEFAULT_ACP_PREFIX);
assert_eq!(config.nats.servers, vec!["localhost:4222"]);
assert!(matches!(config.nats.auth, acp_nats::NatsAuth::None));
}

#[test]
fn test_acp_prefix_from_env_provider() {
let env = InMemoryEnv::new();
env.set("ACP_PREFIX", "custom-prefix");
let config = from_env_with_provider(&env);
assert_eq!(config.acp_prefix, "custom-prefix");
}

#[test]
fn test_nats_config_from_env() {
let env = InMemoryEnv::new();
env.set("NATS_URL", "host1:4222,host2:4222");
env.set("NATS_TOKEN", "my-token");
let config = from_env_with_provider(&env);
assert_eq!(config.nats.servers, vec!["host1:4222", "host2:4222"]);
assert!(matches!(config.nats.auth, acp_nats::NatsAuth::Token(t) if t == "my-token"));
}
}
75 changes: 75 additions & 0 deletions rsworkspace/crates/acp-nats-stdio/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
mod config;
mod signal;
mod telemetry;

use acp_nats::{agent::Bridge, client, nats};
use agent_client_protocol::AgentSideConnection;
use async_nats::Client as NatsAsyncClient;
use std::rc::Rc;
use tracing::{error, info, warn};
use trogon_std::env::SystemEnv;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = config::from_env_with_provider(&SystemEnv);
telemetry::init_logger(&config)?;

info!("ACP bridge starting");

let nats_client = match nats::connect(&config.nats).await {
Ok(client) => Some(client),
Err(e) => {
warn!(error = %e, "Failed to connect to NATS");
None
}
};
let local = tokio::task::LocalSet::new();

local.run_until(run_bridge(nats_client, &config)).await;

info!("Flushing telemetry...");
telemetry::shutdown_otel().await;
info!("ACP bridge stopped");

Ok(())
}

async fn run_bridge(nats_client: Option<NatsAsyncClient>, config: &acp_nats::Config) {
let stdin = async_compat::Compat::new(tokio::io::stdin());
let stdout = async_compat::Compat::new(tokio::io::stdout());

let bridge = Rc::new(Bridge::<NatsAsyncClient>::new(
nats_client.clone(),
config.acp_prefix.clone(),
));

let (connection, io_task) = AgentSideConnection::new(bridge.clone(), stdout, stdin, |fut| {
tokio::task::spawn_local(fut);
});

let connection = Rc::new(connection);

if let Some(nats_instance) = nats_client {
let client_connection = connection.clone();
let bridge_for_client = bridge.clone();
tokio::task::spawn_local(async move {
client::run::<NatsAsyncClient, _>(nats_instance, client_connection, bridge_for_client)
.await;
});
info!("ACP bridge running on stdio with NATS client proxy");
} else {
info!("ACP bridge running on stdio (no NATS)");
}

tokio::select! {
result = io_task => {
if let Err(e) = result {
error!(error = %e, "IO task error");
}
info!("ACP bridge shutting down (IO closed)");
}
_ = signal::shutdown_signal() => {
info!("ACP bridge shutting down (signal received)");
}
}
}
30 changes: 30 additions & 0 deletions rsworkspace/crates/acp-nats-stdio/src/signal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use tokio::signal;
use tracing::info;

pub async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("Failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Failed to install SIGTERM handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => {
info!("Received SIGINT (Ctrl+C)");
}
_ = terminate => {
info!("Received SIGTERM");
}
}
}
54 changes: 54 additions & 0 deletions rsworkspace/crates/acp-nats-stdio/src/telemetry/log.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use opentelemetry_otlp::LogExporter;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use std::path::PathBuf;
use std::sync::OnceLock;

pub(super) static LOGGER_PROVIDER: OnceLock<SdkLoggerProvider> = OnceLock::new();

pub(super) fn init_provider(
resource: &Resource,
) -> Result<SdkLoggerProvider, Box<dyn std::error::Error>> {
let exporter = LogExporter::builder().with_http().build()?;

let provider = SdkLoggerProvider::builder()
.with_batch_exporter(exporter)
.with_resource(resource.clone())
.build();

Ok(provider)
}

pub(super) fn shutdown() {
if let Some(provider) = LOGGER_PROVIDER.get()
&& let Err(e) = provider.shutdown()
{
eprintln!("Failed to shutdown logger provider: {e}");
}
}

pub(super) fn get_log_dir() -> Result<PathBuf, Box<dyn std::error::Error>> {
if let Ok(dir) = std::env::var("ACP_LOG_DIR") {
let path = PathBuf::from(dir);
std::fs::create_dir_all(&path)?;
return Ok(path);
}

let log_dir = platform_log_dir()?;
std::fs::create_dir_all(&log_dir)?;
Ok(log_dir)
}

fn platform_log_dir() -> Result<PathBuf, Box<dyn std::error::Error>> {
use trogon_std::dirs::{HomeDir, StateDir, SystemDirs};

if cfg!(target_os = "macos") {
let home = SystemDirs.home_dir().ok_or("HOME not set")?;
Ok(home.join("Library").join("Logs").join("acp-nats-stdio"))
} else {
let base = SystemDirs
.state_dir()
.ok_or("could not determine state directory")?;
Ok(base.join("acp-nats-stdio"))
}
}
32 changes: 32 additions & 0 deletions rsworkspace/crates/acp-nats-stdio/src/telemetry/metric.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use opentelemetry_otlp::MetricExporter;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use std::sync::OnceLock;
use std::time::Duration;

pub(super) static METER_PROVIDER: OnceLock<SdkMeterProvider> = OnceLock::new();

pub(super) fn init_provider(
resource: &Resource,
) -> Result<SdkMeterProvider, Box<dyn std::error::Error>> {
let exporter = MetricExporter::builder().with_http().build()?;

let reader = PeriodicReader::builder(exporter)
.with_interval(Duration::from_secs(30))
.build();

let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(resource.clone())
.build();

Ok(provider)
}

pub(super) fn shutdown() {
if let Some(provider) = METER_PROVIDER.get()
&& let Err(e) = provider.shutdown()
{
eprintln!("Failed to shutdown meter provider: {e}");
}
}
Loading