Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/actix-http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use opentelemetry::{
};

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_jaeger::new_pipeline()
.with_collector_endpoint("http://127.0.0.1:14268/api/traces")
opentelemetry_jaeger::new_collector_pipeline()
.with_endpoint("http://127.0.0.1:14268/api/traces")
.with_service_name("trace-http-demo")
.install_batch(opentelemetry::runtime::TokioCurrentThread)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/actix-udp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use opentelemetry::{
};

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_jaeger::new_pipeline()
.with_agent_endpoint("localhost:6831")
opentelemetry_jaeger::new_agent_pipeline()
.with_endpoint("localhost:6831")
.with_service_name("trace-udp-demo")
.with_trace_config(opentelemetry::sdk::trace::config().with_resource(
opentelemetry::sdk::Resource::new(vec![
Expand Down
2 changes: 1 addition & 1 deletion examples/async/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn run(addr: &SocketAddr) -> io::Result<usize> {
}

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_jaeger::new_pipeline()
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("trace-demo")
.install_batch(opentelemetry::runtime::Tokio)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::error::Error;
use std::time::Duration;

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_jaeger::new_pipeline()
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("trace-demo")
.with_trace_config(Config::default().with_resource(Resource::new(vec![
KeyValue::new("service.name", "new_service"),
Expand Down
2 changes: 1 addition & 1 deletion examples/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub mod hello_world {

fn tracing_init() -> TraceResult<Tracer> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_pipeline()
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("grpc-client")
.install_simple()
}
Expand Down
2 changes: 1 addition & 1 deletion examples/grpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl Greeter for MyGreeter {

fn tracing_init() -> Result<impl Tracer, TraceError> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_pipeline()
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("grpc-server")
.install_simple()
}
Expand Down
4 changes: 2 additions & 2 deletions examples/multiple-span-processors/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use std::time::Duration;
fn init_tracer() -> Result<(), TraceError> {
// build a jaeger batch span processor
let jaeger_processor = BatchSpanProcessor::builder(
opentelemetry_jaeger::new_pipeline()
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("trace-demo")
.with_trace_config(
Config::default()
.with_resource(Resource::new(vec![KeyValue::new("exporter", "jaeger")])),
)
.init_async_exporter(opentelemetry::runtime::Tokio)?,
.build_async_agent_exporter(opentelemetry::runtime::Tokio)?,
opentelemetry::runtime::Tokio,
)
.build();
Expand Down
12 changes: 6 additions & 6 deletions opentelemetry-jaeger/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use opentelemetry::trace::Tracer;

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_pipeline().install_simple()?;
let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?;

tracer.in_span("doing_work", |cx| {
// Traced app logic here...
Expand Down Expand Up @@ -76,7 +76,7 @@ opentelemetry-jaeger = { version = "*", features = ["rt-tokio"] }
```

```rust
let tracer = opentelemetry_jaeger::new_pipeline()
let tracer = opentelemetry_jaeger::new_agent_pipeline()
.install_batch(opentelemetry::runtime::Tokio)?;
```

Expand Down Expand Up @@ -120,11 +120,11 @@ Then you can use the [`with_collector_endpoint`] method to specify the endpoint:
use opentelemetry::trace::Tracer;

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let tracer = opentelemetry_jaeger::new_pipeline()
.with_collector_endpoint("http://localhost:14268/api/traces")
let tracer = opentelemetry_jaeger::new_collector_pipeline()
.with_endpoint("http://localhost:14268/api/traces")
// optionally set username and password as well.
.with_collector_username("username")
.with_collector_password("s3cr3t")
.with_username("username")
.with_password("s3cr3t")
.install_batch()?;

tracer.in_span("doing_work", |cx| {
Expand Down
9 changes: 2 additions & 7 deletions opentelemetry-jaeger/src/exporter/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ use thrift::{
transport::{ReadHalf, TIoChannel, WriteHalf},
};

/// The max size of UDP packet we want to send, synced with jaeger-agent
const UDP_PACKET_MAX_LENGTH: usize = 65_000;

struct BufferClient {
buffer: ReadHalf<TBufferChannel>,
client: agent::AgentSyncClient<
Expand Down Expand Up @@ -47,10 +44,9 @@ impl AgentSyncClientUdp {
/// Create a new UDP agent client
pub(crate) fn new<T: ToSocketAddrs>(
host_port: T,
max_packet_size: Option<usize>,
max_packet_size: usize,
auto_split: bool,
) -> thrift::Result<Self> {
let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH);
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
let client = agent::AgentSyncClient::new(
TCompactInputProtocol::new(TNoopChannel),
Expand Down Expand Up @@ -106,11 +102,10 @@ impl<R: JaegerTraceRuntime> AgentAsyncClientUdp<R> {
/// Create a new UDP agent client
pub(crate) fn new<T: ToSocketAddrs>(
host_port: T,
max_packet_size: Option<usize>,
max_packet_size: usize,
runtime: R,
auto_split: bool,
) -> thrift::Result<Self> {
let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH);
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
let client = agent::AgentSyncClient::new(
TCompactInputProtocol::new(TNoopChannel),
Expand Down
58 changes: 33 additions & 25 deletions opentelemetry-jaeger/src/exporter/collector.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,14 @@
//! # HTTP Jaeger Collector Client
//!
#[cfg(feature = "collector_client")]
use http::Uri;
#[cfg(feature = "collector_client")]
use opentelemetry_http::{HttpClient, ResponseExt as _};
use std::sync::atomic::AtomicUsize;

/// `CollectorAsyncClientHttp` implements an async version of the
/// `TCollectorSyncClient` interface over HTTP
#[derive(Debug)]
pub(crate) struct CollectorAsyncClientHttp {
endpoint: Uri,
#[cfg(feature = "collector_client")]
client: Box<dyn HttpClient>,
#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))]
client: WasmHttpClient,
payload_size_estimate: AtomicUsize,
}

#[cfg(feature = "collector_client")]
pub(crate) use collector_client::AsyncHttpClient;
#[cfg(feature = "wasm_collector_client")]
#[derive(Debug)]
struct WasmHttpClient {
_auth: Option<String>,
}
pub(crate) use wasm_collector_client::WasmCollector;

#[cfg(feature = "collector_client")]
mod collector_client {
Expand All @@ -31,14 +19,23 @@ mod collector_client {
use std::sync::atomic::{AtomicUsize, Ordering};
use thrift::protocol::TBinaryOutputProtocol;

impl CollectorAsyncClientHttp {
/// `AsyncHttpClient` implements an async version of the
/// `TCollectorSyncClient` interface over HTTP
#[derive(Debug)]
pub(crate) struct AsyncHttpClient {
endpoint: Uri,
http_client: Box<dyn HttpClient>,
payload_size_estimate: AtomicUsize,
}

impl AsyncHttpClient {
/// Create a new HTTP collector client
pub(crate) fn new(endpoint: Uri, client: Box<dyn HttpClient>) -> Self {
let payload_size_estimate = AtomicUsize::new(512);

CollectorAsyncClientHttp {
AsyncHttpClient {
endpoint,
client,
http_client: client,
payload_size_estimate,
}
}
Expand Down Expand Up @@ -68,15 +65,14 @@ mod collector_client {
.expect("request should always be valid");

// Send request to collector
let _ = self.client.send(req).await?.error_for_status()?;
let _ = self.http_client.send(req).await?.error_for_status()?;
Ok(())
}
}
}

#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))]
#[cfg(feature = "wasm_collector_client")]
mod wasm_collector_client {
use super::*;
use crate::exporter::thrift::jaeger;
use futures_util::future;
use http::Uri;
Expand All @@ -91,7 +87,19 @@ mod wasm_collector_client {
use wasm_bindgen_futures::JsFuture;
use web_sys::{Request, RequestCredentials, RequestInit, RequestMode, Response};

impl CollectorAsyncClientHttp {
#[derive(Debug)]
pub(crate) struct WasmCollector {
endpoint: Uri,
payload_size_estimate: AtomicUsize,
client: WasmHttpClient,
}

#[derive(Debug, Default)]
struct WasmHttpClient {
auth: Option<String>,
}

impl WasmCollector {
/// Create a new HTTP collector client
pub(crate) fn new(
endpoint: Uri,
Expand All @@ -111,7 +119,7 @@ mod wasm_collector_client {

Ok(Self {
endpoint,
client: WasmHttpClient { _auth: auth },
client: WasmHttpClient { auth },
payload_size_estimate,
})
}
Expand Down
Loading