Skip to content

Commit 689a554

Browse files
committed
fix
1 parent 957659f commit 689a554

File tree

2 files changed

+218
-0
lines changed

2 files changed

+218
-0
lines changed

opentelemetry-sdk/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ pub mod runtime;
137137
#[cfg(any(feature = "testing", test))]
138138
#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
139139
pub mod testing;
140+
pub mod thread_runtime;
140141

141142
#[allow(deprecated)]
142143
#[cfg(feature = "trace")]
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
use crate::runtime::{Runtime, RuntimeChannel, TrySend, TrySendError};
2+
use futures_executor;
3+
use futures_util::{future::BoxFuture, stream::Stream};
4+
use std::{
5+
fmt::Debug,
6+
future::Future,
7+
pin::Pin,
8+
sync::{mpsc, Arc, Mutex},
9+
task::{Context, Poll},
10+
thread,
11+
time::{Duration, Instant},
12+
};
13+
14+
/// WorkerPool: Manages worker threads to process tasks.
15+
#[derive(Clone, Debug)]
16+
struct WorkerPool {
17+
task_sender: Arc<Mutex<mpsc::SyncSender<BoxFuture<'static, ()>>>>,
18+
}
19+
20+
impl WorkerPool {
21+
/// Create a new WorkerPool with the specified number of worker threads.
22+
fn new(num_threads: usize, queue_capacity: usize) -> Self {
23+
let (task_sender, task_receiver) = mpsc::sync_channel(queue_capacity);
24+
let task_receiver = Arc::new(Mutex::new(task_receiver));
25+
26+
// Spawn worker threads
27+
for _ in 0..num_threads {
28+
let task_receiver = Arc::clone(&task_receiver);
29+
thread::spawn(move || Self::worker_loop(task_receiver));
30+
}
31+
32+
WorkerPool {
33+
task_sender: Arc::new(Mutex::new(task_sender)),
34+
}
35+
}
36+
37+
/// Worker loop that runs tasks in worker threads.
38+
fn worker_loop(task_receiver: Arc<Mutex<mpsc::Receiver<BoxFuture<'static, ()>>>>) {
39+
loop {
40+
let task = task_receiver.lock().unwrap().recv();
41+
if let Ok(task) = task {
42+
// Block on task execution.
43+
futures_executor::block_on(task);
44+
} else {
45+
break; // Exit the loop when the sender is closed
46+
}
47+
}
48+
}
49+
50+
/// Enqueue a new task for execution.
51+
fn enqueue_task(&self, future: BoxFuture<'static, ()>) {
52+
let task_sender = Arc::clone(&self.task_sender);
53+
let sender = task_sender.lock().unwrap();
54+
sender.send(future).unwrap();
55+
}
56+
57+
/// Shutdown the worker pool.
58+
fn shutdown(&self) {
59+
// Signal threads to exit and process any remaining tasks
60+
drop(self.task_sender.lock().unwrap().clone());
61+
}
62+
}
63+
64+
/// TimeSchedulers: Manages interval and delay mechanisms.
65+
struct TimeSchedulers;
66+
67+
impl TimeSchedulers {
68+
/// Create an interval stream that ticks at a given duration.
69+
fn create_interval(duration: Duration) -> CustomInterval {
70+
let (sender, receiver) = mpsc::channel();
71+
thread::spawn(move || {
72+
let mut next_tick = Instant::now();
73+
loop {
74+
next_tick += duration;
75+
if sender.send(()).is_err() {
76+
break;
77+
}
78+
let now = Instant::now();
79+
if next_tick > now {
80+
thread::sleep(next_tick - now);
81+
}
82+
}
83+
});
84+
CustomInterval { receiver }
85+
}
86+
87+
/// Create a delay future that resolves after the given duration.
88+
fn create_delay(duration: Duration) -> CustomDelay {
89+
let (sender, receiver) = mpsc::channel();
90+
thread::spawn(move || {
91+
thread::sleep(duration);
92+
let _ = sender.send(());
93+
});
94+
CustomDelay { receiver }
95+
}
96+
}
97+
98+
/// CustomInterval: A stream that ticks at fixed intervals using a background thread.
99+
#[derive(Debug)]
100+
pub struct CustomInterval {
101+
receiver: mpsc::Receiver<()>,
102+
}
103+
104+
impl Stream for CustomInterval {
105+
type Item = ();
106+
107+
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
108+
match self.receiver.recv() {
109+
Ok(_) => Poll::Ready(Some(())),
110+
Err(_) => Poll::Ready(None),
111+
}
112+
}
113+
}
114+
115+
/// CustomDelay: A future that resolves after a fixed delay using a background thread.
116+
#[derive(Debug)]
117+
pub struct CustomDelay {
118+
receiver: mpsc::Receiver<()>,
119+
}
120+
121+
impl Future for CustomDelay {
122+
type Output = ();
123+
124+
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
125+
match self.receiver.recv() {
126+
Ok(_) => Poll::Ready(()),
127+
Err(_) => Poll::Ready(()),
128+
}
129+
}
130+
}
131+
132+
/// CustomThreadRuntime: Combines worker pool and time schedulers to manage tasks and timers.
133+
#[derive(Debug, Clone)]
134+
pub struct CustomThreadRuntime {
135+
worker_pool: WorkerPool,
136+
}
137+
138+
impl CustomThreadRuntime {
139+
/// Create a new CustomThreadRuntime with the specified number of worker threads.
140+
pub fn new(num_threads: usize, queue_capacity: usize) -> Self {
141+
CustomThreadRuntime {
142+
worker_pool: WorkerPool::new(num_threads, queue_capacity),
143+
}
144+
}
145+
}
146+
147+
impl Runtime for CustomThreadRuntime {
148+
type Interval = CustomInterval;
149+
type Delay = CustomDelay;
150+
151+
fn interval(&self, duration: Duration) -> Self::Interval {
152+
TimeSchedulers::create_interval(duration)
153+
}
154+
155+
fn spawn(&self, future: BoxFuture<'static, ()>) {
156+
self.worker_pool.enqueue_task(future);
157+
}
158+
159+
fn delay(&self, duration: Duration) -> Self::Delay {
160+
TimeSchedulers::create_delay(duration)
161+
}
162+
}
163+
164+
/// Messaging system for sending batch messages.
165+
#[derive(Debug)]
166+
pub struct CustomSender<T: Debug + Send> {
167+
tx: mpsc::SyncSender<T>,
168+
}
169+
170+
/// Messaging system for receiving batch messages.
171+
#[derive(Debug)]
172+
pub struct CustomReceiver<T: Debug + Send> {
173+
rx: mpsc::Receiver<T>,
174+
}
175+
176+
impl<T: Debug + Send> TrySend for CustomSender<T> {
177+
type Message = T;
178+
179+
fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
180+
self.tx.send(item).map_err(|_| TrySendError::ChannelClosed)
181+
}
182+
}
183+
184+
impl<T: Debug + Send> Stream for CustomReceiver<T> {
185+
type Item = T;
186+
187+
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
188+
// Use `try_recv` instead of `recv` to avoid blocking
189+
match self.rx.try_recv() {
190+
Ok(item) => Poll::Ready(Some(item)),
191+
Err(mpsc::TryRecvError::Empty) => {
192+
// No message is available yet, so we'll return `Poll::Pending`
193+
// and recheck after a short sleep to avoid busy-waiting.
194+
thread::sleep(Duration::from_millis(10)); // Adjust sleep duration if needed
195+
Poll::Pending
196+
}
197+
Err(mpsc::TryRecvError::Disconnected) => Poll::Ready(None), // Channel is closed, terminate the stream
198+
}
199+
}
200+
}
201+
202+
impl RuntimeChannel for CustomThreadRuntime {
203+
type Receiver<T: Debug + Send> = CustomReceiver<T>;
204+
type Sender<T: Debug + Send> = CustomSender<T>;
205+
206+
fn batch_message_channel<T: Debug + Send>(
207+
&self,
208+
capacity: usize,
209+
) -> (Self::Sender<T>, Self::Receiver<T>) {
210+
// Use mpsc to create a bounded channel
211+
let (tx, rx) = mpsc::sync_channel(capacity);
212+
(
213+
CustomSender { tx }, // Sender part
214+
CustomReceiver { rx }, // Receiver part
215+
)
216+
}
217+
}

0 commit comments

Comments
 (0)