Skip to content

Commit 8337909

Browse files
committed
Update async examples (use send feature)
1 parent a86d6ab commit 8337909

File tree

4 files changed

+28
-73
lines changed

4 files changed

+28
-73
lines changed

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ libloading = { version = "0.8", optional = true }
6161

6262
[dev-dependencies]
6363
trybuild = "1.0"
64-
futures = "0.3.5"
6564
hyper = { version = "1.2", features = ["full"] }
6665
hyper-util = { version = "0.1.3", features = ["full"] }
6766
http-body-util = "0.1.1"
@@ -101,11 +100,11 @@ required-features = ["async", "serialize", "macros"]
101100

102101
[[example]]
103102
name = "async_http_server"
104-
required-features = ["async", "macros"]
103+
required-features = ["async", "macros", "send"]
105104

106105
[[example]]
107106
name = "async_tcp_server"
108-
required-features = ["async", "macros"]
107+
required-features = ["async", "macros", "send"]
109108

110109
[[example]]
111110
name = "guided_tour"

examples/async_http_server.rs

Lines changed: 20 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,17 @@
11
use std::convert::Infallible;
22
use std::future::Future;
33
use std::net::SocketAddr;
4-
use std::rc::Rc;
4+
use std::pin::Pin;
55

6-
use futures::future::LocalBoxFuture;
76
use http_body_util::combinators::BoxBody;
87
use http_body_util::{BodyExt as _, Empty, Full};
98
use hyper::body::{Bytes, Incoming};
9+
use hyper::server::conn::http1;
1010
use hyper::{Request, Response};
1111
use hyper_util::rt::TokioIo;
12-
use hyper_util::server::conn::auto::Builder as ServerConnBuilder;
1312
use tokio::net::TcpListener;
14-
use tokio::task::LocalSet;
1513

16-
use mlua::{
17-
chunk, Error as LuaError, Function, Lua, RegistryKey, String as LuaString, Table, UserData,
18-
UserDataMethods,
19-
};
14+
use mlua::{chunk, Error as LuaError, Function, Lua, String as LuaString, Table, UserData, UserDataMethods};
2015

2116
/// Wrapper around incoming request that implements UserData
2217
struct LuaRequest(SocketAddr, Request<Incoming>);
@@ -32,33 +27,26 @@ impl UserData for LuaRequest {
3227
/// Service that handles incoming requests
3328
#[derive(Clone)]
3429
pub struct Svc {
35-
lua: Rc<Lua>,
36-
handler: Rc<RegistryKey>,
30+
handler: Function,
3731
peer_addr: SocketAddr,
3832
}
3933

4034
impl Svc {
41-
pub fn new(lua: Rc<Lua>, handler: Rc<RegistryKey>, peer_addr: SocketAddr) -> Self {
42-
Self {
43-
lua,
44-
handler,
45-
peer_addr,
46-
}
35+
pub fn new(handler: Function, peer_addr: SocketAddr) -> Self {
36+
Self { handler, peer_addr }
4737
}
4838
}
4939

5040
impl hyper::service::Service<Request<Incoming>> for Svc {
5141
type Response = Response<BoxBody<Bytes, Infallible>>;
5242
type Error = LuaError;
53-
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
43+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
5444

5545
fn call(&self, req: Request<Incoming>) -> Self::Future {
5646
// If handler returns an error then generate 5xx response
57-
let lua = self.lua.clone();
58-
let handler_key = self.handler.clone();
47+
let handler = self.handler.clone();
5948
let lua_req = LuaRequest(self.peer_addr, req);
6049
Box::pin(async move {
61-
let handler: Function = lua.registry_value(&handler_key)?;
6250
match handler.call_async::<_, Table>(lua_req).await {
6351
Ok(lua_resp) => {
6452
let status = lua_resp.get::<_, Option<u16>>("status")?.unwrap_or(200);
@@ -94,10 +82,10 @@ impl hyper::service::Service<Request<Incoming>> for Svc {
9482

9583
#[tokio::main(flavor = "current_thread")]
9684
async fn main() {
97-
let lua = Rc::new(Lua::new());
85+
let lua = Lua::new();
9886

9987
// Create Lua handler function
100-
let handler: RegistryKey = lua
88+
let handler = lua
10189
.load(chunk! {
10290
function(req)
10391
return {
@@ -111,15 +99,13 @@ async fn main() {
11199
}
112100
end
113101
})
114-
.eval()
102+
.eval::<Function>()
115103
.expect("Failed to create Lua handler");
116-
let handler = Rc::new(handler);
117104

118105
let listen_addr = "127.0.0.1:3000";
119106
let listener = TcpListener::bind(listen_addr).await.unwrap();
120107
println!("Listening on http://{listen_addr}");
121108

122-
let local = LocalSet::new();
123109
loop {
124110
let (stream, peer_addr) = match listener.accept().await {
125111
Ok(x) => x,
@@ -129,29 +115,14 @@ async fn main() {
129115
}
130116
};
131117

132-
let svc = Svc::new(lua.clone(), handler.clone(), peer_addr);
133-
local
134-
.run_until(async move {
135-
let result = ServerConnBuilder::new(LocalExec)
136-
.http1()
137-
.serve_connection(TokioIo::new(stream), svc)
138-
.await;
139-
if let Err(err) = result {
140-
eprintln!("Error serving connection: {err:?}");
141-
}
142-
})
143-
.await;
144-
}
145-
}
146-
147-
#[derive(Clone, Copy, Debug)]
148-
struct LocalExec;
149-
150-
impl<F> hyper::rt::Executor<F> for LocalExec
151-
where
152-
F: Future + 'static, // not requiring `Send`
153-
{
154-
fn execute(&self, fut: F) {
155-
tokio::task::spawn_local(fut);
118+
let svc = Svc::new(handler.clone(), peer_addr);
119+
tokio::task::spawn(async move {
120+
if let Err(err) = http1::Builder::new()
121+
.serve_connection(TokioIo::new(stream), svc)
122+
.await
123+
{
124+
eprintln!("Error serving connection: {:?}", err);
125+
}
126+
});
156127
}
157128
}

examples/async_tcp_server.rs

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
use std::io;
22
use std::net::SocketAddr;
3-
use std::rc::Rc;
43

54
use tokio::io::{AsyncReadExt, AsyncWriteExt};
65
use tokio::net::{TcpListener, TcpStream};
7-
use tokio::task;
86

9-
use mlua::{chunk, Function, Lua, RegistryKey, String as LuaString, UserData, UserDataMethods};
7+
use mlua::{chunk, Function, Lua, String as LuaString, UserData, UserDataMethods};
108

119
struct LuaTcpStream(TcpStream);
1210

@@ -33,26 +31,21 @@ impl UserData for LuaTcpStream {
3331
}
3432
}
3533

36-
async fn run_server(lua: Lua, handler: RegistryKey) -> io::Result<()> {
34+
async fn run_server(handler: Function) -> io::Result<()> {
3735
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
3836
let listener = TcpListener::bind(addr).await.expect("cannot bind addr");
3937

4038
println!("Listening on {}", addr);
4139

42-
let lua = Rc::new(lua);
43-
let handler = Rc::new(handler);
4440
loop {
4541
let (stream, _) = match listener.accept().await {
4642
Ok(res) => res,
4743
Err(err) if is_transient_error(&err) => continue,
4844
Err(err) => return Err(err),
4945
};
5046

51-
let lua = lua.clone();
5247
let handler = handler.clone();
53-
task::spawn_local(async move {
54-
let handler: Function = lua.registry_value(&handler).expect("cannot get Lua handler");
55-
48+
tokio::task::spawn(async move {
5649
let stream = LuaTcpStream(stream);
5750
if let Err(err) = handler.call_async::<_, ()>(stream).await {
5851
eprintln!("{}", err);
@@ -66,7 +59,7 @@ async fn main() {
6659
let lua = Lua::new();
6760

6861
// Create Lua handler function
69-
let handler_fn = lua
62+
let handler = lua
7063
.load(chunk! {
7164
function(stream)
7265
local peer_addr = stream:peer_addr()
@@ -88,15 +81,7 @@ async fn main() {
8881
.eval::<Function>()
8982
.expect("cannot create Lua handler");
9083

91-
// Store it in the Registry
92-
let handler = lua
93-
.create_registry_value(handler_fn)
94-
.expect("cannot store Lua handler");
95-
96-
task::LocalSet::new()
97-
.run_until(run_server(lua, handler))
98-
.await
99-
.expect("cannot run server")
84+
run_server(handler).await.expect("cannot run server")
10085
}
10186

10287
fn is_transient_error(e: &io::Error) -> bool {

src/thread.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ impl Thread {
276276
///
277277
/// ```
278278
/// # use mlua::{Lua, Result, Thread};
279-
/// use futures::stream::TryStreamExt;
279+
/// use futures_util::stream::TryStreamExt;
280280
/// # #[tokio::main]
281281
/// # async fn main() -> Result<()> {
282282
/// # let lua = Lua::new();

0 commit comments

Comments
 (0)