Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions autoconnect/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ docopt = "1.1"
default = ["bigtable", "reliable_report"]
bigtable = ["autopush_common/bigtable", "autoconnect_settings/bigtable"]
emulator = ["bigtable"]
urgency = ["autoconnect_ws/urgency"]
log_vapid = []
reliable_report = [
"autoconnect_common/reliable_report",
Expand Down
1 change: 1 addition & 0 deletions autoconnect/autoconnect-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ autopush_common.workspace = true

[features]
test-support = []
urgency = []
reliable_report = ["autopush_common/reliable_report"]
13 changes: 13 additions & 0 deletions autoconnect/autoconnect-common/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ use uuid::Uuid;

use autopush_common::notification::Notification;

#[cfg(feature = "urgency")]
use autopush_common::db::Urgency;

#[derive(Debug, Eq, PartialEq, Serialize)]
#[serde(untagged)]
pub enum BroadcastValue {
Expand Down Expand Up @@ -66,6 +69,11 @@ pub enum ClientMessage {
version: String,
},

#[cfg(feature = "urgency")]
Urgency {
min: Urgency,
},

Ping,
}

Expand Down Expand Up @@ -137,6 +145,11 @@ pub enum ServerMessage {

Notification(Notification),

#[cfg(feature = "urgency")]
Urgency {
status: u32,
},

Ping,
}

Expand Down
1 change: 1 addition & 0 deletions autoconnect/autoconnect-ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ reliable_report = [
"autopush_common/reliable_report",
"autoconnect_ws_sm/reliable_report",
]
urgency = ["autoconnect_ws_sm/urgency"]
1 change: 1 addition & 0 deletions autoconnect/autoconnect-ws/autoconnect-ws-sm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ autoconnect_common = { workspace = true, features = ["test-support"] }

[features]
reliable_report = []
urgency = ["autoconnect_common/urgency"]
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use autoconnect_common::{

use autoconnect_settings::{AppState, Settings};
use autopush_common::{
db::User,
db::{Urgency, User},
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not fully isolated:

Suggested change
db::{Urgency, User},
#[cfg(feature = "urgency" )]
use autopush_common::Urgency;
use autopush_common::{
db::User,
notification::Notification,
util::{ms_since_epoch, user_agent::UserAgentInfo},
};

notification::Notification,
util::{ms_since_epoch, user_agent::UserAgentInfo},
};
Expand Down Expand Up @@ -309,6 +309,8 @@ pub struct ClientFlags {
pub old_record_version: bool,
/// First time a user has connected "today"
pub emit_channel_metrics: bool,
/// Minimum urgency
pub min_urgency: Urgency,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub min_urgency: Urgency,
#[cfg(feature = "urgency")]
pub min_urgency: Urgency,

}

impl Default for ClientFlags {
Expand All @@ -319,6 +321,7 @@ impl Default for ClientFlags {
check_storage: false,
old_record_version: false,
emit_channel_metrics: false,
min_urgency: Urgency::VeryLow,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
min_urgency: Urgency::VeryLow,
#[cfg(feature = "urgency")]
min_urgency: Urgency::VeryLow,

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use autoconnect_common::{
};
use autopush_common::{endpoint::make_endpoint, util::sec_since_epoch};

#[cfg(feature = "urgency")]
use autopush_common::{db::Urgency, util::ms_since_epoch};

use super::WebPushClient;
use crate::error::{SMError, SMErrorKind};

Expand Down Expand Up @@ -38,6 +41,8 @@ impl WebPushClient {
self.nack(code);
Ok(vec![])
}
#[cfg(feature = "urgency")]
ClientMessage::Urgency { min } => Ok(self.change_min_urgency(min).await?),
ClientMessage::Ping => Ok(vec![self.ping()?]),
}
}
Expand Down Expand Up @@ -339,4 +344,40 @@ impl WebPushClient {
Ok(vec![])
}
}

/// Update minimum urgency for the user and the flag
///
/// If the new urgency is lower than the previous one,
/// We check pending messages, to send messages that were
/// retained because of their urgency
#[cfg(feature = "urgency")]
async fn change_min_urgency(
&mut self,
new_min: Urgency,
) -> Result<Vec<ServerMessage>, SMError> {
// Change the min urgency
self.flags.min_urgency = new_min;

if let Some(mut user) = self.app_state.db.get_user(&self.uaid).await? {
// If the user hasn't set a minimum urgency yet, they receive all messages,
// which is equivalent to setting very-low as a minimum
let current_urgency = user.urgency.unwrap_or(Urgency::VeryLow);

// We update the user
user.urgency = Some(new_min);
user.connected_at = ms_since_epoch();
self.app_state.db.update_user(&mut user).await?;

let mut res = vec![ServerMessage::Urgency { status: 200 }];
// if new urgency < previous: fetch pending messages
if new_min < current_urgency {
self.ack_state.unacked_stored_highest = None;
self.current_timestamp = None;
res.append(&mut self.check_storage().await?);
}
Ok(res)
} else {
Ok(vec![ServerMessage::Urgency { status: 404 }])
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use autopush_common::{
};

use super::WebPushClient;
use crate::error::{SMError, SMErrorKind};
use crate::{
error::{SMError, SMErrorKind},
identified::Urgency,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#[cfg(feature = "urgency")]
use crate::identified::Urgency;

};

impl WebPushClient {
/// Handle a `ServerNotification` for this user
Expand Down Expand Up @@ -123,7 +126,11 @@ impl WebPushClient {
// inner msg.clone()
messages.retain(|msg| {
if !msg.expired(now_sec) {
return true;
if let Some(headers) = msg.headers.as_ref() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

               #[cfg(feature = "urgency")]
               if let Some(headers) = msg.headers.as_ref() {
                    return Urgency::from(headers.get("urgency")) >= self.flags.min_urgency;
                } else {
                    return true;
                }
            }
           #[cfg(not(feature="urgency"))]
           return true;

return Urgency::from(headers.get("urgency")) >= self.flags.min_urgency;
} else {
return true;
}
}
if msg.sortkey_timestamp.is_none() {
expired_messages.push(msg.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use autoconnect_common::{
};
use autoconnect_settings::{AppState, Settings};
use autopush_common::{
db::{User, USER_RECORD_VERSION},
db::{Urgency, User, USER_RECORD_VERSION},
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#[cfg(feature = "urgency")]
use autopush_common::db::Urgency
use autopush_common::{
    db::{User, USER_RECORD_VERSION},

util::{ms_since_epoch, ms_utc_midnight},
};

Expand Down Expand Up @@ -145,6 +145,7 @@ impl UnidentifiedClient {
.record_version
.is_none_or(|rec_ver| rec_ver < USER_RECORD_VERSION),
emit_channel_metrics: user.connected_at < ms_utc_midnight(),
min_urgency: user.urgency.unwrap_or(Urgency::VeryLow),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
min_urgency: user.urgency.unwrap_or(Urgency::VeryLow),
#[cfg(feature = "urgency")]
min_urgency: user.urgency.unwrap_or(Urgency::VeryLow),

..Default::default()
};
user.node_id = Some(self.app_state.router_url.to_owned());
Expand Down
22 changes: 21 additions & 1 deletion autoendpoint/src/extractors/notification_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use lazy_static::lazy_static;
use regex::Regex;
use std::cmp::min;
use std::collections::HashMap;
use validator::Validate;
use validator::{Validate, ValidationError};
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, we want to isolate the urgency stuff to a feature flag, so the bits that are urgency based need to be behind that flag.

use validator_derive::Validate;

lazy_static! {
Expand Down Expand Up @@ -37,6 +37,9 @@ pub struct NotificationHeaders {
)]
pub topic: Option<String>,

#[validate(custom(function = "validate_urgency"))]
pub urgency: Option<String>,

// These fields are validated separately, because the validation is complex
// and based upon the content encoding
pub encoding: Option<String>,
Expand All @@ -45,10 +48,21 @@ pub struct NotificationHeaders {
pub crypto_key: Option<String>,
}

fn validate_urgency(value: &str) -> Result<(), ValidationError> {
if ["very-low", "low", "normal", "high"].contains(&value) {
Ok(())
} else {
Err(ValidationError::new(
"Value not equal to \"very-low\", \"low\", \"normal\" or \"high\"",
))
}
}

impl From<NotificationHeaders> for HashMap<String, String> {
fn from(headers: NotificationHeaders) -> Self {
let mut map = HashMap::new();

map.insert_opt("urgency", headers.urgency);
map.insert_opt("encoding", headers.encoding);
map.insert_opt("encryption", headers.encryption);
map.insert_opt("encryption_key", headers.encryption_key);
Expand All @@ -73,11 +87,13 @@ impl NotificationHeaders {
.map(|ttl| min(ttl, MAX_NOTIFICATION_TTL.num_seconds()))
.ok_or(ApiErrorKind::NoTTL)?;
let topic = get_owned_header(req, "topic");
let urgency = get_owned_header(req, "urgency");

let headers = if has_data {
NotificationHeaders {
ttl,
topic,
urgency,
encoding: get_owned_header(req, "content-encoding"),
encryption: get_owned_header(req, "encryption").map(Self::strip_header),
encryption_key: get_owned_header(req, "encryption-key"),
Expand All @@ -88,6 +104,7 @@ impl NotificationHeaders {
NotificationHeaders {
ttl,
topic,
urgency,
encoding: None,
encryption: None,
encryption_key: None,
Expand Down Expand Up @@ -365,6 +382,7 @@ mod tests {
NotificationHeaders {
ttl: 10,
topic: None,
urgency: None,
encoding: Some("aesgcm".to_string()),
encryption: Some("salt=foo".to_string()),
encryption_key: None,
Expand All @@ -390,6 +408,7 @@ mod tests {
NotificationHeaders {
ttl: 10,
topic: None,
urgency: None,
encoding: Some("aes128gcm".to_string()),
encryption: Some("notsalt=foo".to_string()),
encryption_key: None,
Expand All @@ -416,6 +435,7 @@ mod tests {
NotificationHeaders {
ttl: 10,
topic: None,
urgency: None,
encoding: Some("aesgcm".to_string()),
encryption: Some("salt=foo".to_string()),
encryption_key: None,
Expand Down
1 change: 1 addition & 0 deletions autoendpoint/src/routers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ pub mod tests {
},
headers: NotificationHeaders {
ttl: 0,
urgency: None,
topic: Some("test-topic".to_string()),
encoding: Some("test-encoding".to_string()),
encryption: Some("test-encryption".to_string()),
Expand Down
12 changes: 11 additions & 1 deletion autoendpoint/src/routers/webpush.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_trait::async_trait;
use autopush_common::db::Urgency;
#[cfg(feature = "reliable_report")]
use autopush_common::reliability::PushReliability;
use cadence::{Counted, CountedExt, StatsdClient, Timed};
Expand Down Expand Up @@ -55,8 +56,17 @@ impl Router for WebPushRouter {
);
trace!("✉ Notification = {:?}", notification);

let notif_urgency = Urgency::from(notification.headers.urgency.as_ref());
// If the notification urgency is lower than the user one, we do not send it
// If the user hasn't set a minimum urgency, we accept all notifications
if notif_urgency < user.urgency.unwrap_or(Urgency::VeryLow) {
trace!(
"✉ Notification has an urgency lower than the user one: {:?} < {:?}",
&notif_urgency,
&user.urgency
);
// Check if there is a node connected to the client
if let Some(node_id) = &user.node_id {
} else if let Some(node_id) = &user.node_id {
trace!(
"✉ User has a node ID, sending notification to node: {}",
&node_id
Expand Down
31 changes: 31 additions & 0 deletions autopush-common/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ pub struct User {
/// Last node/port the client was or may be connected to
#[serde(skip_serializing_if = "Option::is_none")]
pub node_id: Option<String>,
/// Last minimum urgency set by the client
#[serde(skip_serializing_if = "Option::is_none")]
pub urgency: Option<Urgency>,
/// Record version
#[serde(skip_serializing_if = "Option::is_none")]
pub record_version: Option<u64>,
Expand Down Expand Up @@ -180,6 +183,7 @@ impl Default for User {
router_type: "webpush".to_string(),
router_data: None,
node_id: None,
urgency: None,
record_version: Some(USER_RECORD_VERSION),
current_timestamp: None,
version: Some(Uuid::new_v4()),
Expand All @@ -199,6 +203,33 @@ impl User {
}
}

#[repr(u8)]
#[derive(Debug, PartialEq, PartialOrd, Serialize, Deserialize, Clone, Copy)]
#[serde(rename_all = "kebab-case")]
pub enum Urgency {
VeryLow = 0,
Low = 1,
Normal = 2,
High = 3,
}

impl From<&str> for Urgency {
fn from(value: &str) -> Self {
match value.to_lowercase().as_str() {
"high" => Urgency::High,
"low" => Urgency::Low,
"very-low" => Urgency::VeryLow,
_ => Urgency::Normal,
}
}
}

impl From<Option<&String>> for Urgency {
fn from(value: Option<&String>) -> Self {
Urgency::from(value.map(|v| v.as_str()).unwrap_or(""))
}
}

#[cfg(test)]
mod tests {
use super::{User, USER_RECORD_VERSION};
Expand Down
4 changes: 4 additions & 0 deletions autopush-common/src/db/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub(crate) struct NotificationHeaders {
encryption_key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
encoding: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
urgency: Option<String>,
}

#[allow(clippy::implicit_hasher)]
Expand All @@ -33,6 +35,7 @@ impl From<NotificationHeaders> for HashMap<String, String> {
map.insert_opt("encryption", val.encryption);
map.insert_opt("encryption_key", val.encryption_key);
map.insert_opt("encoding", val.encoding);
map.insert_opt("urgency", val.urgency);
map
}
}
Expand All @@ -44,6 +47,7 @@ impl From<HashMap<String, String>> for NotificationHeaders {
encryption: val.get("encryption").map(|v| v.to_string()),
encryption_key: val.get("encryption_key").map(|v| v.to_string()),
encoding: val.get("encoding").map(|v| v.to_string()),
urgency: val.get("urgency").map(|v| v.to_string()),
}
}
}
Expand Down