Skip to content

Commit 4f612d0

Browse files
authored
Merge pull request #30 from bk-rs/bb8
Bb8
2 parents d3d5359 + c842ed3 commit 4f612d0

File tree

6 files changed

+219
-0
lines changed

6 files changed

+219
-0
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
[workspace]
22
members = [
3+
#
34
"async-ssh2-lite",
45
"async-ssh2-lite/demos/*",
6+
#
7+
"bb8-async-ssh2-lite",
8+
"bb8-async-ssh2-lite/demo"
59
]

bb8-async-ssh2-lite/Cargo.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[package]
2+
name = "bb8-async-ssh2-lite"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[features]
7+
default = ["tokio"]
8+
9+
tokio = ["async-ssh2-lite/tokio"]
10+
11+
[dependencies]
12+
async-ssh2-lite = { version = "0.4", default-features = false, path = "../async-ssh2-lite" }
13+
14+
bb8 = { version = "0.8", default-features = false }
15+
async-trait = { version = "0.1", default-features = false }
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
[package]
2+
name = "bb8-async-ssh2-lite-demo"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[[bin]]
7+
name = "bb8_asl_demo_tokio_tcp_stream"
8+
path = "src/tokio_tcp_stream.rs"
9+
10+
[dependencies]
11+
bb8-async-ssh2-lite = { path = "..", features = ["tokio"] }
12+
13+
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
14+
futures-util = { version = "0.3" }
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
RUST_BACKTRACE=1 RUST_LOG=trace cargo run -p bb8-async-ssh2-lite-demo --bin bb8_asl_demo_tokio_tcp_stream -- 127.0.0.1:22 root '~/.ssh/id_rsa'
3+
*/
4+
5+
use std::env;
6+
7+
use bb8_async_ssh2_lite::{bb8, AsyncSessionManagerWithTokioTcpStream, AsyncSessionUserauthType};
8+
use futures_util::{future::join_all, AsyncReadExt as _};
9+
10+
#[tokio::main]
11+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
12+
let socket_addr = env::args().nth(1).ok_or("socket_addr missing")?.parse()?;
13+
let username = env::args().nth(2).ok_or("username missing")?;
14+
let privatekey = env::args().nth(3).ok_or("privatekey missing")?.parse()?;
15+
16+
let mgr = AsyncSessionManagerWithTokioTcpStream::new(
17+
socket_addr,
18+
None,
19+
username,
20+
AsyncSessionUserauthType::PubkeyFile {
21+
pubkey: None,
22+
privatekey,
23+
passphrase: None,
24+
},
25+
);
26+
27+
let pool = bb8::Pool::builder().build(mgr).await?;
28+
29+
let mut handles = vec![];
30+
for i in 0..10 {
31+
let pool = pool.clone();
32+
let handle = tokio::spawn(async move {
33+
let session = pool.get().await?;
34+
35+
let mut channel = session.channel_session().await?;
36+
channel.exec("hostname").await?;
37+
let mut s = String::new();
38+
channel.read_to_string(&mut s).await?;
39+
println!("exec hostname output:{s} i:{i}");
40+
channel.close().await?;
41+
println!("exec hostname exit_status:{} i:{i}", channel.exit_status()?);
42+
43+
Result::<(), Box<dyn std::error::Error + Send + Sync>>::Ok(())
44+
});
45+
handles.push(handle);
46+
}
47+
48+
let rets = join_all(handles).await;
49+
println!("rets:{rets:?}");
50+
assert!(rets.iter().all(|x| x.is_ok()));
51+
52+
Ok(())
53+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use std::net::SocketAddr;
2+
3+
use async_ssh2_lite::{AsyncSession, SessionConfiguration, TokioTcpStream};
4+
use async_trait::async_trait;
5+
6+
use crate::{AsyncSessionManagerError, AsyncSessionUserauthType};
7+
8+
//
9+
#[derive(Debug, Clone)]
10+
pub struct AsyncSessionManagerWithTokioTcpStream {
11+
socket_addr: SocketAddr,
12+
configuration: Option<SessionConfiguration>,
13+
username: String,
14+
userauth_type: AsyncSessionUserauthType,
15+
}
16+
17+
impl AsyncSessionManagerWithTokioTcpStream {
18+
pub fn new(
19+
socket_addr: SocketAddr,
20+
configuration: impl Into<Option<SessionConfiguration>>,
21+
username: impl AsRef<str>,
22+
userauth_type: AsyncSessionUserauthType,
23+
) -> Self {
24+
Self {
25+
socket_addr,
26+
configuration: configuration.into(),
27+
username: username.as_ref().into(),
28+
userauth_type,
29+
}
30+
}
31+
}
32+
33+
#[async_trait]
34+
impl bb8::ManageConnection for AsyncSessionManagerWithTokioTcpStream {
35+
type Connection = AsyncSession<TokioTcpStream>;
36+
37+
type Error = AsyncSessionManagerError;
38+
39+
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
40+
let mut session = AsyncSession::<TokioTcpStream>::connect(
41+
self.socket_addr,
42+
self.configuration.to_owned(),
43+
)
44+
.await
45+
.map_err(AsyncSessionManagerError::ConnectError)?;
46+
47+
session
48+
.handshake()
49+
.await
50+
.map_err(AsyncSessionManagerError::HandshakeError)?;
51+
52+
match &self.userauth_type {
53+
AsyncSessionUserauthType::Password { password } => {
54+
session
55+
.userauth_password(&self.username, password)
56+
.await
57+
.map_err(AsyncSessionManagerError::UserauthError)?;
58+
}
59+
AsyncSessionUserauthType::Agent => {
60+
session
61+
.userauth_agent(&self.username)
62+
.await
63+
.map_err(AsyncSessionManagerError::UserauthError)?;
64+
}
65+
AsyncSessionUserauthType::PubkeyFile {
66+
pubkey,
67+
privatekey,
68+
passphrase,
69+
} => {
70+
session
71+
.userauth_pubkey_file(
72+
&self.username,
73+
pubkey.as_deref(),
74+
privatekey,
75+
passphrase.as_deref(),
76+
)
77+
.await
78+
.map_err(AsyncSessionManagerError::UserauthError)?;
79+
}
80+
}
81+
82+
if !session.authenticated() {
83+
return Err(AsyncSessionManagerError::AssertAuthenticated);
84+
}
85+
86+
Ok(session)
87+
}
88+
89+
async fn is_valid(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> {
90+
Ok(())
91+
}
92+
93+
fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
94+
false
95+
}
96+
}

bb8-async-ssh2-lite/src/lib.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
pub use async_ssh2_lite;
2+
pub use bb8;
3+
4+
#[cfg(feature = "tokio")]
5+
mod impl_tokio;
6+
#[cfg(feature = "tokio")]
7+
pub use impl_tokio::AsyncSessionManagerWithTokioTcpStream;
8+
9+
use std::path::PathBuf;
10+
11+
//
12+
#[derive(Debug, Clone)]
13+
pub enum AsyncSessionUserauthType {
14+
Password {
15+
password: String,
16+
},
17+
Agent,
18+
PubkeyFile {
19+
pubkey: Option<PathBuf>,
20+
privatekey: PathBuf,
21+
passphrase: Option<String>,
22+
},
23+
}
24+
25+
#[derive(Debug)]
26+
pub enum AsyncSessionManagerError {
27+
ConnectError(async_ssh2_lite::Error),
28+
HandshakeError(async_ssh2_lite::Error),
29+
UserauthError(async_ssh2_lite::Error),
30+
AssertAuthenticated,
31+
}
32+
impl core::fmt::Display for AsyncSessionManagerError {
33+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
34+
write!(f, "{self:?}")
35+
}
36+
}
37+
impl std::error::Error for AsyncSessionManagerError {}

0 commit comments

Comments
 (0)