Skip to content

Commit 9935883

Browse files
committed
add file reader example
1 parent 89a4c2e commit 9935883

File tree

10 files changed

+206
-46
lines changed

10 files changed

+206
-46
lines changed

actix-codec/src/framed.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ impl<T, U> Framed<T, U> {
156156
}
157157

158158
impl<T, U> Framed<T, U> {
159-
/// Serialize item and Write to the inner buffer
159+
/// Serialize item and write to the inner buffer
160160
pub fn write<I>(mut self: Pin<&mut Self>, item: I) -> Result<(), <U as Encoder<I>>::Error>
161161
where
162162
T: AsyncWrite,

actix-server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,4 @@ actix-rt = "2.4.0"
4343
bytes = "1"
4444
env_logger = "0.9"
4545
futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] }
46-
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }
46+
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros", "fs"] }

actix-server/examples/file-reader.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
//! Simple file-reader TCP server with framed stream.
2+
//!
3+
//! Using the following command:
4+
//!
5+
//! ```sh
6+
//! nc 127.0.0.1 8080
7+
//! ```
8+
//!
9+
//! Follow the prompt and enter a file path, relative or absolute.
10+
11+
use std::io;
12+
13+
use actix_codec::{Framed, LinesCodec};
14+
use actix_rt::net::TcpStream;
15+
use actix_server::Server;
16+
use actix_service::{fn_service, ServiceFactoryExt as _};
17+
use futures_util::{SinkExt as _, StreamExt as _};
18+
use tokio::{fs::File, io::AsyncReadExt as _};
19+
20+
async fn run() -> io::Result<()> {
21+
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
22+
23+
let addr = ("127.0.0.1", 8080);
24+
log::info!("starting server on port: {}", &addr.0);
25+
26+
// Bind socket address and start worker(s). By default, the server uses the number of physical
27+
// CPU cores as the worker count. For this reason, the closure passed to bind needs to return
28+
// a service *factory*; so it can be created once per worker.
29+
Server::build()
30+
.bind("file-reader", addr, move || {
31+
fn_service(move |stream: TcpStream| async move {
32+
// set up codec to use with I/O resource
33+
let mut framed = Framed::new(stream, LinesCodec::default());
34+
35+
loop {
36+
// prompt for file name
37+
framed.send("Type file name to return:").await?;
38+
39+
// wait for next line
40+
match framed.next().await {
41+
Some(Ok(line)) => {
42+
match File::open(line).await {
43+
Ok(mut file) => {
44+
// read file into String buffer
45+
let mut buf = String::new();
46+
file.read_to_string(&mut buf).await?;
47+
48+
// send String into framed object
49+
framed.send(buf).await?;
50+
51+
// break out of loop and
52+
break;
53+
}
54+
Err(err) => {
55+
log::error!("{}", err);
56+
framed
57+
.send("File not found or not readable. Try again.")
58+
.await?;
59+
continue;
60+
}
61+
};
62+
}
63+
64+
// not being able to read a line from the stream is unrecoverable
65+
Some(Err(err)) => return Err(err),
66+
67+
// This EOF won't be hit.
68+
None => continue,
69+
}
70+
}
71+
72+
// close connection after file has been copied to TCP stream
73+
Ok(())
74+
})
75+
.map_err(|err| log::error!("Service Error: {:?}", err))
76+
})?
77+
.workers(2)
78+
.run()
79+
.await
80+
}
81+
82+
#[tokio::main]
83+
async fn main() -> io::Result<()> {
84+
run().await?;
85+
Ok(())
86+
}
87+
88+
// alternatively:
89+
// #[actix_rt::main]
90+
// async fn main() -> io::Result<()> {
91+
// run().await?;
92+
// Ok(())
93+
// }

actix-server/examples/tcp-echo.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use log::{error, info};
2626
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
2727

2828
async fn run() -> io::Result<()> {
29-
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
29+
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
3030

3131
let count = Arc::new(AtomicUsize::new(0));
3232

actix-server/src/builder.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
66

77
use crate::{
88
server::ServerCommand,
9-
service::{InternalServiceFactory, ServiceFactory, StreamNewService},
9+
service::{InternalServiceFactory, ServerServiceFactory, StreamNewService},
1010
socket::{
1111
create_mio_tcp_listener, MioListener, MioTcpListener, StdTcpListener, ToSocketAddrs,
1212
},
@@ -142,7 +142,7 @@ impl ServerBuilder {
142142
/// Add new service to the server.
143143
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
144144
where
145-
F: ServiceFactory<TcpStream>,
145+
F: ServerServiceFactory<TcpStream>,
146146
U: ToSocketAddrs,
147147
{
148148
let sockets = bind_addr(addr, self.backlog)?;
@@ -172,7 +172,7 @@ impl ServerBuilder {
172172
factory: F,
173173
) -> io::Result<Self>
174174
where
175-
F: ServiceFactory<TcpStream>,
175+
F: ServerServiceFactory<TcpStream>,
176176
{
177177
lst.set_nonblocking(true)?;
178178
let addr = lst.local_addr()?;
@@ -213,7 +213,7 @@ impl ServerBuilder {
213213
/// Add new unix domain service to the server.
214214
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
215215
where
216-
F: ServiceFactory<actix_rt::net::UnixStream>,
216+
F: ServerServiceFactory<actix_rt::net::UnixStream>,
217217
N: AsRef<str>,
218218
U: AsRef<std::path::Path>,
219219
{
@@ -240,7 +240,7 @@ impl ServerBuilder {
240240
factory: F,
241241
) -> io::Result<Self>
242242
where
243-
F: ServiceFactory<actix_rt::net::UnixStream>,
243+
F: ServerServiceFactory<actix_rt::net::UnixStream>,
244244
{
245245
use std::net::{IpAddr, Ipv4Addr};
246246
lst.set_nonblocking(true)?;

actix-server/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ mod worker;
2121
pub use self::builder::ServerBuilder;
2222
pub use self::handle::ServerHandle;
2323
pub use self::server::Server;
24-
pub use self::service::ServiceFactory;
24+
pub use self::service::ServerServiceFactory;
2525
pub use self::test_server::TestServer;
2626

2727
#[doc(hidden)]

actix-server/src/service.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
1-
use std::marker::PhantomData;
2-
use std::net::SocketAddr;
3-
use std::task::{Context, Poll};
1+
use std::{
2+
marker::PhantomData,
3+
net::SocketAddr,
4+
task::{Context, Poll},
5+
};
46

57
use actix_service::{Service, ServiceFactory as BaseServiceFactory};
68
use actix_utils::future::{ready, Ready};
79
use futures_core::future::LocalBoxFuture;
810
use log::error;
911

10-
use crate::socket::{FromStream, MioStream};
11-
use crate::worker::WorkerCounterGuard;
12+
use crate::{
13+
socket::{FromStream, MioStream},
14+
worker::WorkerCounterGuard,
15+
};
1216

13-
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
17+
#[doc(hidden)]
18+
pub trait ServerServiceFactory<Stream: FromStream>: Send + Clone + 'static {
1419
type Factory: BaseServiceFactory<Stream, Config = ()>;
1520

1621
fn create(&self) -> Self::Factory;
@@ -80,7 +85,7 @@ where
8085
}
8186
}
8287

83-
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
88+
pub(crate) struct StreamNewService<F: ServerServiceFactory<Io>, Io: FromStream> {
8489
name: String,
8590
inner: F,
8691
token: usize,
@@ -90,7 +95,7 @@ pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
9095

9196
impl<F, Io> StreamNewService<F, Io>
9297
where
93-
F: ServiceFactory<Io>,
98+
F: ServerServiceFactory<Io>,
9499
Io: FromStream + Send + 'static,
95100
{
96101
pub(crate) fn create(
@@ -111,7 +116,7 @@ where
111116

112117
impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
113118
where
114-
F: ServiceFactory<Io>,
119+
F: ServerServiceFactory<Io>,
115120
Io: FromStream + Send + 'static,
116121
{
117122
fn name(&self, _: usize) -> &str {
@@ -143,7 +148,7 @@ where
143148
}
144149
}
145150

146-
impl<F, T, I> ServiceFactory<I> for F
151+
impl<F, T, I> ServerServiceFactory<I> for F
147152
where
148153
F: Fn() -> T + Send + Clone + 'static,
149154
T: BaseServiceFactory<I, Config = ()>,

actix-server/src/socket.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
11
pub(crate) use std::net::{
22
SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs,
33
};
4+
use std::{fmt, io};
45

6+
use actix_rt::net::TcpStream;
57
pub(crate) use mio::net::TcpListener as MioTcpListener;
8+
use mio::{event::Source, Interest, Registry, Token};
9+
610
#[cfg(unix)]
711
pub(crate) use {
812
mio::net::UnixListener as MioUnixListener,
913
std::os::unix::net::UnixListener as StdUnixListener,
1014
};
1115

12-
use std::{fmt, io};
13-
14-
use actix_rt::net::TcpStream;
15-
use mio::{event::Source, Interest, Registry, Token};
16-
1716
pub(crate) enum MioListener {
1817
Tcp(MioTcpListener),
1918
#[cfg(unix)]

actix-server/src/test_server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{io, net, sync::mpsc, thread};
22

33
use actix_rt::{net::TcpStream, System};
44

5-
use crate::{Server, ServerBuilder, ServerHandle, ServiceFactory};
5+
use crate::{Server, ServerBuilder, ServerHandle, ServerServiceFactory};
66

77
/// A testing server.
88
///
@@ -66,7 +66,7 @@ impl TestServer {
6666
}
6767

6868
/// Start new test server with application factory.
69-
pub fn with<F: ServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime {
69+
pub fn with<F: ServerServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime {
7070
let (tx, rx) = mpsc::channel();
7171

7272
// run server in separate thread

0 commit comments

Comments
 (0)