Skip to content

Commit f320c2e

Browse files
committed
Split the rest of utils
1 parent 71c152c commit f320c2e

File tree

6 files changed

+127
-127
lines changed

6 files changed

+127
-127
lines changed

rust/operator-binary/src/csi_server/controller.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use stackable_operator::{
88
};
99
use tonic::{Request, Response, Status};
1010

11-
use crate::utils::error_full_message;
11+
use crate::utils::error::error_full_message;
1212

1313
use super::{tonic_unimplemented, ListenerSelector, ListenerVolumeContext};
1414

rust/operator-binary/src/csi_server/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::{
2121
listener_mounted_pod_label, listener_persistent_volume_label, ListenerMountedPodLabelError,
2222
ListenerPersistentVolumeLabelError,
2323
},
24-
utils::{address::node_primary_addresses, error_full_message},
24+
utils::{address::node_primary_addresses, error::error_full_message},
2525
};
2626

2727
use super::{tonic_unimplemented, ListenerSelector, ListenerVolumeContext};

rust/operator-binary/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use stackable_operator::{
1717
use tokio::signal::unix::{signal, SignalKind};
1818
use tokio_stream::wrappers::UnixListenerStream;
1919
use tonic::transport::Server;
20-
use utils::{uds_bind_private, TonicUnixStream};
20+
use utils::unix_stream::{uds_bind_private, TonicUnixStream};
2121

2222
mod csi_server;
2323
mod listener_controller;
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/// Combines the messages of an error and its sources into a [`String`] of the form `"error: source 1: source 2: root error"`
2+
pub fn error_full_message(err: &dyn std::error::Error) -> String {
3+
use std::fmt::Write;
4+
// Build the full hierarchy of error messages by walking up the stack until an error
5+
// without `source` set is encountered and concatenating all encountered error strings.
6+
let mut full_msg = format!("{}", err);
7+
let mut curr_err = err.source();
8+
while let Some(curr_source) = curr_err {
9+
write!(full_msg, ": {curr_source}").expect("string formatting should be infallible");
10+
curr_err = curr_source.source();
11+
}
12+
full_msg
13+
}
14+
15+
#[cfg(test)]
16+
pub(crate) mod tests {
17+
use super::error_full_message;
18+
19+
#[test]
20+
fn error_messages() {
21+
assert_eq!(
22+
error_full_message(anyhow::anyhow!("standalone error").as_ref()),
23+
"standalone error"
24+
);
25+
assert_eq!(
26+
error_full_message(
27+
anyhow::anyhow!("root error")
28+
.context("middleware")
29+
.context("leaf")
30+
.as_ref()
31+
),
32+
"leaf: middleware: root error"
33+
);
34+
}
35+
}
Lines changed: 2 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -1,125 +1,3 @@
1-
use std::{os::unix::prelude::AsRawFd, path::Path};
2-
3-
use pin_project::pin_project;
4-
use socket2::Socket;
5-
use tokio::{
6-
io::{AsyncRead, AsyncWrite},
7-
net::{UnixListener, UnixStream},
8-
};
9-
use tonic::transport::server::Connected;
10-
111
pub mod address;
12-
13-
/// Adapter for using [`UnixStream`] as a [`tonic`] connection
14-
/// Tonic usually communicates via TCP sockets, but the Kubernetes CSI interface expects
15-
/// plugins to use Unix sockets instead.
16-
/// This provides a wrapper implementation which delegates to tokio's [`UnixStream`] in order
17-
/// to enable tonic to communicate via Unix sockets.
18-
#[pin_project]
19-
pub struct TonicUnixStream(#[pin] pub UnixStream);
20-
21-
impl AsyncRead for TonicUnixStream {
22-
fn poll_read(
23-
self: std::pin::Pin<&mut Self>,
24-
cx: &mut std::task::Context<'_>,
25-
buf: &mut tokio::io::ReadBuf<'_>,
26-
) -> std::task::Poll<std::io::Result<()>> {
27-
self.project().0.poll_read(cx, buf)
28-
}
29-
}
30-
31-
impl AsyncWrite for TonicUnixStream {
32-
fn poll_write(
33-
self: std::pin::Pin<&mut Self>,
34-
cx: &mut std::task::Context<'_>,
35-
buf: &[u8],
36-
) -> std::task::Poll<Result<usize, std::io::Error>> {
37-
self.project().0.poll_write(cx, buf)
38-
}
39-
40-
fn poll_flush(
41-
self: std::pin::Pin<&mut Self>,
42-
cx: &mut std::task::Context<'_>,
43-
) -> std::task::Poll<Result<(), std::io::Error>> {
44-
self.project().0.poll_flush(cx)
45-
}
46-
47-
fn poll_shutdown(
48-
self: std::pin::Pin<&mut Self>,
49-
cx: &mut std::task::Context<'_>,
50-
) -> std::task::Poll<Result<(), std::io::Error>> {
51-
self.project().0.poll_shutdown(cx)
52-
}
53-
54-
fn poll_write_vectored(
55-
self: std::pin::Pin<&mut Self>,
56-
cx: &mut std::task::Context<'_>,
57-
bufs: &[std::io::IoSlice<'_>],
58-
) -> std::task::Poll<Result<usize, std::io::Error>> {
59-
self.project().0.poll_write_vectored(cx, bufs)
60-
}
61-
62-
fn is_write_vectored(&self) -> bool {
63-
self.0.is_write_vectored()
64-
}
65-
}
66-
67-
impl Connected for TonicUnixStream {
68-
type ConnectInfo = ();
69-
70-
fn connect_info(&self) -> Self::ConnectInfo {}
71-
}
72-
73-
/// Bind a Unix Domain Socket listener that is only accessible to the current user
74-
pub fn uds_bind_private(path: impl AsRef<Path>) -> Result<UnixListener, std::io::Error> {
75-
// Workaround for https://github.com/tokio-rs/tokio/issues/4422
76-
let socket = Socket::new(socket2::Domain::UNIX, socket2::Type::STREAM, None)?;
77-
unsafe {
78-
// Socket-level chmod is propagated to the file created by Socket::bind.
79-
// We need to chmod /before/ creating the file, because otherwise there is a brief window where
80-
// the file is world-accessible (unless restricted by the global umask).
81-
if libc::fchmod(socket.as_raw_fd(), 0o600) == -1 {
82-
return Err(std::io::Error::last_os_error());
83-
}
84-
}
85-
socket.bind(&socket2::SockAddr::unix(path)?)?;
86-
socket.listen(1024)?;
87-
socket.set_nonblocking(true)?;
88-
UnixListener::from_std(socket.into())
89-
}
90-
91-
/// Combines the messages of an error and its sources into a [`String`] of the form `"error: source 1: source 2: root error"`
92-
pub fn error_full_message(err: &dyn std::error::Error) -> String {
93-
use std::fmt::Write;
94-
// Build the full hierarchy of error messages by walking up the stack until an error
95-
// without `source` set is encountered and concatenating all encountered error strings.
96-
let mut full_msg = format!("{}", err);
97-
let mut curr_err = err.source();
98-
while let Some(curr_source) = curr_err {
99-
write!(full_msg, ": {curr_source}").expect("string formatting should be infallible");
100-
curr_err = curr_source.source();
101-
}
102-
full_msg
103-
}
104-
105-
#[cfg(test)]
106-
mod tests {
107-
use crate::utils::error_full_message;
108-
109-
#[test]
110-
fn error_messages() {
111-
assert_eq!(
112-
error_full_message(anyhow::anyhow!("standalone error").as_ref()),
113-
"standalone error"
114-
);
115-
assert_eq!(
116-
error_full_message(
117-
anyhow::anyhow!("root error")
118-
.context("middleware")
119-
.context("leaf")
120-
.as_ref()
121-
),
122-
"leaf: middleware: root error"
123-
);
124-
}
125-
}
2+
pub mod error;
3+
pub mod unix_stream;
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use std::{os::unix::prelude::AsRawFd, path::Path};
2+
3+
use pin_project::pin_project;
4+
use socket2::Socket;
5+
use tokio::{
6+
io::{AsyncRead, AsyncWrite},
7+
net::{UnixListener, UnixStream},
8+
};
9+
use tonic::transport::server::Connected;
10+
11+
/// Adapter for using [`UnixStream`] as a [`tonic`] connection
12+
/// Tonic usually communicates via TCP sockets, but the Kubernetes CSI interface expects
13+
/// plugins to use Unix sockets instead.
14+
/// This provides a wrapper implementation which delegates to tokio's [`UnixStream`] in order
15+
/// to enable tonic to communicate via Unix sockets.
16+
#[pin_project]
17+
pub struct TonicUnixStream(#[pin] pub UnixStream);
18+
19+
impl AsyncRead for TonicUnixStream {
20+
fn poll_read(
21+
self: std::pin::Pin<&mut Self>,
22+
cx: &mut std::task::Context<'_>,
23+
buf: &mut tokio::io::ReadBuf<'_>,
24+
) -> std::task::Poll<std::io::Result<()>> {
25+
self.project().0.poll_read(cx, buf)
26+
}
27+
}
28+
29+
impl AsyncWrite for TonicUnixStream {
30+
fn poll_write(
31+
self: std::pin::Pin<&mut Self>,
32+
cx: &mut std::task::Context<'_>,
33+
buf: &[u8],
34+
) -> std::task::Poll<Result<usize, std::io::Error>> {
35+
self.project().0.poll_write(cx, buf)
36+
}
37+
38+
fn poll_flush(
39+
self: std::pin::Pin<&mut Self>,
40+
cx: &mut std::task::Context<'_>,
41+
) -> std::task::Poll<Result<(), std::io::Error>> {
42+
self.project().0.poll_flush(cx)
43+
}
44+
45+
fn poll_shutdown(
46+
self: std::pin::Pin<&mut Self>,
47+
cx: &mut std::task::Context<'_>,
48+
) -> std::task::Poll<Result<(), std::io::Error>> {
49+
self.project().0.poll_shutdown(cx)
50+
}
51+
52+
fn poll_write_vectored(
53+
self: std::pin::Pin<&mut Self>,
54+
cx: &mut std::task::Context<'_>,
55+
bufs: &[std::io::IoSlice<'_>],
56+
) -> std::task::Poll<Result<usize, std::io::Error>> {
57+
self.project().0.poll_write_vectored(cx, bufs)
58+
}
59+
60+
fn is_write_vectored(&self) -> bool {
61+
self.0.is_write_vectored()
62+
}
63+
}
64+
65+
impl Connected for TonicUnixStream {
66+
type ConnectInfo = ();
67+
68+
fn connect_info(&self) -> Self::ConnectInfo {}
69+
}
70+
71+
/// Bind a Unix Domain Socket listener that is only accessible to the current user
72+
pub fn uds_bind_private(path: impl AsRef<Path>) -> Result<UnixListener, std::io::Error> {
73+
// Workaround for https://github.com/tokio-rs/tokio/issues/4422
74+
let socket = Socket::new(socket2::Domain::UNIX, socket2::Type::STREAM, None)?;
75+
unsafe {
76+
// Socket-level chmod is propagated to the file created by Socket::bind.
77+
// We need to chmod /before/ creating the file, because otherwise there is a brief window where
78+
// the file is world-accessible (unless restricted by the global umask).
79+
if libc::fchmod(socket.as_raw_fd(), 0o600) == -1 {
80+
return Err(std::io::Error::last_os_error());
81+
}
82+
}
83+
socket.bind(&socket2::SockAddr::unix(path)?)?;
84+
socket.listen(1024)?;
85+
socket.set_nonblocking(true)?;
86+
UnixListener::from_std(socket.into())
87+
}

0 commit comments

Comments
 (0)