Skip to content

Commit 219e11a

Browse files
authored
Notification registration API baseline and email confirmation (#3)
1 parent 0118f23 commit 219e11a

File tree

11 files changed

+1512
-58
lines changed

11 files changed

+1512
-58
lines changed

Cargo.lock

Lines changed: 475 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,16 @@ tracing = "0.1.41"
2121
tracing-subscriber = "0.3.19"
2222
tower-http = { version = "0.6.6", features = ["cors"] }
2323
clap = { version = "4.5.39", features = ["derive", "env"] }
24-
chrono = "0.4.41"
24+
chrono = "0.4"
2525
serde_json = "1"
2626
serde = { version = "1", features = ["derive"] }
2727
async-trait = "0.1"
28-
tokio-postgres = "0.7.13"
29-
deadpool-postgres = "0.14.1"
28+
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
29+
deadpool-postgres = "0.14"
30+
hex = { version = "0.4" }
31+
rand = { version = "0.8" }
32+
url = { version = "2.5" }
33+
email_address = "0.2"
34+
bitflags = "2.9"
35+
reqwest = { version = "0.12", default-features = false, features = ["default", "json"] }
36+
base64 = "0.22"

src/blossom/file_store.rs

Lines changed: 3 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use std::str::FromStr;
22

3+
use crate::db::PostgresStore;
34
use async_trait::async_trait;
4-
use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
55
use nostr::hashes::sha256::Hash as Sha256Hash;
6-
use tokio_postgres::{NoTls, Row};
6+
use tokio_postgres::Row;
77

88
use super::File;
99

@@ -13,40 +13,8 @@ pub trait FileStoreApi: Send + Sync {
1313
async fn insert(&self, file: File) -> Result<(), anyhow::Error>;
1414
}
1515

16-
pub struct PostgresFileStore {
17-
pool: Pool,
18-
}
19-
20-
impl PostgresFileStore {
21-
pub async fn new(conn_str: &str) -> Result<Self, anyhow::Error> {
22-
let cfg: tokio_postgres::Config = conn_str.parse()?;
23-
let mgr_config = ManagerConfig {
24-
recycling_method: RecyclingMethod::Fast,
25-
};
26-
let pool = Pool::builder(Manager::from_config(cfg, NoTls, mgr_config))
27-
.max_size(16)
28-
.build()?;
29-
30-
Ok(Self { pool })
31-
}
32-
33-
/// Creates the table, if it doesn't exist yet
34-
pub async fn init(&self) -> Result<(), anyhow::Error> {
35-
let qry = r#"
36-
CREATE TABLE IF NOT EXISTS files (
37-
hash CHAR(64) PRIMARY KEY,
38-
data BYTEA NOT NULL,
39-
size INTEGER NOT NULL
40-
)
41-
"#;
42-
43-
self.pool.get().await?.execute(qry, &[]).await?;
44-
Ok(())
45-
}
46-
}
47-
4816
#[async_trait]
49-
impl FileStoreApi for PostgresFileStore {
17+
impl FileStoreApi for PostgresStore {
5018
async fn get(&self, hash: &Sha256Hash) -> Result<Option<File>, anyhow::Error> {
5119
let row = self
5220
.pool

src/db/mod.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
2+
use tokio_postgres::NoTls;
3+
4+
pub struct PostgresStore {
5+
pub pool: Pool,
6+
}
7+
8+
impl PostgresStore {
9+
pub async fn new(conn_str: &str) -> Result<Self, anyhow::Error> {
10+
let cfg: tokio_postgres::Config = conn_str.parse()?;
11+
let mgr_config = ManagerConfig {
12+
recycling_method: RecyclingMethod::Fast,
13+
};
14+
let pool = Pool::builder(Manager::from_config(cfg, NoTls, mgr_config))
15+
.max_size(16)
16+
.build()?;
17+
18+
Ok(Self { pool })
19+
}
20+
21+
/// Creates the tables, if they don't exist yet
22+
pub async fn init(&self) -> Result<(), anyhow::Error> {
23+
// File Store
24+
let qry = r#"
25+
CREATE TABLE IF NOT EXISTS files (
26+
hash CHAR(64) PRIMARY KEY,
27+
data BYTEA NOT NULL,
28+
size INTEGER NOT NULL
29+
)
30+
"#;
31+
self.pool.get().await?.execute(qry, &[]).await?;
32+
33+
// Notification Store
34+
let qry = r#"
35+
CREATE TABLE IF NOT EXISTS notif_challenges (
36+
npub TEXT PRIMARY KEY,
37+
challenge TEXT NOT NULL,
38+
created_at TIMESTAMPTZ DEFAULT (NOW() AT TIME ZONE 'UTC')
39+
)
40+
"#;
41+
self.pool.get().await?.execute(qry, &[]).await?;
42+
43+
let qry = r#"
44+
CREATE TABLE IF NOT EXISTS notif_email_verification (
45+
npub TEXT PRIMARY KEY,
46+
email TEXT NOT NULL,
47+
confirmed BOOLEAN DEFAULT FALSE,
48+
token TEXT,
49+
sent_at TIMESTAMPTZ DEFAULT (NOW() AT TIME ZONE 'UTC')
50+
)
51+
"#;
52+
self.pool.get().await?.execute(qry, &[]).await?;
53+
54+
let qry = r#"
55+
CREATE TABLE IF NOT EXISTS notif_email_preferences (
56+
npub TEXT PRIMARY KEY,
57+
enabled BOOLEAN DEFAULT FALSE,
58+
token TEXT NOT NULL,
59+
email TEXT NOT NULL,
60+
email_confirmed BOOLEAN DEFAULT FALSE,
61+
ebill_url TEXT NOT NULL,
62+
flags BIGINT NOT NULL
63+
)
64+
"#;
65+
self.pool.get().await?.execute(qry, &[]).await?;
66+
Ok(())
67+
}
68+
}

src/main.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
mod blossom;
2+
mod db;
3+
mod notification;
24
mod relay;
5+
mod util;
36

47
use std::{net::SocketAddr, sync::Arc};
58

@@ -8,7 +11,7 @@ use axum::{
811
extract::{ConnectInfo, State},
912
http::{StatusCode, Uri},
1013
response::IntoResponse,
11-
routing::{any, delete, get, head, put},
14+
routing::{any, delete, get, head, post, put},
1215
serve, Router,
1316
};
1417
use axum_raw_websocket::RawSocketUpgrade;
@@ -20,6 +23,14 @@ use relay::RelayConfig;
2023
use tower_http::cors::{Any, CorsLayer};
2124
use tracing::{error, info};
2225

26+
use crate::notification::{
27+
email::{
28+
mailjet::{MailjetConfig, MailjetService},
29+
EmailService,
30+
},
31+
notification_store::NotificationStoreApi,
32+
};
33+
2334
#[tokio::main]
2435
async fn main() -> Result<()> {
2536
tracing_subscriber::fmt::init();
@@ -43,6 +54,12 @@ async fn main() -> Result<()> {
4354
.route("/{hash}", get(blossom::handle_get_file))
4455
.route("/{hash}", head(blossom::handle_get_file_head))
4556
.route("/", delete(blossom::handle_delete))
57+
.route("/notifications/v1/start", post(notification::start))
58+
.route("/notifications/v1/register", post(notification::register))
59+
.route(
60+
"/notifications/confirm_email",
61+
get(notification::confirm_email),
62+
)
4663
.route("/", any(websocket_handler))
4764
.fallback(handle_404)
4865
.with_state(app_state)
@@ -80,26 +97,38 @@ async fn handle_404(uri: Uri) -> impl IntoResponse {
8097
#[derive(Clone)]
8198
struct AppConfig {
8299
pub host_url: Url,
100+
pub email_from_address: String,
83101
}
84102

85103
#[derive(Clone)]
86104
struct AppState {
87105
pub relay: LocalRelay,
88106
pub cfg: AppConfig,
89107
pub file_store: Arc<dyn FileStoreApi>,
108+
pub notification_store: Arc<dyn NotificationStoreApi>,
109+
pub email_service: Arc<dyn EmailService>,
90110
}
91111

92112
impl AppState {
93113
pub async fn new(config: &RelayConfig) -> Result<Self> {
94-
let file_store =
95-
blossom::file_store::PostgresFileStore::new(&config.db_connection_string()).await?;
96-
file_store.init().await?;
114+
let db = db::PostgresStore::new(&config.db_connection_string()).await?;
115+
db.init().await?;
116+
let store = Arc::new(db);
117+
118+
let email_service = MailjetService::new(&MailjetConfig {
119+
api_key: config.email_api_key.clone(),
120+
api_secret_key: config.email_api_secret_key.clone(),
121+
url: config.email_url.clone(),
122+
});
97123
Ok(Self {
98124
relay: relay::init(config).await?,
99125
cfg: AppConfig {
100126
host_url: config.host_url.clone(),
127+
email_from_address: config.email_from_address.clone(),
101128
},
102-
file_store: Arc::new(file_store),
129+
file_store: store.clone(),
130+
notification_store: store,
131+
email_service: Arc::new(email_service),
103132
})
104133
}
105134
}

src/notification/email/mailjet.rs

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use crate::notification::email::{EmailMessage, EmailService};
2+
use anyhow::anyhow;
3+
use async_trait::async_trait;
4+
use serde::{Deserialize, Serialize};
5+
use tracing::error;
6+
7+
#[derive(Debug, Clone)]
8+
pub struct MailjetConfig {
9+
pub api_key: String,
10+
pub api_secret_key: String,
11+
pub url: url::Url,
12+
}
13+
14+
pub struct MailjetService {
15+
config: MailjetConfig,
16+
client: reqwest::Client,
17+
}
18+
19+
impl MailjetService {
20+
pub fn new(config: &MailjetConfig) -> Self {
21+
let client = reqwest::Client::new();
22+
Self {
23+
config: config.to_owned(),
24+
client,
25+
}
26+
}
27+
}
28+
29+
#[derive(Debug, Clone, Serialize)]
30+
struct MailjetReq {
31+
#[serde(rename = "Messages")]
32+
pub messages: Vec<MailjetMessage>,
33+
}
34+
35+
#[derive(Debug, Clone, Deserialize)]
36+
struct MailjetResp {
37+
#[serde(rename = "Messages")]
38+
pub messages: Vec<MailjetRespMessage>,
39+
}
40+
41+
#[derive(Debug, Clone, Serialize)]
42+
struct MailjetMessage {
43+
#[serde(rename = "From")]
44+
pub from: MailjetFrom,
45+
#[serde(rename = "To")]
46+
pub to: Vec<MailjetTo>,
47+
#[serde(rename = "Subject")]
48+
pub subject: String,
49+
#[serde(rename = "HTMLPart")]
50+
pub html_part: String,
51+
}
52+
53+
impl From<EmailMessage> for MailjetMessage {
54+
fn from(value: EmailMessage) -> Self {
55+
Self {
56+
from: MailjetFrom { email: value.from },
57+
to: vec![MailjetTo { email: value.to }],
58+
subject: value.subject,
59+
html_part: value.body,
60+
}
61+
}
62+
}
63+
64+
#[derive(Debug, Clone, Serialize)]
65+
struct MailjetFrom {
66+
#[serde(rename = "Email")]
67+
pub email: String,
68+
}
69+
70+
#[derive(Debug, Clone, Serialize)]
71+
struct MailjetTo {
72+
#[serde(rename = "Email")]
73+
pub email: String,
74+
}
75+
76+
#[derive(Debug, Clone, Deserialize)]
77+
struct MailjetRespMessage {
78+
#[serde(rename = "Status")]
79+
pub status: String,
80+
}
81+
82+
#[async_trait]
83+
impl EmailService for MailjetService {
84+
async fn send(&self, msg: super::EmailMessage) -> Result<(), anyhow::Error> {
85+
let mailjet_msg = MailjetReq {
86+
messages: vec![MailjetMessage::from(msg)],
87+
};
88+
89+
let url = self.config.url.join("/v3.1/send").expect("mailjet path");
90+
let request = self.client.post(url).json(&mailjet_msg).basic_auth(
91+
self.config.api_key.clone(),
92+
Some(self.config.api_secret_key.clone()),
93+
);
94+
let res = request.send().await.map_err(|e| {
95+
error!("Failed to send email: {e}");
96+
anyhow!("Failed to send email")
97+
})?;
98+
99+
let resp: MailjetResp = res.json().await.map_err(|e| {
100+
error!("Failed to parse email response: {e}");
101+
anyhow!("Failed to parse email response")
102+
})?;
103+
104+
match resp.messages.first() {
105+
Some(msg) => {
106+
if msg.status != "success" {
107+
error!("Invalid email sending response: {}", &msg.status);
108+
Err(anyhow!("Invalid email sending response: {}", &msg.status))
109+
} else {
110+
Ok(())
111+
}
112+
}
113+
None => {
114+
error!("Invalid email response - got no status");
115+
Err(anyhow!("Invalid email response - got no status"))
116+
}
117+
}
118+
}
119+
}

src/notification/email/mod.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use async_trait::async_trait;
2+
3+
pub mod mailjet;
4+
5+
#[async_trait]
6+
pub trait EmailService: Send + Sync {
7+
async fn send(&self, msg: EmailMessage) -> Result<(), anyhow::Error>;
8+
}
9+
10+
/// A simple email message. We can add more features (like html, multi recipient, etc.) later.
11+
#[derive(Debug, Clone)]
12+
pub struct EmailMessage {
13+
pub from: String,
14+
pub to: String,
15+
pub subject: String,
16+
pub body: String,
17+
}
18+
19+
pub fn build_email_confirmation_message(
20+
host_url: &url::Url,
21+
from: &str,
22+
to: &str,
23+
token: &str,
24+
) -> EmailMessage {
25+
// build email confirmation link
26+
let mut link = host_url
27+
.join("/notifications/confirm_email")
28+
.expect("email confirmation mail");
29+
link.set_query(Some(&format!("token={token}")));
30+
31+
// build template
32+
let body = format!(
33+
"<html><head></head><body><a href=\"{link}\">Click here to confirm</a><br /><br />This link is valid for 1 day.</body></html>"
34+
);
35+
36+
EmailMessage {
37+
from: from.to_owned(),
38+
to: to.to_owned(),
39+
subject: "Confirm your E-Mail".to_owned(),
40+
body: body.to_owned(),
41+
}
42+
}

0 commit comments

Comments
 (0)