Skip to content

Commit f4b71b5

Browse files
authored
Merge pull request #27 from dimacurrentai/redis
Redis pub/sub.
2 parents 5decd8d + 4ec8f08 commit f4b71b5

File tree

3 files changed

+79
-4
lines changed

3 files changed

+79
-4
lines changed

step06_redis/code/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,5 @@ clap = { version = "4.0", features = ["derive"] }
77
redis = { version = "0.29.2", features = ["tokio-comp"] }
88
tokio = { version = "1", features = ["full"] }
99
anyhow = "1.0"
10+
chrono = "0.4"
11+
futures = "0.3"

step06_redis/code/src/main.rs

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
use anyhow::Result;
2+
use anyhow::anyhow;
3+
use chrono::Local;
14
use clap::Parser;
5+
use futures::StreamExt;
26
use redis::AsyncCommands;
37
use std::process::ExitCode;
4-
use anyhow::Result;
5-
use anyhow::anyhow;
8+
use tokio::select;
9+
use tokio::time::{Duration, sleep};
610

711
#[derive(Parser)]
812
struct Args {
@@ -11,6 +15,12 @@ struct Args {
1115

1216
#[arg(long)]
1317
mode: String,
18+
19+
#[arg(long, default_value = "5.0")]
20+
subscribe_for_seconds: f64,
21+
22+
#[arg(long)]
23+
and_publish_in_seconds: Option<f64>,
1424
}
1525

1626
async fn try_check(args: &Args) -> Result<()> {
@@ -31,13 +41,72 @@ async fn try_test(args: &Args) -> Result<()> {
3141
(val == "value").then(|| ()).ok_or(anyhow!("Wrong value"))
3242
}
3343

44+
async fn try_pub(args: &Args) -> Result<()> {
45+
let client = redis::Client::open(args.redis.to_string())?;
46+
let mut con = client.get_multiplexed_async_connection().await?;
47+
con.publish::<_, _, ()>("redis_channel", format!("published from rust at {}", Local::now())).await?;
48+
Ok(())
49+
}
50+
51+
async fn try_sub(args: &Args) -> Result<()> {
52+
let client = redis::Client::open(args.redis.to_string())?;
53+
let mut pubsub = client.get_async_pubsub().await?;
54+
55+
pubsub.subscribe("redis_channel").await?;
56+
let mut stream = pubsub.on_message();
57+
58+
let mut con = client.get_multiplexed_async_connection().await?;
59+
60+
let timeout = async || sleep(Duration::from_secs_f64(args.subscribe_for_seconds)).await;
61+
let mut optionally_published = false;
62+
let mut and_optionally_publish = async || {
63+
if !optionally_published {
64+
optionally_published = true;
65+
if let Some(delay) = args.and_publish_in_seconds {
66+
sleep(Duration::from_secs_f64(delay)).await;
67+
println!("publishing from rust from a separate green thread");
68+
con.publish::<_, _, ()>("redis_channel", "published from rust after a delay").await.unwrap()
69+
}
70+
}
71+
std::future::pending::<()>().await
72+
};
73+
74+
println!("listening to messages on `redis_channel`");
75+
76+
loop {
77+
select! {
78+
msg = stream.next() => {
79+
let payload: String = msg.unwrap().get_payload()?;
80+
println!(">> {}", payload);
81+
},
82+
_ = timeout() => {
83+
println!("terminating by timeout");
84+
break Ok(())
85+
},
86+
_ = and_optionally_publish() => {
87+
unreachable!()
88+
}
89+
}
90+
}
91+
}
92+
3493
#[tokio::main]
3594
async fn main() -> ExitCode {
3695
let args = Args::parse();
3796

3897
match args.mode.as_str() {
3998
"check" => try_check(&args).await,
4099
"test" => try_test(&args).await,
41-
_ => Err(anyhow!("This `--mode` value is not supported."))
42-
}.map_or_else(|err| { println!("Error: {}", err); 1 }, |_| 0).into()
100+
"pub" => try_pub(&args).await,
101+
"sub" => try_sub(&args).await,
102+
_ => Err(anyhow!("This `--mode` value is not supported.")),
103+
}
104+
.map_or_else(
105+
|err| {
106+
println!("Error: {}", err);
107+
1
108+
},
109+
|_| 0,
110+
)
111+
.into()
43112
}

step06_redis/run.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,9 @@ echo 'Running the test.'
2727
docker run --add-host=host.docker.internal:host-gateway --rm -t demo --mode test --redis redis://host.docker.internal
2828
echo 'Test run successfully.'
2929

30+
echo 'Running the pubsub test.'
31+
docker run --add-host=host.docker.internal:host-gateway --rm -t demo --mode sub --and-publish-in-seconds=2.5 --redis redis://host.docker.internal
32+
echo 'The pubsub test run successfully.'
33+
3034
docker stop redis-for-rust
3135
echo 'Redis stopped.'

0 commit comments

Comments
 (0)