Skip to content

Commit 6631407

Browse files
authored
feat: Add RustFS Scanner Module and Multiple Bug Fixes (rustfs#1579)
1 parent 6c5f8e5 commit 6631407

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+6633
-578
lines changed

Cargo.lock

Lines changed: 34 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ members = [
3535
"crates/targets", # Target-specific configurations and utilities
3636
"crates/s3select-api", # S3 Select API interface
3737
"crates/s3select-query", # S3 Select query engine
38+
"crates/scanner", # Scanner for data integrity checks and health monitoring
3839
"crates/signer", # client signer
3940
"crates/checksums", # client checksums
4041
"crates/utils", # Utility functions and helpers
@@ -88,6 +89,7 @@ rustfs-protos = { path = "crates/protos", version = "0.0.5" }
8889
rustfs-rio = { path = "crates/rio", version = "0.0.5" }
8990
rustfs-s3select-api = { path = "crates/s3select-api", version = "0.0.5" }
9091
rustfs-s3select-query = { path = "crates/s3select-query", version = "0.0.5" }
92+
rustfs-scanner = { path = "crates/scanner", version = "0.0.5" }
9193
rustfs-signer = { path = "crates/signer", version = "0.0.5" }
9294
rustfs-targets = { path = "crates/targets", version = "0.0.5" }
9395
rustfs-utils = { path = "crates/utils", version = "0.0.5" }

crates/ahm/src/heal/channel.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ impl HealChannelProcessor {
183183
HealType::Object {
184184
bucket: request.bucket.clone(),
185185
object: prefix.clone(),
186-
version_id: None,
186+
version_id: request.object_version_id.clone(),
187187
}
188188
} else {
189189
HealType::Bucket {
@@ -366,6 +366,7 @@ mod tests {
366366
id: "test-id".to_string(),
367367
bucket: "test-bucket".to_string(),
368368
object_prefix: None,
369+
object_version_id: None,
369370
disk: None,
370371
priority: HealChannelPriority::Normal,
371372
scan_mode: None,
@@ -394,6 +395,7 @@ mod tests {
394395
id: "test-id".to_string(),
395396
bucket: "test-bucket".to_string(),
396397
object_prefix: Some("test-object".to_string()),
398+
object_version_id: None,
397399
disk: None,
398400
priority: HealChannelPriority::High,
399401
scan_mode: Some(HealScanMode::Deep),
@@ -425,6 +427,7 @@ mod tests {
425427
id: "test-id".to_string(),
426428
bucket: "test-bucket".to_string(),
427429
object_prefix: None,
430+
object_version_id: None,
428431
disk: Some("pool_0_set_1".to_string()),
429432
priority: HealChannelPriority::Critical,
430433
scan_mode: None,
@@ -453,6 +456,7 @@ mod tests {
453456
id: "test-id".to_string(),
454457
bucket: "test-bucket".to_string(),
455458
object_prefix: None,
459+
object_version_id: None,
456460
disk: Some("invalid-disk-id".to_string()),
457461
priority: HealChannelPriority::Normal,
458462
scan_mode: None,
@@ -488,6 +492,7 @@ mod tests {
488492
id: "test-id".to_string(),
489493
bucket: "test-bucket".to_string(),
490494
object_prefix: None,
495+
object_version_id: None,
491496
disk: None,
492497
priority: channel_priority,
493498
scan_mode: None,
@@ -516,6 +521,7 @@ mod tests {
516521
id: "test-id".to_string(),
517522
bucket: "test-bucket".to_string(),
518523
object_prefix: None,
524+
object_version_id: None,
519525
disk: None,
520526
priority: HealChannelPriority::Normal,
521527
scan_mode: None,
@@ -545,6 +551,7 @@ mod tests {
545551
id: "test-id".to_string(),
546552
bucket: "test-bucket".to_string(),
547553
object_prefix: Some("".to_string()), // Empty prefix should be treated as bucket heal
554+
object_version_id: None,
548555
disk: None,
549556
priority: HealChannelPriority::Normal,
550557
scan_mode: None,

crates/common/src/heal_channel.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl Display for HealItemType {
5050
}
5151
}
5252

53-
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
53+
#[derive(Debug, Serialize, Deserialize)]
5454
pub enum DriveState {
5555
Ok,
5656
Offline,
@@ -59,7 +59,7 @@ pub enum DriveState {
5959
PermissionDenied,
6060
Faulty,
6161
RootMount,
62-
Unknown,
62+
Unknown(String),
6363
Unformatted, // only returned by disk
6464
}
6565

@@ -73,12 +73,28 @@ impl DriveState {
7373
DriveState::PermissionDenied => "permission-denied",
7474
DriveState::Faulty => "faulty",
7575
DriveState::RootMount => "root-mount",
76-
DriveState::Unknown => "unknown",
76+
DriveState::Unknown(reason) => reason,
7777
DriveState::Unformatted => "unformatted",
7878
}
7979
}
8080
}
8181

82+
impl Clone for DriveState {
83+
fn clone(&self) -> Self {
84+
match self {
85+
DriveState::Unknown(reason) => DriveState::Unknown(reason.clone()),
86+
DriveState::Ok => DriveState::Ok,
87+
DriveState::Offline => DriveState::Offline,
88+
DriveState::Corrupt => DriveState::Corrupt,
89+
DriveState::Missing => DriveState::Missing,
90+
DriveState::PermissionDenied => DriveState::PermissionDenied,
91+
DriveState::Faulty => DriveState::Faulty,
92+
DriveState::RootMount => DriveState::RootMount,
93+
DriveState::Unformatted => DriveState::Unformatted,
94+
}
95+
}
96+
}
97+
8298
impl Display for DriveState {
8399
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84100
write!(f, "{}", self.to_str())
@@ -212,6 +228,8 @@ pub struct HealChannelRequest {
212228
pub bucket: String,
213229
/// Object prefix (optional)
214230
pub object_prefix: Option<String>,
231+
/// Object version ID (optional)
232+
pub object_version_id: Option<String>,
215233
/// Force start heal
216234
pub force_start: bool,
217235
/// Priority
@@ -346,6 +364,7 @@ pub fn create_heal_request(
346364
id: Uuid::new_v4().to_string(),
347365
bucket,
348366
object_prefix,
367+
object_version_id: None,
349368
force_start,
350369
priority: priority.unwrap_or_default(),
351370
pool_index: None,
@@ -374,6 +393,7 @@ pub fn create_heal_request_with_options(
374393
id: Uuid::new_v4().to_string(),
375394
bucket,
376395
object_prefix,
396+
object_version_id: None,
377397
force_start,
378398
priority: priority.unwrap_or_default(),
379399
pool_index,
@@ -501,6 +521,7 @@ pub async fn send_heal_disk(set_disk_id: String, priority: Option<HealChannelPri
501521
bucket: "".to_string(),
502522
object_prefix: None,
503523
disk: Some(set_disk_id),
524+
object_version_id: None,
504525
force_start: false,
505526
priority: priority.unwrap_or_default(),
506527
pool_index: None,

crates/config/src/constants/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ pub(crate) mod profiler;
2323
pub(crate) mod protocols;
2424
pub(crate) mod quota;
2525
pub(crate) mod runtime;
26+
pub(crate) mod scanner;
2627
pub(crate) mod targets;
2728
pub(crate) mod tls;
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright 2024 RustFS Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/// Environment variable name that specifies the data scanner start delay in seconds.
16+
/// - Purpose: Define the delay between data scanner operations.
17+
/// - Unit: seconds (u64).
18+
/// - Valid values: any positive integer.
19+
/// - Semantics: This delay controls how frequently the data scanner checks for and processes data; shorter delays lead to more responsive scanning but may increase system load.
20+
/// - Example: `export RUSTFS_DATA_SCANNER_START_DELAY_SECS=10`
21+
/// - Note: Choose an appropriate delay that balances scanning responsiveness with overall system performance.
22+
pub const ENV_DATA_SCANNER_START_DELAY_SECS: &str = "RUSTFS_DATA_SCANNER_START_DELAY_SECS";
23+
24+
/// Default data scanner start delay in seconds if not specified in the environment variable.
25+
/// - Value: 60 seconds.
26+
/// - Rationale: This default interval provides a reasonable balance between scanning responsiveness and system load for most deployments.
27+
/// - Adjustments: Users may modify this value via the `RUSTFS_DATA_SCANNER_START_DELAY_SECS` environment variable based on their specific scanning requirements and system performance.
28+
pub const DEFAULT_DATA_SCANNER_START_DELAY_SECS: u64 = 60;

crates/config/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ pub use constants::quota::*;
3737
#[cfg(feature = "constants")]
3838
pub use constants::runtime::*;
3939
#[cfg(feature = "constants")]
40+
pub use constants::scanner::*;
41+
#[cfg(feature = "constants")]
4042
pub use constants::targets::*;
4143
#[cfg(feature = "constants")]
4244
pub use constants::tls::*;

crates/ecstore/src/bucket/bucket_target_sys.rs

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::bucket::target::{self, BucketTarget, BucketTargets, Credentials};
2323
use crate::bucket::versioning_sys::BucketVersioningSys;
2424
use aws_credential_types::Credentials as SdkCredentials;
2525
use aws_sdk_s3::config::Region as SdkRegion;
26+
use aws_sdk_s3::error::ProvideErrorMetadata;
2627
use aws_sdk_s3::error::SdkError;
2728
use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadOutput;
2829
use aws_sdk_s3::operation::head_bucket::HeadBucketError;
@@ -1094,8 +1095,7 @@ pub struct TargetClient {
10941095

10951096
impl TargetClient {
10961097
pub fn to_url(&self) -> Url {
1097-
let scheme = if self.secure { "https" } else { "http" };
1098-
Url::parse(&format!("{scheme}://{}", self.endpoint)).unwrap()
1098+
Url::parse(&self.endpoint).unwrap()
10991099
}
11001100

11011101
pub async fn bucket_exists(&self, bucket: &str) -> Result<bool, S3ClientError> {
@@ -1104,9 +1104,17 @@ impl TargetClient {
11041104
Err(e) => match e {
11051105
SdkError::ServiceError(oe) => match oe.into_err() {
11061106
HeadBucketError::NotFound(_) => Ok(false),
1107-
other => Err(S3ClientError::new(format!(
1108-
"failed to check bucket exists for bucket:{bucket} please check the bucket name and credentials, error:{other:?}"
1109-
))),
1107+
other => {
1108+
warn!(
1109+
"failed to check bucket exists for bucket:{bucket} please check the bucket name and credentials, error:{:?}",
1110+
other
1111+
);
1112+
let message = other.meta().meta();
1113+
Err(S3ClientError::new(format!(
1114+
"failed to check bucket exists for bucket:{bucket} please check the bucket name and credentials, error:{:?}",
1115+
message
1116+
)))
1117+
}
11101118
},
11111119
SdkError::DispatchFailure(e) => Err(S3ClientError::new(format!(
11121120
"failed to dispatch bucket exists for bucket:{bucket} error:{e:?}"
@@ -1154,10 +1162,17 @@ impl TargetClient {
11541162
body: ByteStream,
11551163
opts: &PutObjectOptions,
11561164
) -> Result<(), S3ClientError> {
1157-
let headers = opts.header();
1165+
let mut headers = opts.header();
11581166

11591167
let builder = self.client.put_object();
11601168

1169+
let version_id = opts.internal.source_version_id.clone();
1170+
if !version_id.is_empty()
1171+
&& let Ok(header_value) = HeaderValue::from_str(&version_id)
1172+
{
1173+
headers.insert(RUSTFS_BUCKET_SOURCE_VERSION_ID, header_value);
1174+
}
1175+
11611176
match builder
11621177
.bucket(bucket)
11631178
.key(object)
@@ -1185,9 +1200,33 @@ impl TargetClient {
11851200
&self,
11861201
bucket: &str,
11871202
object: &str,
1188-
_opts: &PutObjectOptions,
1203+
opts: &PutObjectOptions,
11891204
) -> Result<String, S3ClientError> {
1190-
match self.client.create_multipart_upload().bucket(bucket).key(object).send().await {
1205+
let mut headers = HeaderMap::new();
1206+
let version_id = opts.internal.source_version_id.clone();
1207+
if !version_id.is_empty()
1208+
&& let Ok(header_value) = HeaderValue::from_str(&version_id)
1209+
{
1210+
headers.insert(RUSTFS_BUCKET_SOURCE_VERSION_ID, header_value);
1211+
}
1212+
1213+
match self
1214+
.client
1215+
.create_multipart_upload()
1216+
.bucket(bucket)
1217+
.key(object)
1218+
.customize()
1219+
.map_request(move |mut req| {
1220+
for (k, v) in headers.clone().into_iter() {
1221+
let key_str = k.unwrap().as_str().to_string();
1222+
let value_str = v.to_str().unwrap_or("").to_string();
1223+
req.headers_mut().insert(key_str, value_str);
1224+
}
1225+
Result::<_, aws_smithy_types::error::operation::BuildError>::Ok(req)
1226+
})
1227+
.send()
1228+
.await
1229+
{
11911230
Ok(res) => Ok(res.upload_id.unwrap_or_default()),
11921231
Err(e) => Err(e.into()),
11931232
}

crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -953,7 +953,7 @@ impl LifecycleOps for ObjectInfo {
953953
lifecycle::ObjectOpts {
954954
name: self.name.clone(),
955955
user_tags: self.user_tags.clone(),
956-
version_id: self.version_id.map(|v| v.to_string()).unwrap_or_default(),
956+
version_id: self.version_id.clone(),
957957
mod_time: self.mod_time,
958958
size: self.size as usize,
959959
is_latest: self.is_latest,
@@ -1067,7 +1067,7 @@ pub async fn eval_action_from_lifecycle(
10671067
event
10681068
}
10691069

1070-
async fn apply_transition_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
1070+
pub async fn apply_transition_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
10711071
if oi.delete_marker || oi.is_dir {
10721072
return false;
10731073
}
@@ -1161,7 +1161,7 @@ pub async fn apply_expiry_on_non_transitioned_objects(
11611161
true
11621162
}
11631163

1164-
async fn apply_expiry_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
1164+
pub async fn apply_expiry_rule(event: &lifecycle::Event, src: &LcEventSrc, oi: &ObjectInfo) -> bool {
11651165
let mut expiry_state = GLOBAL_ExpiryState.write().await;
11661166
expiry_state.enqueue_by_days(oi, event, src).await;
11671167
true

0 commit comments

Comments
 (0)