Skip to content

Commit b66eabd

Browse files
committed
feat: implement AsyncRead/Write
1 parent 83cafcd commit b66eabd

File tree

4 files changed

+96
-8
lines changed

4 files changed

+96
-8
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gix-transport/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async-client = [
6565
## Data structures implement `serde::Serialize` and `serde::Deserialize`.
6666
serde = ["dep:serde"]
6767

68-
russh = ["dep:russh"]
68+
russh = ["dep:russh", "dep:tokio"]
6969

7070
[[test]]
7171
name = "blocking-transport"
@@ -124,6 +124,7 @@ async-std = { version = "1.12.0", optional = true }
124124
document-features = { version = "0.2.0", optional = true }
125125

126126
russh = { version = "0.53", optional = true }
127+
tokio = { version = "1", optional = true, default-features = false }
127128

128129
[dev-dependencies]
129130
gix-pack = { path = "../gix-pack", default-features = false, features = [

gix-transport/src/client/async_io/ssh/client.rs

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::Arc;
1+
use std::{ops::DerefMut, sync::Arc, task::ready};
22

33
use russh::{
44
client::Handle,
@@ -9,6 +9,7 @@ pub enum AuthMode {
99
UsernamePassword { username: String, password: String },
1010
}
1111

12+
#[derive(Clone)]
1213
pub struct Client {
1314
handle: Arc<Handle<ClientHandler>>,
1415
}
@@ -37,10 +38,90 @@ impl Client {
3738
}
3839
}
3940
}
41+
42+
pub async fn open_session(&mut self) -> Result<Session, super::Error> {
43+
let channel = self.handle.channel_open_session().await?;
44+
let stream = channel.into_stream();
45+
Ok(Session {
46+
stream: Arc::new(std::sync::Mutex::new(stream)),
47+
})
48+
}
49+
}
50+
51+
#[derive(Clone)]
52+
pub struct Session {
53+
stream: Arc<std::sync::Mutex<russh::ChannelStream<russh::client::Msg>>>,
54+
}
55+
56+
impl Session {
57+
fn poll_fn<F, R>(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, poll_fn: F) -> std::task::Poll<R>
58+
where
59+
F: FnOnce(
60+
std::pin::Pin<&mut russh::ChannelStream<russh::client::Msg>>,
61+
&mut std::task::Context<'_>,
62+
) -> std::task::Poll<R>,
63+
{
64+
match self.stream.try_lock() {
65+
Ok(mut inner) => {
66+
let pinned = std::pin::Pin::new(inner.deref_mut());
67+
(poll_fn)(pinned, cx)
68+
}
69+
Err(_) => {
70+
cx.waker().wake_by_ref();
71+
std::task::Poll::Pending
72+
}
73+
}
74+
}
75+
}
76+
77+
impl futures_io::AsyncRead for Session {
78+
fn poll_read(
79+
self: std::pin::Pin<&mut Self>,
80+
cx: &mut std::task::Context<'_>,
81+
slice: &mut [u8],
82+
) -> std::task::Poll<std::io::Result<usize>> {
83+
self.poll_fn(cx, |pinned, cx| {
84+
let mut buf = tokio::io::ReadBuf::new(slice);
85+
ready!(tokio::io::AsyncRead::poll_read(pinned, cx, &mut buf))?;
86+
std::task::Poll::Ready(Ok(buf.filled().len()))
87+
})
88+
}
89+
}
90+
91+
impl futures_io::AsyncWrite for Session {
92+
fn poll_write(
93+
self: std::pin::Pin<&mut Self>,
94+
cx: &mut std::task::Context<'_>,
95+
buf: &[u8],
96+
) -> std::task::Poll<std::io::Result<usize>> {
97+
self.poll_fn(cx, |pinned, cx| tokio::io::AsyncWrite::poll_write(pinned, cx, buf))
98+
}
99+
100+
fn poll_flush(
101+
self: std::pin::Pin<&mut Self>,
102+
cx: &mut std::task::Context<'_>,
103+
) -> std::task::Poll<std::io::Result<()>> {
104+
self.poll_fn(cx, tokio::io::AsyncWrite::poll_flush)
105+
}
106+
107+
fn poll_close(
108+
self: std::pin::Pin<&mut Self>,
109+
cx: &mut std::task::Context<'_>,
110+
) -> std::task::Poll<std::io::Result<()>> {
111+
self.poll_fn(cx, tokio::io::AsyncWrite::poll_shutdown)
112+
}
40113
}
41114

42115
struct ClientHandler;
43116

44117
impl Handler for ClientHandler {
45118
type Error = super::Error;
119+
120+
async fn check_server_key(
121+
&mut self,
122+
_server_public_key: &russh::keys::ssh_key::PublicKey,
123+
) -> Result<bool, Self::Error> {
124+
// TODO: configurable
125+
Ok(true)
126+
}
46127
}

gix-transport/src/client/async_io/ssh/mod.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub struct NativeSsh {
1616

1717
identity: Option<gix_sec::identity::Account>,
1818
client: Option<client::Client>,
19-
connection: Option<crate::client::git::Connection<&'static [u8], Vec<u8>>>,
19+
connection: Option<crate::client::git::Connection<client::Session, client::Session>>,
2020
}
2121

2222
impl TransportWithoutIO for NativeSsh {
@@ -31,7 +31,11 @@ impl TransportWithoutIO for NativeSsh {
3131
on_into_read: crate::client::MessageKind,
3232
trace: bool,
3333
) -> Result<super::RequestWriter<'_>, crate::client::Error> {
34-
todo!()
34+
if let Some(connection) = &mut self.connection {
35+
connection.request(write_mode, on_into_read, trace)
36+
} else {
37+
Err(crate::client::Error::MissingHandshake)
38+
}
3539
}
3640

3741
fn to_url(&self) -> std::borrow::Cow<'_, bstr::BStr> {
@@ -44,7 +48,7 @@ impl TransportWithoutIO for NativeSsh {
4448

4549
fn configure(
4650
&mut self,
47-
config: &dyn std::any::Any,
51+
_config: &dyn std::any::Any,
4852
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
4953
Ok(())
5054
}
@@ -72,11 +76,12 @@ impl Transport for NativeSsh {
7276
None => return Err(crate::client::Error::AuthenticationUnsupported),
7377
};
7478

75-
let client = client::Client::connect(host, port, auth_mode).await?;
79+
let mut client = client::Client::connect(host, port, auth_mode).await?;
80+
let session = client.open_session().await?;
7681

7782
let connection = crate::client::git::Connection::new(
78-
[0u8].as_slice(), // TODO
79-
vec![], // TODO
83+
session.clone(),
84+
session,
8085
self.desired_version,
8186
self.url.path.clone(),
8287
None::<(String, _)>,

0 commit comments

Comments
 (0)