Skip to content

Commit 34adc5d

Browse files
committed
feat(sidecar): implement thread listener module
Signed-off-by: Alexandre Rulleau <alexandre.rulleau@datadoghq.com>
1 parent 1caf151 commit 34adc5d

File tree

3 files changed

+287
-0
lines changed

3 files changed

+287
-0
lines changed

datadog-sidecar-ffi/src/lib.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,59 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) -
307307
MaybeError::None
308308
}
309309

310+
#[no_mangle]
311+
#[cfg(unix)]
312+
pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError {
313+
use datadog_sidecar::setup::MasterListener;
314+
315+
let cfg = datadog_sidecar::config::FromEnv::config();
316+
try_c!(MasterListener::start(pid, cfg));
317+
318+
MaybeError::None
319+
}
320+
321+
#[no_mangle]
322+
#[cfg(unix)]
323+
pub extern "C" fn ddog_sidecar_connect_worker(
324+
pid: i32,
325+
connection: &mut *mut SidecarTransport,
326+
) -> MaybeError {
327+
use datadog_sidecar::setup::connect_to_master;
328+
329+
let transport = try_c!(connect_to_master(pid));
330+
*connection = Box::into_raw(transport);
331+
332+
MaybeError::None
333+
}
334+
335+
#[no_mangle]
336+
#[cfg(unix)]
337+
pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError {
338+
use datadog_sidecar::setup::MasterListener;
339+
340+
try_c!(MasterListener::shutdown());
341+
342+
MaybeError::None
343+
}
344+
345+
#[no_mangle]
346+
#[cfg(unix)]
347+
pub extern "C" fn ddog_sidecar_is_master_listener_active(pid: i32) -> bool {
348+
use datadog_sidecar::setup::MasterListener;
349+
350+
MasterListener::is_active(pid)
351+
}
352+
353+
#[no_mangle]
354+
#[cfg(unix)]
355+
pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError {
356+
use datadog_sidecar::setup::MasterListener;
357+
358+
try_c!(MasterListener::clear_inherited_state());
359+
360+
MaybeError::None
361+
}
362+
310363
#[no_mangle]
311364
pub extern "C" fn ddog_sidecar_ping(transport: &mut Box<SidecarTransport>) -> MaybeError {
312365
try_c!(blocking::ping(transport));

datadog-sidecar/src/setup/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ mod windows;
1212
#[cfg(windows)]
1313
pub use self::windows::*;
1414

15+
// Thread-based listener module (Unix only)
16+
#[cfg(unix)]
17+
pub mod thread_listener;
18+
#[cfg(unix)]
19+
pub use thread_listener::{MasterListener, connect_to_master};
20+
1521
use datadog_ipc::platform::Channel;
1622
use std::io;
1723

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::sync::{Arc, Mutex, OnceLock, mpsc};
5+
use std::thread::{self, JoinHandle};
6+
use std::time::Duration;
7+
use std::io;
8+
use tokio::net::UnixListener;
9+
use tokio::runtime::Runtime;
10+
use tracing::{info, error};
11+
12+
use crate::config::Config;
13+
use crate::config::IpcMode::{InstancePerProcess, Shared};
14+
use crate::service::blocking::SidecarTransport;
15+
use crate::service::SidecarServer;
16+
use crate::setup::{Liaison, SharedDirLiaison};
17+
use datadog_ipc::platform::AsyncChannel;
18+
use datadog_ipc::transport::blocking::BlockingTransport;
19+
20+
static MASTER_LISTENER: OnceLock<Mutex<Option<MasterListener>>> = OnceLock::new();
21+
22+
pub struct MasterListener {
23+
shutdown_tx: mpsc::Sender<()>,
24+
thread_handle: Option<JoinHandle<()>>,
25+
pid: i32,
26+
}
27+
28+
impl MasterListener {
29+
/// Start the master listener thread.
30+
///
31+
/// This spawns a new OS thread with a Tokio runtime that listens for
32+
/// worker connections. Only one listener can be active per process.
33+
pub fn start(pid: i32, config: Config) -> io::Result<()> {
34+
let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None));
35+
let mut listener_guard = listener_mutex.lock()
36+
.map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?;
37+
38+
if listener_guard.is_some() {
39+
return Err(io::Error::other("Master listener is already running"));
40+
}
41+
42+
info!("Starting master listener thread for PID {}", pid);
43+
44+
let (shutdown_tx, shutdown_rx) = mpsc::channel();
45+
46+
// Wrap shutdown receiver in Arc<Mutex<>> for sharing with async function
47+
let shutdown_rx = Arc::new(Mutex::new(shutdown_rx));
48+
49+
let runtime = Runtime::new()
50+
.map_err(|e| io::Error::other(format!("Failed to create Tokio runtime: {}", e)))?;
51+
52+
let thread_handle = thread::Builder::new()
53+
.name(format!("ddtrace-sidecar-listener-{}", pid))
54+
.spawn(move || {
55+
runtime.block_on(async {
56+
if let Err(e) = run_listener(config, shutdown_rx).await {
57+
error!("Listener thread error: {}", e);
58+
}
59+
});
60+
info!("Listener thread exiting");
61+
})
62+
.map_err(|e| io::Error::other(format!("Failed to spawn listener thread: {}", e)))?;
63+
64+
*listener_guard = Some(MasterListener {
65+
shutdown_tx,
66+
thread_handle: Some(thread_handle),
67+
pid,
68+
});
69+
70+
info!("Master listener thread started successfully");
71+
Ok(())
72+
}
73+
74+
/// Shutdown the master listener thread.
75+
///
76+
/// Sends shutdown signal and joins the listener thread. This is blocking
77+
/// and will wait for the thread to exit cleanly.
78+
pub fn shutdown() -> io::Result<()> {
79+
let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None));
80+
let mut listener_guard = listener_mutex.lock()
81+
.map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?;
82+
83+
if let Some(mut master) = listener_guard.take() {
84+
info!("Shutting down master listener thread (PID {})", master.pid);
85+
86+
let _ = master.shutdown_tx.send(());
87+
88+
// Give the runtime a moment to process shutdown
89+
std::thread::sleep(Duration::from_millis(100));
90+
91+
if let Some(handle) = master.thread_handle.take() {
92+
handle.join()
93+
.map_err(|_| io::Error::other("Failed to join listener thread"))?;
94+
}
95+
96+
info!("Master listener thread shut down successfully");
97+
Ok(())
98+
} else {
99+
Err(io::Error::other("No master listener is running"))
100+
}
101+
}
102+
103+
/// Check if the master listener is active for the given PID.
104+
///
105+
/// Used for fork detection: child processes inherit the listener state
106+
/// but don't own the actual thread.
107+
pub fn is_active(pid: i32) -> bool {
108+
let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None));
109+
if let Ok(listener_guard) = listener_mutex.lock() {
110+
listener_guard.as_ref().is_some_and(|l| l.pid == pid)
111+
} else {
112+
false
113+
}
114+
}
115+
116+
/// Clear inherited listener state after fork.
117+
///
118+
/// Child processes must call this to prevent attempting to use the
119+
/// parent's listener thread, which doesn't exist in the child.
120+
pub fn clear_inherited_state() -> io::Result<()> {
121+
let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None));
122+
let mut listener_guard = listener_mutex.lock()
123+
.map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?;
124+
125+
if listener_guard.is_some() {
126+
info!("Clearing inherited master listener state in child process");
127+
*listener_guard = None;
128+
}
129+
130+
Ok(())
131+
}
132+
}
133+
134+
/// Async listener loop that accepts worker connections.
135+
///
136+
/// This runs in the listener thread's Tokio runtime and handles:
137+
/// - Accepting new worker connections
138+
/// - Spawning handlers for each connection
139+
/// - Graceful shutdown on signal
140+
async fn run_listener(config: Config, shutdown_rx: Arc<Mutex<mpsc::Receiver<()>>>) -> io::Result<()> {
141+
info!("Listener thread running, creating IPC server");
142+
143+
// Create IPC server using the platform-specific Liaison
144+
let liaison: SharedDirLiaison = match config.ipc_mode {
145+
Shared => Liaison::ipc_shared(),
146+
InstancePerProcess => Liaison::ipc_per_process(),
147+
};
148+
149+
let std_listener = liaison.attempt_listen()?
150+
.ok_or_else(|| io::Error::other("Failed to create IPC listener"))?;
151+
152+
std_listener.set_nonblocking(true)?;
153+
let ipc_server = UnixListener::from_std(std_listener)?;
154+
155+
info!("IPC server listening for worker connections");
156+
157+
let server = SidecarServer::default();
158+
159+
loop {
160+
if let Ok(rx) = shutdown_rx.lock() {
161+
if rx.try_recv().is_ok() || matches!(rx.try_recv(), Err(mpsc::TryRecvError::Disconnected)) {
162+
info!("Shutdown signal received, exiting listener loop");
163+
break;
164+
}
165+
}
166+
167+
match tokio::time::timeout(Duration::from_millis(100), ipc_server.accept()).await {
168+
Ok(Ok((client, _addr))) => {
169+
info!("Accepted new worker connection");
170+
let server_clone = server.clone();
171+
172+
tokio::spawn(async move {
173+
handle_worker_connection(client, server_clone).await;
174+
});
175+
}
176+
Ok(Err(e)) => {
177+
error!("Failed to accept worker connection: {}", e);
178+
}
179+
Err(_) => {
180+
// Timeout - continue loop to check shutdown signal
181+
continue;
182+
}
183+
}
184+
}
185+
186+
info!("Listener thread shutting down");
187+
Ok(())
188+
}
189+
190+
/// Handle a single worker connection.
191+
///
192+
/// Processes requests from the worker and sends responses until the
193+
/// connection is closed.
194+
async fn handle_worker_connection(
195+
client: tokio::net::UnixStream,
196+
server: SidecarServer,
197+
) {
198+
info!("Handling worker connection");
199+
server.accept_connection(AsyncChannel::from(client)).await;
200+
info!("Worker connection handler exiting");
201+
}
202+
203+
/// Connect to the master listener as a worker.
204+
///
205+
/// Establishes a connection to the master listener thread for the given PID.
206+
pub fn connect_to_master(pid: i32) -> io::Result<Box<SidecarTransport>> {
207+
info!("Connecting to master listener (PID {})", pid);
208+
209+
let config = Config::get();
210+
211+
let liaison: SharedDirLiaison = match config.ipc_mode {
212+
Shared => Liaison::ipc_shared(),
213+
InstancePerProcess => Liaison::ipc_per_process(),
214+
};
215+
216+
let channel = liaison.connect_to_server()
217+
.map_err(|e| io::Error::other(format!("Failed to connect to master listener: {}", e)))?;
218+
219+
let transport = BlockingTransport::from(channel);
220+
221+
let sidecar_transport = Box::new(SidecarTransport {
222+
inner: Mutex::new(transport),
223+
reconnect_fn: None, // Reconnection handled by caller
224+
});
225+
226+
info!("Successfully connected to master listener");
227+
Ok(sidecar_transport)
228+
}

0 commit comments

Comments
 (0)