Skip to content

Commit 07a4c67

Browse files
committed
fix test bug
1 parent 4c047a5 commit 07a4c67

File tree

17 files changed

+131
-98
lines changed

17 files changed

+131
-98
lines changed

agent/Cargo.lock

Lines changed: 63 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

agent/crates/deeptrace/src/agent.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use observ_config::{Configurator, trace_config};
66
use observ_core::Module;
77
use observ_runtime::handle;
88
use observ_sender::{Sender, elastic::ElasticSender, file::FileSender};
9-
use observ_trace::TraceCollector;
9+
use observ_trace::{TraceCollector, span::SpanConstructor};
1010
use tokio::{
1111
sync::{mpsc, watch},
1212
task::JoinHandle,
@@ -104,16 +104,17 @@ async fn run(
104104
info!("Start configurator");
105105
configurator.start()?;
106106
info!("Starting configurator");
107-
let (span_sender, span_receiver) = mpsc::channel(1024);
108-
let mut trace_sender = Sender::new(
109-
"Trace sender",
110-
span_receiver,
111-
ElasticSender::new(trace_config().load().sender.clone())?,
112-
JsonEncoderBuilder::new().build(),
113-
);
107+
let (message_sender, message_receiver) = crossbeam_channel::bounded(1024);
108+
let (span_sender, span_receiver) = crossbeam_channel::bounded(1024);
109+
let sender = ElasticSender::new(trace_config().load().sender.clone())?;
110+
let mut trace_sender =
111+
Sender::new("Trace sender", span_receiver, sender, JsonEncoderBuilder::new().build());
114112
trace_sender.start()?;
115113

116-
let mut trace_collector = TraceCollector::new(span_sender)?;
114+
let mut span_constructor = SpanConstructor::new(message_receiver, span_sender);
115+
span_constructor.start()?;
116+
117+
let mut trace_collector = TraceCollector::new(message_sender)?;
117118
trace_collector.start()?;
118119

119120
let _ = state_tx.send(State::Running);
@@ -128,6 +129,7 @@ async fn run(
128129
}
129130

130131
trace_collector.stop().await?;
132+
span_constructor.stop().await?;
131133
trace_sender.stop().await?;
132134
configurator.stop().await?;
133135

agent/crates/deeptrace/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,6 @@ pub enum AgentError {
1313
Sender(#[from] observ_sender::SendError),
1414
#[error("Elastic sender error: {0}")]
1515
ElasticSender(#[from] observ_sender::elastic::ElasticError),
16+
#[error("Span constructor error: {0}")]
17+
SpanConstructor(#[from] observ_trace::span::SpanError),
1618
}

agent/crates/ebpf-common/src/buffer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{
55
};
66
use aya_ebpf::{
77
check_bounds_signed,
8-
helpers::{bpf_probe_read_kernel_str_bytes, r#gen}
8+
helpers::{bpf_probe_read_kernel_str_bytes, r#gen},
99
};
1010
use core::cmp::min;
1111

agent/crates/ebpf-common/src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ macro_rules! try_or_log {
212212
Ok(val) => val,
213213
Err(code) => {
214214
// Remove aya_log_ebpf::error! to avoid __bpf_trap
215-
aya_log_ebpf::error!(
215+
aya_log_ebpf::debug!(
216216
$ctx,
217217
"ERROR: [{:X}:{:X}]",
218218
$crate::error::extract_module(code),

agent/crates/l7-parser/src/infer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
2-
cassandra::Cassandra, dns::DNS, http1::HTTP1, memcached::Memcached, mongodb::MongoDB,
3-
mysql::MySQL, redis::Redis, rocketmq::RocketMQ, thrift::Thrift, Classification,
2+
Classification, cassandra::Cassandra, dns::DNS, http1::HTTP1, memcached::Memcached,
3+
mongodb::MongoDB, mysql::MySQL, redis::Redis, rocketmq::RocketMQ, thrift::Thrift,
44
};
55
use aya_ebpf::programs::TracePointContext;
66
use ebpf_common::{

agent/crates/observ-metric/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ observ-metric-utils.workspace = true
2323
observ-network.workspace = true
2424
observ-runtime.workspace = true
2525

26+
crossbeam-channel.workspace = true
2627
log.workspace = true
2728
thiserror.workspace = true
2829
tokio.workspace = true

agent/crates/observ-metric/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crossbeam_channel::Sender;
12
pub use error::MetricError;
23
use host::HostCollector;
34
use log::{info, warn};
@@ -13,7 +14,7 @@ use std::{
1314
},
1415
time::Duration,
1516
};
16-
use tokio::{sync::mpsc::Sender, task::JoinHandle, time};
17+
use tokio::{task::JoinHandle, time};
1718
use tokio_stream::{StreamExt, wrappers::IntervalStream};
1819
mod error;
1920
mod host;
@@ -74,7 +75,7 @@ impl Module for MetricCollector {
7475
}
7576
}
7677
let metrics = mem::take(&mut buffer);
77-
output.send(metrics).await.map_err(|_| MetricError::Send)?;
78+
output.send(metrics).map_err(|_| MetricError::Send)?;
7879
// for metrics in buffer.drain(..) {
7980
// if output.send(metrics).await.is_err() {
8081
// warn!("Metric channel closed");

agent/crates/observ-sender/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ codec.workspace = true
2121
arc-swap.workspace = true
2222
bytes.workspace = true
2323
chrono.workspace = true
24+
crossbeam-channel.workspace = true
2425
elasticsearch.workspace = true
2526
log.workspace = true
2627
serde.workspace = true
28+
serde_json.workspace = true
2729
thiserror.workspace = true
2830
tokio.workspace = true

agent/crates/observ-sender/src/elastic/mod.rs

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ use elasticsearch::{
88
},
99
};
1010
pub use error::ElasticError;
11+
use log::info;
1112
use observ_config::{ElasticSenderConfig, elastic_sender_config};
1213
use observ_core::{Sendable, Sender};
1314
use serde::Serialize;
15+
use serde_json::json;
1416
use std::time::Duration;
1517

1618
mod error;
@@ -42,33 +44,25 @@ impl ElasticSender {
4244
impl<S: Sendable + Serialize> Sender<S> for ElasticSender {
4345
type Error = ElasticError;
4446
async fn send(&mut self, item: BytesMut) -> Result<(), Self::Error> {
45-
self.buf.push(BytesMut::from(
46-
format!(
47-
r#"{{
48-
"index": {{
49-
"_index": "{}"
50-
}}
51-
}}"#,
52-
self.config.index_name
53-
)
54-
.as_bytes(),
55-
));
47+
let index = json!({
48+
"index": {
49+
"_index": self.config.index_name,
50+
}
51+
})
52+
.to_string();
53+
self.buf.push(BytesMut::from(index.as_bytes()));
5654
self.buf.push(item);
57-
if self.buf.len() > self.config.bulk_size * 1024 {
55+
if self.buf.len() > self.config.bulk_size * 2 {
5856
<Self as Sender<S>>::flush(self).await?;
5957
}
6058
Ok(())
6159
}
6260

6361
async fn flush(&mut self) -> Result<(), Self::Error> {
6462
let bulk_body = self.buf.drain(..).collect();
65-
let response = self
66-
.client
67-
.bulk(BulkParts::Index(self.config.index_name.as_str()))
68-
.body(bulk_body)
69-
.send()
70-
.await?;
63+
let response = self.client.bulk(BulkParts::None).body(bulk_body).send().await?;
7164
let status = response.status_code();
65+
info!("Elastic response: {}", status);
7266
if !status.is_success() {
7367
let err = response.text().await?;
7468

0 commit comments

Comments
 (0)