Skip to content

Commit a7294e6

Browse files
authored
Merge pull request #13 from cn-kali-team/main
multi-tenant
2 parents 0bf80c6 + 1bae312 commit a7294e6

25 files changed

+974
-271
lines changed

asynq-server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ tower = "0.5"
2929
tower-http = { version = "0.6", features = ["cors", "trace"] }
3030

3131
# Async runtime
32-
tokio = { version = "1.0", features = ["rt", "macros", "signal", "rt-multi-thread", "sync"] }
32+
tokio = { version = "1.0", features = ["rt", "macros", "signal", "rt-multi-thread", "sync", "time"] }
3333

3434
# WebSocket
3535
tokio-tungstenite = "0.28"

asynq/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "asynq"
3-
version = "0.1.5"
3+
version = "0.1.6"
44
license.workspace = true
55
edition.workspace = true
66
authors.workspace = true
@@ -18,7 +18,7 @@ path = "src/lib.rs"
1818
[dependencies]
1919
# Redis 连接池和客户端(始终启用作为默认后端)
2020
# Redis connection pool and client (always enabled as the default backend)
21-
redis = { version = "1.0.2", features = ["tokio-comp", "aio", "connection-manager", "streams", "acl"] }
21+
redis = { version = "1.0.3", features = ["tokio-comp", "aio", "connection-manager", "streams", "acl"] }
2222
#
2323
asynq-macros = { version = "0.1.0", optional = true }
2424
# 序列化和反序列化

asynq/examples/acl_example.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4242
Ok(exists) => {
4343
if exists {
4444
println!(" User '{}' already exists", node_config.username);
45+
manager
46+
.delete_tenant_user(&acl_config.node_config.username)
47+
.await?;
48+
match manager.create_tenant_user(&acl_config).await {
49+
Ok(_) => println!(" ✅ Created user '{}'", node_config.username),
50+
Err(e) => println!(" ❌ Failed to create user: {e}"),
51+
}
4552
} else {
4653
println!(" User '{}' does not exist", node_config.username);
4754
// 创建租户用户(需要管理员权限)
@@ -78,15 +85,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7885
Err(e) => {
7986
println!(" ⚠️ Could not connect to Redis: {e}");
8087
println!(" Make sure Redis is running at: {redis_url}");
81-
println!("\n Showing ACL command that would be generated:");
82-
83-
// 即使没有连接,也可以显示配置信息
84-
// Even without connection, we can show configuration info
85-
println!(" ACL SETUSER {} on +@all -@dangerous +keys +info|memory +info|clients -select +select|{} >*** {:?}",
86-
acl_config.node_config.username,
87-
acl_config.node_config.db,
88-
acl_config.node_config.asynq_key_pattern()
89-
);
9088
}
9189
}
9290
Ok(())

asynq/src/acl/mod.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -156,16 +156,29 @@ impl AclConfig {
156156

157157
/// 获取默认的键模式列表
158158
/// Get default key pattern list
159-
pub fn default_key_patterns() -> Vec<Rule> {
159+
pub fn default_key_patterns(tenant: &str) -> Vec<Rule> {
160160
vec![
161-
Rule::Pattern("asynq:queues".to_string()),
162-
Rule::Pattern("asynq:servers:*".to_string()),
163-
Rule::Pattern("asynq:servers".to_string()),
164-
Rule::Pattern("asynq:workers".to_string()),
165-
Rule::Pattern("asynq:workers:*".to_string()),
166-
Rule::Pattern("asynq:schedulers".to_string()),
167-
Rule::Pattern("asynq:schedulers:*".to_string()),
168-
Rule::Other("&asynq:cancel".to_string()),
161+
// Add default key patterns
162+
Rule::Pattern(crate::base::keys::ALL_QUEUES.to_string()),
163+
Rule::Pattern(format!(
164+
"{}{{{}:*",
165+
crate::base::keys::SERVERS_PREFIX,
166+
tenant
167+
)),
168+
Rule::Pattern(crate::base::keys::ALL_SERVERS.to_string()),
169+
Rule::Pattern(crate::base::keys::ALL_WORKERS.to_string()),
170+
Rule::Pattern(format!(
171+
"{}{{{}:*",
172+
crate::base::keys::WORKERS_PREFIX,
173+
tenant
174+
)),
175+
Rule::Pattern(crate::base::keys::ALL_SCHEDULERS.to_string()),
176+
Rule::Pattern(format!(
177+
"{}{{{}:*",
178+
crate::base::keys::SCHEDULED_PREFIX,
179+
tenant
180+
)),
181+
Rule::Channel(crate::base::keys::CANCEL_CHANNEL.to_string()),
169182
]
170183
}
171184
}

asynq/src/acl/redis_acl.rs

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -73,37 +73,27 @@ impl RedisAclManager {
7373
/// 生成 ACL 规则列表
7474
/// Generate ACL rules list
7575
fn build_acl_rules(&self, config: &AclConfig) -> Vec<Rule> {
76-
let mut rules = Vec::new();
77-
78-
// 基本权限: on, +@all, -@dangerous, +keys, +info|memory, +info|clients
79-
// Basic permissions: on, +@all, -@dangerous, +keys, +info|memory, +info|clients
80-
rules.push(Rule::On);
81-
rules.push(Rule::AllCommands);
82-
rules.push(Rule::RemoveCategory("dangerous".to_string()));
83-
rules.push(Rule::AddCommand("keys".to_string()));
84-
rules.push(Rule::RemoveCommand("info".to_string()));
85-
// 数据库限制: -select, +select|<db>
86-
// Database restrictions: -select, +select|<db>
87-
rules.push(Rule::RemoveCommand("select".to_string()));
88-
// 密码
89-
// Password
90-
rules.push(Rule::AddPass(config.node_config.password.clone()));
91-
92-
// 添加默认队列
93-
// Add default queue pattern - uses hash tag {DEFAULT_QUEUE_NAME} for Redis cluster routing
94-
rules.push(Rule::Pattern(format!("asynq:{{{}}}:*", DEFAULT_QUEUE_NAME)));
95-
96-
// 添加租户特定的键模式
97-
// Add tenant-specific key patterns
98-
// 从 "~asynq:{username:*" 格式提取模式部分(去掉前导的 ~)
99-
rules.push(config.node_config.asynq_key_pattern());
100-
101-
// 添加默认的键模式
102-
// Add default key patterns
103-
for pattern in AclConfig::default_key_patterns() {
104-
rules.push(pattern);
105-
}
106-
76+
let mut rules = vec![
77+
// Basic permissions: on, +@all, -@dangerous, +keys, -info
78+
Rule::On,
79+
Rule::ResetChannels,
80+
Rule::AllCommands,
81+
Rule::RemoveCategory("dangerous".to_string()),
82+
Rule::RemoveCategory("admin".to_string()),
83+
Rule::RemoveCommand("keys".to_string()),
84+
Rule::RemoveCommand("info".to_string()),
85+
// Database restrictions: -select
86+
Rule::RemoveCommand("select".to_string()),
87+
// Password
88+
Rule::AddPass(config.node_config.password.clone()),
89+
// Add default queue pattern - uses hashtag {DEFAULT_QUEUE_NAME} for Redis cluster routing
90+
Rule::Pattern(format!("asynq:{{{}}}:*", DEFAULT_QUEUE_NAME)),
91+
// Add tenant-specific key patterns
92+
config.node_config.asynq_key_pattern(),
93+
];
94+
rules.extend(AclConfig::default_key_patterns(
95+
&config.node_config.username,
96+
));
10797
// 添加只写键
10898
// Add write-only keys
10999
for key in &config.write_only_keys {

asynq/src/backend/pgdb/broker.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -942,7 +942,13 @@ impl Broker for PostgresBroker {
942942

943943
/// 写入服务器状态。
944944
/// Write server state.
945-
async fn write_server_state(&self, server_info: &ServerInfo, ttl: Duration) -> Result<()> {
945+
async fn write_server_state(
946+
&self,
947+
server_info: &ServerInfo,
948+
_worker: Vec<WorkerInfo>,
949+
ttl: Duration,
950+
_tenant: Option<&str>,
951+
) -> Result<()> {
946952
let server_key = format!(
947953
"{}:{}:{}",
948954
server_info.host, server_info.pid, server_info.server_id
@@ -992,7 +998,13 @@ impl Broker for PostgresBroker {
992998

993999
/// 清除服务器状态。
9941000
/// Clear server state.
995-
async fn clear_server_state(&self, host: &str, pid: i32, server_id: &str) -> Result<()> {
1001+
async fn clear_server_state(
1002+
&self,
1003+
host: &str,
1004+
pid: i32,
1005+
server_id: &str,
1006+
_tenant: Option<&str>,
1007+
) -> Result<()> {
9961008
let server_key = format!("{host}:{pid}:{server_id}");
9971009

9981010
// Clear all workers for this server

asynq/src/backend/rdb/broker.rs

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::base::keys;
55
use crate::base::keys::TaskState;
66
use crate::base::Broker;
77
use crate::error::{Error, Result};
8-
use crate::proto::{ServerInfo, TaskMessage};
8+
use crate::proto::{ServerInfo, TaskMessage, WorkerInfo};
99
use crate::task::{generate_task_id, Task, TaskInfo};
1010
use async_trait::async_trait;
1111
use chrono::{DateTime, Utc};
@@ -811,14 +811,23 @@ impl Broker for RedisBroker {
811811
}
812812

813813
/// 写入服务器状态 - Go: WriteServerState
814-
async fn write_server_state(&self, server_info: &ServerInfo, ttl: Duration) -> Result<()> {
814+
async fn write_server_state(
815+
&self,
816+
server_info: &ServerInfo,
817+
workers: Vec<WorkerInfo>,
818+
ttl: Duration,
819+
tenant: Option<&str>,
820+
) -> Result<()> {
815821
let mut conn = self.get_async_connection().await?;
816822

817823
// 使用 ServerInfo 中的信息构建键
818824
// Build keys using information from ServerInfo
819-
let server_key =
820-
keys::server_info_key(&server_info.host, server_info.pid, &server_info.server_id);
821-
let workers_key = keys::workers_key(&server_info.host, server_info.pid, &server_info.server_id);
825+
let (server_key, workers_key) = keys::server_and_workers_keys(
826+
tenant,
827+
&server_info.host,
828+
server_info.pid,
829+
&server_info.server_id,
830+
);
822831

823832
// 计算过期时间戳
824833
// Calculate expiration timestamp
@@ -842,44 +851,44 @@ impl Broker for RedisBroker {
842851
// Use Lua script to atomically write server state
843852
let server_info_bytes = server_info.encode_to_vec();
844853
let keys = vec![server_key, workers_key];
845-
let string_args = vec![ttl.as_secs().to_string()];
846-
let binary_args = vec![server_info_bytes];
854+
let mut args = vec![
855+
RedisArg::Int(ttl.as_secs() as i64),
856+
RedisArg::Bytes(server_info_bytes),
857+
];
847858

848859
// ARGV[1] = TTL in seconds
849860
// ARGV[2] = server info (encoded protobuf as binary)
850861
// ARGV[3+] = worker info (暂时为空,需要支持 WorkerInfo)
851862
// ARGV[3+] = worker info (currently empty, needs to support WorkerInfo)
852-
863+
for worker in workers {
864+
let worker_info = worker.encode_to_vec();
865+
args.push(RedisArg::Str(worker.task_id));
866+
args.push(RedisArg::Bytes(worker_info));
867+
}
853868
let _: String = self
854869
.script_manager
855-
.eval_script_with_binary_args(
856-
&mut conn,
857-
"write_server_state",
858-
&keys,
859-
&string_args,
860-
&binary_args,
861-
)
870+
.eval_script(&mut conn, "write_server_state", &keys, &args)
862871
.await?;
863872

864873
Ok(())
865874
}
866875

867876
/// 清除服务器状态 - Go: ClearServerState
868-
async fn clear_server_state(&self, host: &str, pid: i32, server_id: &str) -> Result<()> {
877+
async fn clear_server_state(
878+
&self,
879+
host: &str,
880+
pid: i32,
881+
server_id: &str,
882+
tenant: Option<&str>,
883+
) -> Result<()> {
869884
let mut conn = self.get_async_connection().await?;
870885

871886
// 生成键
872887
// Generate keys
873-
let server_key = keys::server_info_key(host, pid, server_id);
874-
let workers_key = keys::workers_key(host, pid, server_id);
875-
876-
// 构造完整的 server_id (hostname:pid:uuid 格式)
877-
// Construct full server_id (hostname:pid:uuid format)
878-
let full_server_id = format!("{host}:{pid}:{server_id}");
879-
888+
let (server_key, workers_key) = keys::server_and_workers_keys(tenant, host, pid, server_id);
880889
// 1. 从 AllServers ZSET 中删除服务器 (服务器跟踪)
881890
// Remove the server from AllServers ZSET (server tracking)
882-
let _: () = conn.zrem(keys::ALL_SERVERS, &full_server_id).await?;
891+
let _: () = conn.zrem(keys::ALL_SERVERS, &server_key).await?;
883892

884893
// 2. 从 AllWorkers ZSET 中删除工作者键 (如果存在)
885894
// Remove the worker key from AllWorkers ZSET (if exists)
@@ -888,18 +897,9 @@ impl Broker for RedisBroker {
888897
// 3. 使用 Lua 脚本原子性地清除服务器状态
889898
// Use Lua script to atomically clear server state
890899
let keys = vec![server_key, workers_key];
891-
let string_args: Vec<String> = vec![];
892-
let binary_args: Vec<Vec<u8>> = vec![];
893-
894900
let _: String = self
895901
.script_manager
896-
.eval_script_with_binary_args(
897-
&mut conn,
898-
"clear_server_state",
899-
&keys,
900-
&string_args,
901-
&binary_args,
902-
)
902+
.eval_script(&mut conn, "clear_server_state", &keys, &[])
903903
.await?;
904904

905905
Ok(())

asynq/src/backend/rdb/inspect.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -990,7 +990,8 @@ impl RedisBroker {
990990

991991
// 使用 clear_server_state 来正确地注销服务器
992992
// Use clear_server_state to correctly unregister the server
993-
self.clear_server_state(hostname, pid, uuid).await
993+
// Note: This is an administrative action, so no tenant isolation is applied
994+
self.clear_server_state(hostname, pid, uuid, None).await
994995
}
995996

996997
/// 心跳检测。

asynq/src/backend/rdb/redis_scripts.rs

Lines changed: 0 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1582,67 +1582,6 @@ impl ScriptManager {
15821582
pub fn get_script_sha(&self, name: &str) -> Option<&String> {
15831583
self.script_sha1.get(name)
15841584
}
1585-
1586-
/// 执行脚本(支持二进制参数)
1587-
pub async fn eval_script_with_binary_args<T>(
1588-
&self,
1589-
conn: &mut RedisConnection,
1590-
script_name: &str,
1591-
keys: &[String],
1592-
string_args: &[String],
1593-
binary_args: &[Vec<u8>],
1594-
) -> Result<T>
1595-
where
1596-
T: redis::FromRedisValue,
1597-
{
1598-
// 首先尝试使用 EVALSHA 如果脚本已加载
1599-
if let Some(sha) = self.get_script_sha(script_name) {
1600-
let mut cmd = redis::cmd("EVALSHA");
1601-
cmd.arg(sha).arg(keys.len()).arg(keys);
1602-
1603-
// 添加字符串参数
1604-
for arg in string_args {
1605-
cmd.arg(arg);
1606-
}
1607-
1608-
// 添加二进制参数
1609-
for arg in binary_args {
1610-
cmd.arg(arg);
1611-
}
1612-
1613-
match cmd.query_async::<T>(conn).await {
1614-
Ok(result) => return Ok(result),
1615-
Err(e) if e.to_string().contains("NOSCRIPT") => {
1616-
// 脚本被清理了,继续使用 EVAL
1617-
}
1618-
Err(e) => return Err(e.into()),
1619-
}
1620-
}
1621-
1622-
// 如果脚本未加载或 EVALSHA 失败,使用 EVAL
1623-
let script = match script_name {
1624-
"write_server_state" => scripts::WRITE_SERVER_STATE,
1625-
"clear_server_state" => scripts::CLEAR_SERVER_STATE,
1626-
_ => return Err(Error::other(format!("Script not loaded: {script_name}"))),
1627-
};
1628-
1629-
let mut cmd = redis::cmd("EVAL");
1630-
cmd.arg(script).arg(keys.len()).arg(keys);
1631-
1632-
// 添加字符串参数
1633-
for arg in string_args {
1634-
cmd.arg(arg);
1635-
}
1636-
1637-
// 添加二进制参数
1638-
for arg in binary_args {
1639-
cmd.arg(arg);
1640-
}
1641-
1642-
let result: T = cmd.query_async(conn).await?;
1643-
Ok(result)
1644-
}
1645-
16461585
/// 执行脚本
16471586
pub async fn eval_script<T>(
16481587
&self,

0 commit comments

Comments
 (0)