Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions asynq-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ name = "asynq-server"
path = "src/main.rs"

[dependencies]
# Core asynq library
# Core asynq library (Redis is always included as the default backend)
asynq = { path = "../asynq" }

# Web framework
Expand Down Expand Up @@ -56,9 +56,10 @@ chrono = { version = "0.4", features = ["serde"], default-features = false }

[features]
default = []
# Enable Redis backend support
redis = ["asynq/cluster"]
# Enable PostgresSQL backend support
# Enable Redis cluster and sentinel support
cluster = ["asynq/cluster"]
sentinel = ["asynq/sentinel"]
# Enable PostgresSQL backend support (in addition to Redis)
postgresql = ["asynq/postgresql"]

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions asynq-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ Configure via code (not command-line):

```rust
use asynq_server::AsynqServer;
use asynq::redis::RedisConnectionType;
use asynq::backend::RedisConnectionType;

let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;
let redis_config = asynq::backend::RedisConnectionType::single("redis://127.0.0.1:6379")?;
let server = AsynqServer::with_redis("127.0.0.1:8080", redis_config).await?;
server.run().await?;
```
Expand Down
13 changes: 10 additions & 3 deletions asynq-server/examples/multi_tenant_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

use asynq_server::{AsynqServer, BackendType, MultiTenantAuth};
use std::str::FromStr;
use tracing::info;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

#[tokio::main]
Expand Down Expand Up @@ -67,10 +68,16 @@ async fn main() -> anyhow::Result<()> {
tracing::info!("");
tracing::info!("WebSocket endpoint: ws://{}/ws", addr);
tracing::info!("Health check: http://{}/health", addr);

let socket = std::net::SocketAddr::from_str(addr)?;
info!("Starting asynq-server on {}", addr);
let redis_url = std::env::var("REDIS_URL")
.unwrap_or_else(|_| "redis://tenant1:secure_pass123@localhost:6379".to_string());
println!("🔗 Using Redis URL: {redis_url}");
let redis_config = asynq::backend::RedisConnectionType::single(redis_url.clone())?;
let broker = asynq::backend::RedisBroker::new(redis_config).await?;
let server =
AsynqServer::with_broker(socket, std::sync::Arc::new(broker)).with_multi_tenant_auth(auth);
// Create and run server
let server = AsynqServer::from_str(addr)?.with_multi_tenant_auth(auth);

server.run().await?;

Ok(())
Expand Down
80 changes: 44 additions & 36 deletions asynq-server/examples/websocket_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,48 +28,56 @@ use tracing::info;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize logging
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("asynq_server=info".parse()?)
.add_directive("websocket_server=info".parse()?),
)
.init();
// Initialize logging
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("asynq_server=info".parse()?)
.add_directive("websocket_server=info".parse()?),
)
.init();

// Get server address from environment or use default
let addr = env::var("ASYNQ_SERVER_ADDR").unwrap_or_else(|_| "127.0.0.1:8080".to_string());
// Get server address from environment or use default
let addr = env::var("ASYNQ_SERVER_ADDR").unwrap_or_else(|_| "127.0.0.1:8080".to_string());

// Get authentication credentials from environment
let username = env::var("ASYNQ_USERNAME").ok();
let password = env::var("ASYNQ_PASSWORD").ok();

let auth_enabled = username.is_some() && password.is_some();

if auth_enabled {
info!("🔐 HTTP Basic Authentication enabled");
info!(" Clients must provide username and password in Authorization header");
} else {
info!("⚠️ Authentication disabled - all connections allowed");
info!(" Set ASYNQ_USERNAME and ASYNQ_PASSWORD environment variables to enable authentication");
}
// Get authentication credentials from environment
let username = env::var("ASYNQ_USERNAME").ok();
let password = env::var("ASYNQ_PASSWORD").ok();

info!("🚀 Starting asynq-server on {}", addr);
let auth_enabled = username.is_some() && password.is_some();

// Create server with memory backend (no external dependencies)
let mut server: AsynqServer = AsynqServer::from_str(addr.as_str())?;
if auth_enabled {
info!("🔐 HTTP Basic Authentication enabled");
info!(" Clients must provide username and password in Authorization header");
} else {
info!("⚠️ Authentication disabled - all connections allowed");
info!(
" Set ASYNQ_USERNAME and ASYNQ_PASSWORD environment variables to enable authentication"
);
}

// Add authentication if credentials are provided
if let (Some(user), Some(pass)) = (username, password) {
server = server.with_basic_auth(user, pass);
}
info!("🚀 Starting asynq-server on {}", addr);

info!("📡 WebSocket endpoint: ws://{}/ws", addr);
info!("🏥 Health check endpoint: http://{}/health", addr);
info!("Press Ctrl+C to stop");
let socket = std::net::SocketAddr::from_str(&addr)?;
info!("Starting asynq-server on {}", addr);
let redis_url = std::env::var("REDIS_URL")
.unwrap_or_else(|_| "redis://tenant1:secure_pass123@localhost:6379".to_string());
println!("🔗 Using Redis URL: {redis_url}");
let redis_config = asynq::backend::RedisConnectionType::single(redis_url.clone())?;
let broker = asynq::backend::RedisBroker::new(redis_config).await?;
let mut server = AsynqServer::with_broker(socket, std::sync::Arc::new(broker));

// Run the server
server.run().await?;
// Add authentication if credentials are provided
if let (Some(user), Some(pass)) = (username, password) {
server = server.with_basic_auth(user, pass);
}

Ok(())
info!("📡 WebSocket endpoint: ws://{}/ws", addr);
info!("🏥 Health check endpoint: http://{}/health", addr);
info!("Press Ctrl+C to stop");

// Run the server
server.run().await?;

Ok(())
}
81 changes: 43 additions & 38 deletions asynq-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub enum BackendType {
}

/// Multi-tenant authentication manager with backend-based authentication
///
///
/// This manager authenticates tenants by attempting to connect to their
/// own backend (Redis or PostgresSQL) using their credentials. Successful
/// connections are cached to avoid repeated connection attempts.
Expand Down Expand Up @@ -107,7 +107,7 @@ impl std::fmt::Debug for MultiTenantAuth {

impl MultiTenantAuth {
/// Create a new multi-tenant authentication manager with backend-based auth
///
///
/// # Arguments
/// * `backend_type` - The type of backend to use (Redis or PostgresSQL)
/// * `backend_template` - Connection string template where credentials will be substituted
Expand Down Expand Up @@ -160,11 +160,13 @@ impl MultiTenantAuth {
/// Record a failed authentication attempt
fn record_failed_attempt(&self, username: &str) {
if let Ok(mut attempts) = self.failed_attempts.write() {
let entry = attempts.entry(username.to_string()).or_insert(FailedAttempt {
count: 0,
last_attempt: Instant::now(),
});

let entry = attempts
.entry(username.to_string())
.or_insert(FailedAttempt {
count: 0,
last_attempt: Instant::now(),
});

// Reset count if outside the rate limit window
if entry.last_attempt.elapsed() >= self.rate_limit_window {
entry.count = 1;
Expand All @@ -186,16 +188,21 @@ impl MultiTenantAuth {

/// Build connection string from template and credentials
fn build_connection_string(&self, username: &str, password: &str) -> String {
self.backend_template
self
.backend_template
.replace("{username}", username)
.replace("{password}", password)
}

/// Attempt to authenticate by connecting to the backend
///
///
/// Returns Ok(TenantConfig) if authentication succeeds, Err otherwise.
/// On success, the tenant is cached for future requests.
pub async fn authenticate(&self, username: &str, password: &str) -> Result<TenantConfig, AuthError> {
pub async fn authenticate(
&self,
username: &str,
password: &str,
) -> Result<TenantConfig, AuthError> {
// Check rate limiting first
if self.is_rate_limited(username) {
return Err(AuthError::RateLimited);
Expand All @@ -215,7 +222,7 @@ impl MultiTenantAuth {

// Attempt to connect to backend with credentials
let connection_string = self.build_connection_string(username, password);

match self.try_backend_connection(&connection_string).await {
Ok(()) => {
// Success! Cache this tenant
Expand All @@ -226,15 +233,15 @@ impl MultiTenantAuth {
queue_prefix: Some(username.to_string()),
backend_connection: Some(connection_string.clone()),
};

if let Ok(mut tenants) = self.tenants.write() {
tenants.insert(username.to_string(), tenant.clone());
}
// If caching fails due to poisoned lock, still return success

// Clear any failed attempts
self.clear_failed_attempts(username);

Ok(tenant)
}
Err(e) => {
Expand All @@ -254,32 +261,23 @@ impl MultiTenantAuth {
}

/// Try to connect to Redis
#[cfg(feature = "redis")]
async fn try_redis_connection(&self, connection_string: &str) -> Result<(), AuthError> {
use asynq::redis::RedisConnectionType;
use asynq::rdb::RedisBroker;

match RedisConnectionType::single(connection_string) {
Ok(config) => {
match RedisBroker::new(config).await {
Ok(_broker) => Ok(()),
Err(_) => Err(AuthError::ConnectionFailed),
}
}
use asynq::backend::RedisBroker;

match asynq::backend::RedisConnectionType::single(connection_string) {
Ok(config) => match RedisBroker::new(config).await {
Ok(_broker) => Ok(()),
Err(_) => Err(AuthError::ConnectionFailed),
},
Err(_) => Err(AuthError::InvalidCredentials),
}
}

#[cfg(not(feature = "redis"))]
async fn try_redis_connection(&self, _connection_string: &str) -> Result<(), AuthError> {
Err(AuthError::BackendNotAvailable)
}

/// Try to connect to PostgresSQL
#[cfg(feature = "postgresql")]
async fn try_postgresql_connection(&self, connection_string: &str) -> Result<(), AuthError> {
use asynq::pgdb::PostgresBroker;
use asynq::backend::PostgresBroker;

match PostgresBroker::new(connection_string).await {
Ok(_broker) => Ok(()),
Err(_) => Err(AuthError::ConnectionFailed),
Expand All @@ -301,13 +299,18 @@ impl MultiTenantAuth {

/// Remove a tenant from the cache
pub fn remove_tenant(&self, username: &str) -> Option<TenantConfig> {
self.tenants.write().ok()
self
.tenants
.write()
.ok()
.and_then(|mut tenants| tenants.remove(username))
}

/// List all cached tenants
pub fn list_tenants(&self) -> Vec<String> {
self.tenants.read()
self
.tenants
.read()
.map(|tenants| tenants.values().map(|t| t.id.clone()).collect())
.unwrap_or_default()
}
Expand All @@ -320,7 +323,9 @@ impl MultiTenantAuth {

/// Get the number of cached tenants
pub fn tenant_count(&self) -> usize {
self.tenants.read()
self
.tenants
.read()
.map(|tenants| tenants.len())
.unwrap_or(0)
}
Expand Down Expand Up @@ -384,7 +389,7 @@ mod tests {
fn test_multi_tenant_auth_caching() {
let auth = MultiTenantAuth::new(
BackendType::Redis,
"redis://{username}:{password}@localhost:6379".to_string()
"redis://{username}:{password}@localhost:6379".to_string(),
);
assert!(auth.is_enabled()); // Always enabled with backend auth

Expand Down Expand Up @@ -414,7 +419,7 @@ mod tests {
fn test_rate_limiting() {
let auth = MultiTenantAuth::new(
BackendType::Redis,
"redis://{username}:{password}@localhost:6379".to_string()
"redis://{username}:{password}@localhost:6379".to_string(),
)
.with_max_failed_attempts(3)
.with_rate_limit_window(Duration::from_secs(60));
Expand All @@ -436,7 +441,7 @@ mod tests {
fn test_connection_string_building() {
let auth = MultiTenantAuth::new(
BackendType::Redis,
"redis://{username}:{password}@localhost:6379".to_string()
"redis://{username}:{password}@localhost:6379".to_string(),
);

let conn_str = auth.build_connection_string("testuser", "testpass");
Expand Down
Loading
Loading