Skip to content

Commit 4da5594

Browse files
committed
async: remove the need for {Ready,Empty}Producer
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent 18628d9 commit 4da5594

File tree

6 files changed

+45
-104
lines changed

6 files changed

+45
-104
lines changed

crates/wasi-http/src/p3/body.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::p3::bindings::http::types::{ErrorCode, Fields, Trailers};
22
use crate::p3::{WasiHttp, WasiHttpCtxView};
33
use anyhow::Context as _;
44
use bytes::Bytes;
5+
use core::iter;
56
use core::num::NonZeroUsize;
67
use core::pin::Pin;
78
use core::task::{Context, Poll, ready};
@@ -13,8 +14,8 @@ use std::sync::Arc;
1314
use tokio::sync::{mpsc, oneshot};
1415
use tokio_util::sync::PollSender;
1516
use wasmtime::component::{
16-
Access, Destination, EmptyProducer, FutureConsumer, FutureReader, Resource, Source,
17-
StreamConsumer, StreamProducer, StreamReader, StreamResult,
17+
Access, Destination, FutureConsumer, FutureReader, Resource, Source, StreamConsumer,
18+
StreamProducer, StreamReader, StreamResult,
1819
};
1920
use wasmtime::{AsContextMut, StoreContextMut};
2021

@@ -74,7 +75,7 @@ impl Body {
7475
// https://github.com/WebAssembly/wasi-http/issues/176
7576
_ = result_tx.send(Box::new(async { Ok(()) }));
7677
Ok((
77-
StreamReader::new(instance, &mut store, EmptyProducer::default()),
78+
StreamReader::new(instance, &mut store, iter::empty()),
7879
trailers_rx,
7980
))
8081
}

crates/wasi/src/p3/filesystem/host.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,18 @@ use crate::p3::{DEFAULT_BUFFER_CAPACITY, FallibleIteratorProducer};
99
use crate::{DirPerms, FilePerms};
1010
use anyhow::Context as _;
1111
use bytes::BytesMut;
12-
use core::mem;
1312
use core::pin::Pin;
1413
use core::task::{Context, Poll, ready};
14+
use core::{iter, mem};
1515
use std::io::{self, Cursor};
1616
use std::sync::Arc;
1717
use system_interface::fs::FileIoExt as _;
1818
use tokio::sync::{mpsc, oneshot};
1919
use tokio::task::{JoinHandle, spawn_blocking};
2020
use wasmtime::StoreContextMut;
2121
use wasmtime::component::{
22-
Accessor, Destination, EmptyProducer, FutureReader, ReadyProducer, Resource, ResourceTable,
23-
Source, StreamConsumer, StreamProducer, StreamReader, StreamResult,
22+
Accessor, Destination, FutureReader, Resource, ResourceTable, Source, StreamConsumer,
23+
StreamProducer, StreamReader, StreamResult,
2424
};
2525

2626
fn get_descriptor<'a>(
@@ -495,12 +495,10 @@ impl types::HostDescriptorWithStore for WasiFilesystem {
495495
let file = get_file(store.get().table, &fd)?;
496496
if !file.perms.contains(FilePerms::READ) {
497497
return Ok((
498-
StreamReader::new(instance, &mut store, EmptyProducer::default()),
499-
FutureReader::new(
500-
instance,
501-
&mut store,
502-
ReadyProducer::from(Err(ErrorCode::NotPermitted)),
503-
),
498+
StreamReader::new(instance, &mut store, iter::empty()),
499+
FutureReader::new(instance, &mut store, async {
500+
anyhow::Ok(Err(ErrorCode::NotPermitted))
501+
}),
504502
));
505503
}
506504

@@ -641,12 +639,10 @@ impl types::HostDescriptorWithStore for WasiFilesystem {
641639
let dir = get_dir(store.get().table, &fd)?;
642640
if !dir.perms.contains(DirPerms::READ) {
643641
return Ok((
644-
StreamReader::new(instance, &mut store, EmptyProducer::default()),
645-
FutureReader::new(
646-
instance,
647-
&mut store,
648-
ReadyProducer::from(Err(ErrorCode::NotPermitted)),
649-
),
642+
StreamReader::new(instance, &mut store, iter::empty()),
643+
FutureReader::new(instance, &mut store, async {
644+
anyhow::Ok(Err(ErrorCode::NotPermitted))
645+
}),
650646
));
651647
}
652648
let allow_blocking_current_thread = dir.allow_blocking_current_thread;
@@ -664,7 +660,7 @@ impl types::HostDescriptorWithStore for WasiFilesystem {
664660
),
665661
Err(e) => {
666662
result_tx.send(Err(e.into())).unwrap();
667-
StreamReader::new(instance, &mut store, EmptyProducer::default())
663+
StreamReader::new(instance, &mut store, iter::empty())
668664
}
669665
}
670666
} else {

crates/wasi/src/p3/sockets/host/types/tcp.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::p3::sockets::{SocketError, SocketResult, WasiSockets};
88
use crate::sockets::{NonInheritedOptions, SocketAddrUse, SocketAddressFamily, WasiSocketsCtxView};
99
use anyhow::Context as _;
1010
use bytes::BytesMut;
11+
use core::iter;
1112
use core::pin::Pin;
1213
use core::task::{Context, Poll};
1314
use io_lifetimes::AsSocketlike as _;
@@ -17,8 +18,8 @@ use std::sync::Arc;
1718
use tokio::net::{TcpListener, TcpStream};
1819
use tokio::sync::oneshot;
1920
use wasmtime::component::{
20-
Accessor, Destination, EmptyProducer, FutureReader, ReadyProducer, Resource, ResourceTable,
21-
Source, StreamConsumer, StreamProducer, StreamReader, StreamResult,
21+
Accessor, Destination, FutureReader, Resource, ResourceTable, Source, StreamConsumer,
22+
StreamProducer, StreamReader, StreamResult,
2223
};
2324
use wasmtime::{AsContextMut as _, StoreContextMut};
2425

@@ -353,12 +354,10 @@ impl HostTcpSocketWithStore for WasiSockets {
353354
))
354355
}
355356
None => Ok((
356-
StreamReader::new(instance, &mut store, EmptyProducer::default()),
357-
FutureReader::new(
358-
instance,
359-
&mut store,
360-
ReadyProducer::from(Err(ErrorCode::InvalidState)),
361-
),
357+
StreamReader::new(instance, &mut store, iter::empty()),
358+
FutureReader::new(instance, &mut store, async {
359+
anyhow::Ok(Err(ErrorCode::InvalidState))
360+
}),
362361
)),
363362
}
364363
})

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,9 @@ use wasmtime_environ::component::{
9292

9393
pub use abort::JoinHandle;
9494
pub use futures_and_streams::{
95-
Destination, DirectDestination, DirectSource, EmptyProducer, ErrorContext, FutureConsumer,
96-
FutureProducer, FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer,
97-
ReadyProducer, Source, StreamConsumer, StreamProducer, StreamReader, StreamResult, VecBuffer,
98-
WriteBuffer,
95+
Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
96+
FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
97+
StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
9998
};
10099
pub(crate) use futures_and_streams::{
101100
ResourcePair, lower_error_context_to_index, lower_future_to_index, lower_stream_to_index,

crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs

Lines changed: 16 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ use core::marker::PhantomData;
1818
use core::mem::{self, MaybeUninit};
1919
use core::pin::Pin;
2020
use core::task::{Context, Poll, Waker, ready};
21-
use futures::FutureExt;
2221
use futures::channel::oneshot;
22+
use futures::{FutureExt as _, stream};
2323
use std::boxed::Box;
2424
use std::io::Cursor;
2525
use std::string::{String, ToString};
@@ -509,26 +509,25 @@ pub trait StreamProducer<D>: Send + 'static {
509509
) -> Poll<Result<StreamResult>>;
510510
}
511511

512-
/// An empty [`StreamProducer`], which will never produce any elements.
513-
///
514-
/// [StreamProducer::poll_produce] will always report the stream as dropped.
515-
#[derive(Copy, Clone, Debug, Hash, Eq, Ord, PartialEq, PartialOrd)]
516-
pub struct EmptyProducer<T>(PhantomData<fn(T) -> T>);
517-
518-
impl<T> EmptyProducer<T> {
519-
/// Constructs a new [EmptyProducer]
520-
pub fn new() -> Self {
521-
Self::default()
522-
}
523-
}
512+
impl<T, D> StreamProducer<D> for iter::Empty<T>
513+
where
514+
T: Send + Sync + 'static,
515+
{
516+
type Item = T;
517+
type Buffer = Option<Self::Item>;
524518

525-
impl<T> Default for EmptyProducer<T> {
526-
fn default() -> Self {
527-
Self(PhantomData)
519+
fn poll_produce<'a>(
520+
self: Pin<&mut Self>,
521+
_: &mut Context<'_>,
522+
_: StoreContextMut<'a, D>,
523+
_: Destination<'a, Self::Item, Self::Buffer>,
524+
_: bool,
525+
) -> Poll<Result<StreamResult>> {
526+
Poll::Ready(Ok(StreamResult::Dropped))
528527
}
529528
}
530529

531-
impl<T, D> StreamProducer<D> for EmptyProducer<T>
530+
impl<T, D> StreamProducer<D> for stream::Empty<T>
532531
where
533532
T: Send + Sync + 'static,
534533
{
@@ -640,44 +639,6 @@ impl<D> StreamProducer<D> for bytes::BytesMut {
640639
}
641640
}
642641

643-
/// A [`FutureProducer`], which is ready to produce an element.
644-
#[derive(Copy, Clone, Debug, Hash, Eq, Ord, PartialEq, PartialOrd)]
645-
pub struct ReadyProducer<T>(Option<T>);
646-
647-
impl<T> ReadyProducer<T> {
648-
/// Constructs a new [ReadyProducer].
649-
pub fn new(v: T) -> Self {
650-
Self(Some(v))
651-
}
652-
}
653-
654-
impl<T> From<T> for ReadyProducer<T> {
655-
fn from(v: T) -> Self {
656-
Self(Some(v))
657-
}
658-
}
659-
660-
impl<T, D> FutureProducer<D> for ReadyProducer<T>
661-
where
662-
T: Unpin + Send + 'static,
663-
{
664-
type Item = T;
665-
666-
fn poll_produce(
667-
self: Pin<&mut Self>,
668-
_: &mut Context<'_>,
669-
_: StoreContextMut<D>,
670-
_: bool,
671-
) -> Poll<Result<Option<T>>> {
672-
let v = self
673-
.get_mut()
674-
.0
675-
.take()
676-
.context("polled after returning `Ready`")?;
677-
Poll::Ready(Ok(Some(v)))
678-
}
679-
}
680-
681642
/// Represents the buffer for a host- or guest-initiated stream write.
682643
pub struct Source<'a, T> {
683644
instance: Instance,
@@ -4234,22 +4195,7 @@ mod tests {
42344195
poll_future_producer(fut.as_mut(), true),
42354196
Poll::Ready(Ok(None)),
42364197
));
4237-
}
4238-
4239-
#[test]
4240-
fn ready_producer() {
4241-
assert!(matches!(
4242-
poll_future_producer(pin!(ReadyProducer::from(())), false),
4243-
Poll::Ready(Ok(Some(()))),
4244-
));
4245-
assert!(matches!(
4246-
poll_future_producer(pin!(ReadyProducer::from(())), true),
4247-
Poll::Ready(Ok(Some(()))),
4248-
));
4249-
}
42504198

4251-
#[test]
4252-
fn oneshot_producer() {
42534199
let (tx, rx) = oneshot::channel();
42544200
let mut rx = pin!(rx);
42554201
assert!(matches!(

crates/wasmtime/src/runtime/component/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,9 @@ pub use self::component::{Component, ComponentExportIndex};
120120
#[cfg(feature = "component-model-async")]
121121
pub use self::concurrent::{
122122
Access, Accessor, AccessorTask, AsAccessor, Destination, DirectDestination, DirectSource,
123-
EmptyProducer, ErrorContext, FutureConsumer, FutureProducer, FutureReader, GuardedFutureReader,
124-
GuardedStreamReader, JoinHandle, ReadBuffer, ReadyProducer, Source, StreamConsumer,
125-
StreamProducer, StreamReader, StreamResult, VMComponentAsyncStore, VecBuffer, WriteBuffer,
123+
ErrorContext, FutureConsumer, FutureProducer, FutureReader, GuardedFutureReader,
124+
GuardedStreamReader, JoinHandle, ReadBuffer, Source, StreamConsumer, StreamProducer,
125+
StreamReader, StreamResult, VMComponentAsyncStore, VecBuffer, WriteBuffer,
126126
};
127127
#[cfg(feature = "component-model-async")]
128128
pub use self::func::TaskExit;

0 commit comments

Comments
 (0)