Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/src/bin/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn session(user: &str) -> Result<Session<async_native_tls::TlsStream<TcpSt
let mut client = async_imap::Client::new(tls_stream);
let _greeting = client
.read_response()
.await
.await?
.context("unexpected end of stream, expected greeting")?;

let session = client
Expand All @@ -48,7 +48,7 @@ async fn _connect_insecure_then_secure() -> Result<()> {
let mut client = async_imap::Client::new(tcp_stream);
let _greeting = client
.read_response()
.await
.await?
.context("unexpected end of stream, expected greeting")?;
client.run_command_and_check_ok("STARTTLS", None).await?;
let stream = client.into_inner();
Expand Down
125 changes: 59 additions & 66 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use async_std::io::{Read, Write, WriteExt};
use base64::Engine as _;
use extensions::id::{format_identification, parse_id};
use extensions::quota::parse_get_quota_root;
use futures::{io, Stream, StreamExt};
use futures::{io, Stream, TryStreamExt};
use imap_proto::{Metadata, RequestId, Response};
#[cfg(feature = "runtime-tokio")]
use tokio::io::{AsyncRead as Read, AsyncWrite as Write, AsyncWriteExt};
Expand Down Expand Up @@ -122,7 +122,7 @@ macro_rules! ok_or_unauth_client_err {
($r:expr, $self:expr) => {
match $r {
Ok(o) => o,
Err(e) => return Err((e, $self)),
Err(e) => return Err((e.into(), $self)),
}
};
}
Expand Down Expand Up @@ -262,42 +262,37 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Client<T> {
// explicit match blocks neccessary to convert error to tuple and not bind self too
// early (see also comment on `login`)
loop {
if let Some(res) = self.read_response().await {
let res = ok_or_unauth_client_err!(res.map_err(Into::into), self);
match res.parsed() {
Response::Continue { information, .. } => {
let challenge = if let Some(text) = information {
ok_or_unauth_client_err!(
base64::engine::general_purpose::STANDARD
.decode(text.as_ref())
.map_err(|e| Error::Parse(ParseError::Authentication(
(*text).to_string(),
Some(e)
))),
self
)
} else {
Vec::new()
};
let raw_response = &mut authenticator.process(&challenge);
let auth_response =
base64::engine::general_purpose::STANDARD.encode(raw_response);

ok_or_unauth_client_err!(
self.conn.run_command_untagged(&auth_response).await,
self
);
}
_ => {
let Some(res) = ok_or_unauth_client_err!(self.read_response().await, self) else {
return Err((Error::ConnectionLost, self));
};
match res.parsed() {
Response::Continue { information, .. } => {
let challenge = if let Some(text) = information {
ok_or_unauth_client_err!(
self.check_done_ok_from(&id, None, res).await,
base64::engine::general_purpose::STANDARD
.decode(text.as_ref())
.map_err(|e| Error::Parse(ParseError::Authentication(
(*text).to_string(),
Some(e)
))),
self
);
return Ok(Session::new(self.conn));
}
)
} else {
Vec::new()
};
let raw_response = &mut authenticator.process(&challenge);
let auth_response =
base64::engine::general_purpose::STANDARD.encode(raw_response);

ok_or_unauth_client_err!(
self.conn.run_command_untagged(&auth_response).await,
self
);
}
_ => {
ok_or_unauth_client_err!(self.check_done_ok_from(&id, None, res).await, self);
return Ok(Session::new(self.conn));
}
} else {
return Err((Error::ConnectionLost, self));
}
}
}
Expand Down Expand Up @@ -975,12 +970,13 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
mailbox_pattern.unwrap_or("\"\"")
))
.await?;

Ok(parse_names(
let names = parse_names(
&mut self.conn.stream,
self.unsolicited_responses_tx.clone(),
id,
))
);

Ok(names)
}

/// The [`LSUB` command](https://tools.ietf.org/html/rfc3501#section-6.3.9) returns a subset of
Expand Down Expand Up @@ -1136,23 +1132,20 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
))
.await?;

match self.read_response().await {
Some(Ok(res)) => {
if let Response::Continue { .. } = res.parsed() {
self.stream.as_mut().write_all(content).await?;
self.stream.as_mut().write_all(b"\r\n").await?;
self.stream.flush().await?;
self.conn
.check_done_ok(&id, Some(self.unsolicited_responses_tx.clone()))
.await?;
Ok(())
} else {
Err(Error::Append)
}
}
Some(Err(err)) => Err(err.into()),
_ => Err(Error::Append),
}
let Some(res) = self.read_response().await? else {
return Err(Error::Append);
};
let Response::Continue { .. } = res.parsed() else {
return Err(Error::Append);
};

self.stream.as_mut().write_all(content).await?;
self.stream.as_mut().write_all(b"\r\n").await?;
self.stream.flush().await?;
self.conn
.check_done_ok(&id, Some(self.unsolicited_responses_tx.clone()))
.await?;
Ok(())
}

/// The [`SEARCH` command](https://tools.ietf.org/html/rfc3501#section-6.4.4) searches the
Expand Down Expand Up @@ -1352,7 +1345,7 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
}

/// Read the next response on the connection.
pub async fn read_response(&mut self) -> Option<io::Result<ResponseData>> {
pub async fn read_response(&mut self) -> io::Result<Option<ResponseData>> {
self.conn.read_response().await
}
}
Expand All @@ -1377,8 +1370,8 @@ impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
}

/// Read the next response on the connection.
pub async fn read_response(&mut self) -> Option<io::Result<ResponseData>> {
self.stream.next().await
pub async fn read_response(&mut self) -> io::Result<Option<ResponseData>> {
self.stream.try_next().await
}

pub(crate) async fn run_command_untagged(&mut self, command: &str) -> Result<()> {
Expand Down Expand Up @@ -1415,8 +1408,8 @@ impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
id: &RequestId,
unsolicited: Option<channel::Sender<UnsolicitedResponse>>,
) -> Result<()> {
if let Some(first_res) = self.stream.next().await {
self.check_done_ok_from(id, unsolicited, first_res?).await
if let Some(first_res) = self.stream.try_next().await? {
self.check_done_ok_from(id, unsolicited, first_res).await
} else {
Err(Error::ConnectionLost)
}
Expand Down Expand Up @@ -1447,11 +1440,10 @@ impl<T: Read + Write + Unpin + fmt::Debug> Connection<T> {
handle_unilateral(response, unsolicited);
}

if let Some(res) = self.stream.next().await {
response = res?;
} else {
let Some(res) = self.stream.try_next().await? else {
return Err(Error::ConnectionLost);
}
};
response = res;
}
}

Expand Down Expand Up @@ -1495,6 +1487,7 @@ mod tests {
use std::future::Future;

use async_std::sync::{Arc, Mutex};
use futures::StreamExt;
use imap_proto::Status;

macro_rules! mock_client {
Expand Down Expand Up @@ -1555,7 +1548,7 @@ mod tests {
async fn readline_eof() {
let mock_stream = MockStream::default().with_eof();
let mut client = mock_client!(mock_stream);
let res = client.read_response().await;
let res = client.read_response().await.unwrap();
assert!(res.is_none());
}

Expand Down Expand Up @@ -2117,7 +2110,7 @@ mod tests {
.unwrap();

// Unexpected EOF.
let err = fetch_result.next().await.unwrap().unwrap_err();
let err = fetch_result.try_next().await.unwrap_err();
let Error::Io(io_err) = err else {
panic!("Unexpected error type: {err}")
};
Expand Down
5 changes: 2 additions & 3 deletions src/extensions/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,9 @@ pub(crate) async fn parse_id<T: Stream<Item = io::Result<ResponseData>> + Unpin>
let mut id = None;
while let Some(resp) = stream
.take_while(|res| filter(res, &command_tag))
.next()
.await
.try_next()
.await?
{
let resp = resp?;
match resp.parsed() {
Response::Id(res) => {
id = res.as_ref().map(|m| {
Expand Down
3 changes: 1 addition & 2 deletions src/extensions/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Handle<T> {
pub async fn init(&mut self) -> Result<()> {
let id = self.session.run_command("IDLE").await?;
self.id = Some(id);
while let Some(res) = self.session.stream.next().await {
let res = res?;
while let Some(res) = self.session.stream.try_next().await? {
match res.parsed() {
Response::Continue { .. } => {
return Ok(());
Expand Down
10 changes: 4 additions & 6 deletions src/extensions/quota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ pub(crate) async fn parse_get_quota<T: Stream<Item = io::Result<ResponseData>> +
let mut quota = None;
while let Some(resp) = stream
.take_while(|res| filter(res, &command_tag))
.next()
.await
.try_next()
.await?
{
let resp = resp?;
match resp.parsed() {
Response::Quota(q) => quota = Some(q.clone().into()),
_ => {
Expand All @@ -53,10 +52,9 @@ pub(crate) async fn parse_get_quota_root<T: Stream<Item = io::Result<ResponseDat

while let Some(resp) = stream
.take_while(|res| filter(res, &command_tag))
.next()
.await
.try_next()
.await?
{
let resp = resp?;
match resp.parsed() {
Response::QuotaRoot(qr) => {
roots.push(qr.clone().into());
Expand Down
26 changes: 10 additions & 16 deletions src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ pub(crate) async fn parse_status<T: Stream<Item = io::Result<ResponseData>> + Un
) -> Result<Mailbox> {
let mut mbox = Mailbox::default();

while let Some(resp) = stream.next().await {
let resp = resp?;
while let Some(resp) = stream.try_next().await? {
match resp.parsed() {
Response::Done {
tag,
Expand Down Expand Up @@ -192,10 +191,9 @@ pub(crate) async fn parse_capabilities<T: Stream<Item = io::Result<ResponseData>

while let Some(resp) = stream
.take_while(|res| filter(res, &command_tag))
.next()
.await
.try_next()
.await?
{
let resp = resp?;
match resp.parsed() {
Response::Capabilities(cs) => {
for c in cs {
Expand All @@ -218,10 +216,9 @@ pub(crate) async fn parse_noop<T: Stream<Item = io::Result<ResponseData>> + Unpi
) -> Result<()> {
while let Some(resp) = stream
.take_while(|res| filter(res, &command_tag))
.next()
.await
.try_next()
.await?
{
let resp = resp?;
handle_unilateral(resp, unsolicited.clone());
}

Expand All @@ -235,8 +232,7 @@ pub(crate) async fn parse_mailbox<T: Stream<Item = io::Result<ResponseData>> + U
) -> Result<Mailbox> {
let mut mailbox = Mailbox::default();

while let Some(resp) = stream.next().await {
let resp = resp?;
while let Some(resp) = stream.try_next().await? {
match resp.parsed() {
Response::Done {
tag,
Expand Down Expand Up @@ -345,10 +341,9 @@ pub(crate) async fn parse_ids<T: Stream<Item = io::Result<ResponseData>> + Unpin

while let Some(resp) = stream
.take_while(|res| filter(res, &command_tag))
.next()
.await
.try_next()
.await?
{
let resp = resp?;
match resp.parsed() {
Response::MailboxData(MailboxDatum::Search(cs)) => {
for c in cs {
Expand All @@ -374,10 +369,9 @@ pub(crate) async fn parse_metadata<T: Stream<Item = io::Result<ResponseData>> +
let mut res_values = Vec::new();
while let Some(resp) = stream
.take_while(|res| filter(res, &command_tag))
.next()
.await
.try_next()
.await?
{
let resp = resp?;
match resp.parsed() {
// METADATA Response with Values
// <https://datatracker.ietf.org/doc/html/rfc5464.html#section-4.4.1>
Expand Down
Loading