Skip to content

Commit 6e9e874

Browse files
committed
Implemented distributed deployment functionality:
1. Each instance synchronizes the latest data at intervals defined by the configured mounts_monitor_interval time, with a default value of 5 seconds. 2. For the mysql storage backend, used GET_LOCK and RELEASE_LOCK to implement distributed locks 3. For the file storage backend, used lockfile to implement distributed locks. 4. Removed unnecessary mutex locks in mysql storage backend.
1 parent f36ea8a commit 6e9e874

File tree

30 files changed

+785
-355
lines changed

30 files changed

+785
-355
lines changed

.github/workflows/rust.yml

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,21 +100,17 @@ jobs:
100100
my-cnf: |
101101
skip-ssl
102102
port=3306
103-
- name: Set macOS Environment Variables
104-
if: runner.os == 'macOS'
105-
env:
106-
MACOSX_DEPLOYMENT_TARGET: "11.0"
107-
run: |
108-
export MACOSX_DEPLOYMENT_TARGET=11.0
109103
- name: install diesel_cli
110104
run: cargo install diesel_cli --no-default-features --features mysql
111105
- name: init database
112106
run: diesel setup --database-url mysql://root:password@127.0.0.1:3306/vault
113-
- name: Build
114-
run: cargo build --features storage_mysql --verbose
115107
- name: ulimit -n
116108
run: ulimit -n 65535
117-
- name: Run tests
109+
- name: Run tests for macOS
110+
if: runner.os == 'macOS'
111+
run: cargo test --features storage_mysql --lib --bins --tests --verbose
112+
- name: Run tests for non-macOS
113+
if: runner.os != 'macOS'
118114
run: cargo test --features storage_mysql --verbose
119115
- name: Build crate doc
120116
run: cargo doc --no-deps

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ stretto = "0.8"
7878
priority-queue = "2.1"
7979
crossbeam-channel = "0.5"
8080
maybe-async = { version = "0.2", optional = false }
81+
lockfile = "0.4.0"
8182

8283
# optional dependencies
8384
openssl = { version = "*", optional = true }

src/cli/command/server.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,10 @@ impl Server {
150150

151151
let backend = storage::new_backend(storage.stype.as_str(), &storage.config).unwrap();
152152

153-
let barrier = storage::barrier_aes_gcm::AESGCMBarrier::new(Arc::clone(&backend));
154-
155153
let metrics_manager = Arc::new(RwLock::new(MetricsManager::new(config.collection_interval)));
156154
let system_metrics = Arc::clone(&metrics_manager.read().unwrap().system_metrics);
157155

158-
let core = Arc::new(RwLock::new(Core { physical: backend, barrier: Arc::new(barrier), ..Default::default() }));
156+
let core = Arc::new(RwLock::new(Core::new(backend)));
159157

160158
{
161159
let mut c = core.write()?;

src/cli/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ pub struct Config {
4040
pub collection_interval: u64,
4141
#[serde(default = "default_hmac_level")]
4242
pub mount_entry_hmac_level: MountEntryHMACLevel,
43+
#[serde(default = "default_mounts_monitor_interval")]
44+
pub mounts_monitor_interval: u64,
4345
}
4446

4547
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)]
@@ -58,6 +60,10 @@ fn default_collection_interval() -> u64 {
5860
15
5961
}
6062

63+
fn default_mounts_monitor_interval() -> u64 {
64+
5
65+
}
66+
6167
/// A struct that contains several configurable options for networking stuffs
6268
#[derive(Debug, Clone, Serialize, Deserialize)]
6369
pub struct Listener {

src/core.rs

Lines changed: 62 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@
88
//! of RustyVault.
99
1010
use std::{
11-
collections::HashMap,
1211
ops::{Deref, DerefMut},
13-
sync::{Arc, Mutex, RwLock},
12+
sync::{Arc, RwLock},
1413
};
1514

1615
use as_any::Downcast;
@@ -30,7 +29,9 @@ use crate::{
3029
pki::PkiModule,
3130
policy::PolicyModule,
3231
},
33-
mount::MountTable,
32+
mount::{
33+
MountTable, MountsMonitor, MountsRouter, CORE_MOUNT_CONFIG_PATH, LOGICAL_BARRIER_PREFIX, SYSTEM_BARRIER_PREFIX,
34+
},
3435
router::Router,
3536
shamir::{ShamirSecret, SHAMIR_OVERHEAD},
3637
storage::{
@@ -70,48 +71,79 @@ pub struct Core {
7071
pub physical: Arc<dyn PhysicalBackend>,
7172
pub barrier: Arc<dyn SecurityBarrier>,
7273
pub system_view: Option<Arc<BarrierView>>,
73-
pub mounts: Arc<MountTable>,
74-
pub router: Arc<Router>,
7574
pub handlers: RwLock<Vec<Arc<dyn Handler>>>,
7675
pub auth_handlers: Arc<RwLock<Vec<Arc<dyn AuthHandler>>>>,
77-
pub logical_backends: Mutex<HashMap<String, Arc<LogicalBackendNewFunc>>>,
76+
pub router: Arc<Router>,
77+
pub mounts_router: Arc<MountsRouter>,
7878
pub module_manager: ModuleManager,
7979
pub sealed: bool,
8080
pub unseal_key_shares: Vec<Vec<u8>>,
8181
pub hmac_key: Vec<u8>,
8282
pub mount_entry_hmac_level: MountEntryHMACLevel,
83+
pub mounts_monitor: Option<MountsMonitor>,
84+
pub mounts_monitor_interval: u64,
8385
}
8486

8587
impl Default for Core {
8688
fn default() -> Self {
8789
let backend: Arc<dyn PhysicalBackend> = Arc::new(physical::mock::MockBackend::new());
88-
let barrier = barrier_aes_gcm::AESGCMBarrier::new(Arc::clone(&backend));
90+
let barrier = Arc::new(barrier_aes_gcm::AESGCMBarrier::new(Arc::clone(&backend)));
91+
let barrier_cloned = Arc::clone(&barrier);
8992
let router = Arc::new(Router::new());
9093

9194
Core {
9295
self_ref: None,
9396
physical: backend,
94-
barrier: Arc::new(barrier),
97+
barrier: barrier_cloned,
9598
system_view: None,
96-
mounts: Arc::new(MountTable::new()),
9799
router: Arc::clone(&router),
100+
mounts_router: Arc::new(MountsRouter::new(
101+
Arc::new(MountTable::new(CORE_MOUNT_CONFIG_PATH)),
102+
Arc::clone(&router),
103+
barrier,
104+
LOGICAL_BARRIER_PREFIX,
105+
"",
106+
)),
98107
handlers: RwLock::new(vec![router]),
99108
auth_handlers: Arc::new(RwLock::new(Vec::new())),
100-
logical_backends: Mutex::new(HashMap::new()),
101109
module_manager: ModuleManager::new(),
102110
sealed: true,
103111
unseal_key_shares: Vec::new(),
104112
hmac_key: Vec::new(),
105113
mount_entry_hmac_level: MountEntryHMACLevel::None,
114+
mounts_monitor: None,
115+
mounts_monitor_interval: 5,
106116
}
107117
}
108118
}
109119

110120
#[maybe_async::maybe_async]
111121
impl Core {
122+
pub fn new(backend: Arc<dyn PhysicalBackend>) -> Self {
123+
let barrier = Arc::new(barrier_aes_gcm::AESGCMBarrier::new(Arc::clone(&backend)));
124+
let barrier_cloned = Arc::clone(&barrier);
125+
let router = Arc::new(Router::new());
126+
127+
Core {
128+
physical: backend,
129+
barrier: barrier_cloned,
130+
router: Arc::clone(&router),
131+
mounts_router: Arc::new(MountsRouter::new(
132+
Arc::new(MountTable::new(CORE_MOUNT_CONFIG_PATH)),
133+
Arc::clone(&router),
134+
barrier,
135+
LOGICAL_BARRIER_PREFIX,
136+
"",
137+
)),
138+
handlers: RwLock::new(vec![router]),
139+
..Default::default()
140+
}
141+
}
142+
112143
pub fn config(&mut self, core: Arc<RwLock<Core>>, config: Option<&Config>) -> Result<(), RvError> {
113144
if let Some(conf) = config {
114145
self.mount_entry_hmac_level = conf.mount_entry_hmac_level;
146+
self.mounts_monitor_interval = conf.mounts_monitor_interval;
115147
}
116148

117149
self.module_manager.set_default_modules(Arc::clone(&core))?;
@@ -141,6 +173,8 @@ impl Core {
141173
let cert_module = CertModule::new(self);
142174
self.module_manager.add_module(Arc::new(RwLock::new(Box::new(cert_module))))?;
143175

176+
self.mounts_monitor = Some(MountsMonitor::new(core, self.mounts_monitor_interval));
177+
144178
let handlers = { self.handlers.read()?.clone() };
145179
for handler in handlers.iter() {
146180
match handler.post_config(self, config) {
@@ -246,27 +280,15 @@ impl Core {
246280
}
247281

248282
pub fn get_logical_backend(&self, logical_type: &str) -> Result<Arc<LogicalBackendNewFunc>, RvError> {
249-
let logical_backends = self.logical_backends.lock().unwrap();
250-
if let Some(backend) = logical_backends.get(logical_type) {
251-
Ok(backend.clone())
252-
} else {
253-
Err(RvError::ErrCoreLogicalBackendNoExist)
254-
}
283+
self.mounts_router.get_backend(logical_type)
255284
}
256285

257286
pub fn add_logical_backend(&self, logical_type: &str, backend: Arc<LogicalBackendNewFunc>) -> Result<(), RvError> {
258-
let mut logical_backends = self.logical_backends.lock().unwrap();
259-
if logical_backends.contains_key(logical_type) {
260-
return Err(RvError::ErrCoreLogicalBackendExist);
261-
}
262-
logical_backends.insert(logical_type.to_string(), backend);
263-
Ok(())
287+
self.mounts_router.add_backend(logical_type, backend)
264288
}
265289

266290
pub fn delete_logical_backend(&self, logical_type: &str) -> Result<(), RvError> {
267-
let mut logical_backends = self.logical_backends.lock().unwrap();
268-
logical_backends.remove(logical_type);
269-
Ok(())
291+
self.mounts_router.delete_backend(logical_type)
270292
}
271293

272294
pub fn add_handler(&self, handler: Arc<dyn Handler>) -> Result<(), RvError> {
@@ -397,16 +419,27 @@ impl Core {
397419

398420
// Perform initial setup
399421
self.hmac_key = self.barrier.derive_hmac_key()?;
400-
self.mounts.load_or_default(self.barrier.as_storage(), Some(&self.hmac_key), self.mount_entry_hmac_level)?;
422+
self.mounts_router.load_or_default(
423+
self.barrier.as_storage(),
424+
Some(&self.hmac_key),
425+
self.mount_entry_hmac_level,
426+
)?;
401427

402-
self.setup_mounts()?;
428+
self.mounts_router.setup(self.self_ref.as_ref().unwrap())?;
429+
430+
self.system_view = Some(Arc::new(BarrierView::new(self.barrier.clone(), SYSTEM_BARRIER_PREFIX)));
403431

404432
self.module_manager.init(self)?;
405433

434+
self.mounts_monitor.as_ref().unwrap().add_mounts_router(self.mounts_router.clone());
435+
self.mounts_monitor.as_mut().unwrap().start();
436+
406437
Ok(())
407438
}
408439

409440
fn pre_seal(&mut self) -> Result<(), RvError> {
441+
self.mounts_monitor.as_ref().unwrap().remove_mounts_router(self.mounts_router.clone());
442+
self.mounts_monitor.as_mut().unwrap().stop();
410443
self.module_manager.cleanup(self)?;
411444
self.unload_mounts()?;
412445
Ok(())
@@ -546,10 +579,10 @@ impl Core {
546579

547580
#[cfg(test)]
548581
mod test {
549-
use crate::test_utils::test_rusty_vault_init;
582+
use crate::test_utils::init_test_rusty_vault;
550583

551584
#[test]
552585
fn test_core_init() {
553-
let _ = test_rusty_vault_init("test_core_init");
586+
let _ = init_test_rusty_vault("test_core_init");
554587
}
555588
}

src/errors.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ pub enum RvError {
173173
ErrCredentailInvalid,
174174
#[error("Credentail is not config.")]
175175
ErrCredentailNotConfig,
176+
#[error("Storage backend doesn't require a lock.")]
177+
ErrStorageBackendLockless,
178+
#[error("Storage backend lock failed.")]
179+
ErrStorageBackendLockFailed,
180+
#[error("Storage backend unlock failed.")]
181+
ErrStorageBackendUnlockFailed,
176182
#[error("Some IO error happened, {:?}", .source)]
177183
IO {
178184
#[from]
@@ -295,6 +301,12 @@ pub enum RvError {
295301
source: std::string::FromUtf8Error,
296302
},
297303

304+
#[error("Some lockfile error happened, {:?}", .source)]
305+
LockfileError {
306+
#[from]
307+
source: lockfile::Error,
308+
},
309+
298310
/// Database Errors Begin
299311
///
300312
#[error("Database type is not support now. Please try postgressql or mysql again.")]

src/logical/backend.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ mod test {
261261
use crate::{
262262
logical::{field::FieldTrait, Field, FieldType, PathOperation},
263263
new_fields, new_fields_internal, new_path, new_path_internal, new_secret, new_secret_internal, storage,
264-
test_utils::test_backend,
264+
test_utils::new_test_backend,
265265
};
266266

267267
struct MyTest;
@@ -304,7 +304,7 @@ mod test {
304304

305305
#[test]
306306
fn test_logical_backend_api() {
307-
let backend = test_backend("test_logical_backend_api");
307+
let backend = new_test_backend("test_logical_backend_api");
308308

309309
let t = MyTest::new();
310310

@@ -461,7 +461,7 @@ mod test {
461461

462462
#[test]
463463
fn test_logical_path_field() {
464-
let backend = test_backend("test_logical_path_field");
464+
let backend = new_test_backend("test_logical_path_field");
465465

466466
let barrier = storage::barrier_aes_gcm::AESGCMBarrier::new(Arc::clone(&backend));
467467

src/modules/auth/expiration.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -687,14 +687,14 @@ mod mod_expiration_tests {
687687
mount::{MountEntry, MOUNT_TABLE_TYPE},
688688
new_fields, new_fields_internal, new_logical_backend, new_logical_backend_internal, new_path,
689689
new_path_internal, new_secret, new_secret_internal,
690-
test_utils::{test_rusty_vault_init, NoopBackend},
690+
test_utils::{init_test_rusty_vault, NoopBackend},
691691
};
692692

693693
macro_rules! mock_expiration_manager {
694694
() => {{
695695
let name = format!("{}_{}", file!(), line!()).replace("/", "_").replace("\\", "_").replace(".", "_");
696-
println!("test_rusty_vault_init, name: {}", name);
697-
let (_, core) = test_rusty_vault_init(&name);
696+
println!("init_test_rusty_vault, name: {}", name);
697+
let (_, core) = init_test_rusty_vault(&name);
698698
let core_cloned = core.clone();
699699
let core_locked = core_cloned.read().unwrap();
700700

0 commit comments

Comments
 (0)