52 changes: 52 additions & 0 deletions crates/fluss/src/client/credentials.rs
@@ -156,3 +156,55 @@ impl CredentialsCache {
Ok(props)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::client::metadata::Metadata;
use crate::cluster::Cluster;

#[test]
fn convert_hadoop_key_to_opendal_maps_known_keys() {
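        // `invert` marks keys whose boolean value must be flipped in
        // translation, because the Hadoop and OpenDAL options have opposite
        // polarity.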
let (key, invert) = convert_hadoop_key_to_opendal("fs.s3a.endpoint").expect("key");
assert_eq!(key, "endpoint");
assert!(!invert);

let (key, invert) =
convert_hadoop_key_to_opendal("fs.s3a.path.style.access").expect("key");
assert_eq!(key, "enable_virtual_host_style");
assert!(invert);

assert!(convert_hadoop_key_to_opendal("fs.s3a.connection.ssl.enabled").is_none());
assert!(convert_hadoop_key_to_opendal("unknown.key").is_none());
}

#[tokio::test]
async fn credentials_cache_returns_cached_props() -> Result<()> {
let cached = CachedToken {
access_key_id: "ak".to_string(),
secret_access_key: "sk".to_string(),
security_token: Some("token".to_string()),
addition_infos: HashMap::from([(
"fs.s3a.path.style.access".to_string(),
"true".to_string(),
)]),
cached_at: Instant::now(),
};

let cache = CredentialsCache {
inner: RwLock::new(Some(cached)),
rpc_client: Arc::new(RpcClient::new()),
metadata: Arc::new(Metadata::new_for_test(Arc::new(Cluster::default()))),
};

let props = cache.get_or_refresh().await?;
assert_eq!(props.get("access_key_id"), Some(&"ak".to_string()));
assert_eq!(props.get("secret_access_key"), Some(&"sk".to_string()));
assert_eq!(props.get("security_token"), Some(&"token".to_string()));
assert_eq!(
props.get("enable_virtual_host_style"),
Some(&"false".to_string())
);
Ok(())
}
}
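Aside for reviewers: the shape of the mapping these tests pin down can be sketched from the assertions alone. Everything below is inferred — the signature, return type, and key set are assumptions, not the actual implementation in credentials.rs:

```rust
// Hypothetical sketch inferred from the tests above -- not the crate's code.
fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(&'static str, bool)> {
    // The bool marks keys whose boolean value must be flipped: Hadoop's
    // "path style access" is the opposite of OpenDAL's "virtual host style".
    match hadoop_key {
        "fs.s3a.endpoint" => Some(("endpoint", false)),
        "fs.s3a.path.style.access" => Some(("enable_virtual_host_style", true)),
        // Keys with no OpenDAL counterpart (fs.s3a.connection.ssl.enabled,
        // anything unrecognized) map to None and are dropped.
        _ => None,
    }
}

fn main() {
    assert_eq!(
        convert_hadoop_key_to_opendal("fs.s3a.path.style.access"),
        Some(("enable_virtual_host_style", true))
    );
    assert!(convert_hadoop_key_to_opendal("unknown.key").is_none());
}
```

The `bool` in the pair is what the tests call `invert`: it explains why a cached `fs.s3a.path.style.access` of `"true"` surfaces as `enable_virtual_host_style` = `"false"` in the cached-props test above.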
90 changes: 88 additions & 2 deletions crates/fluss/src/client/metadata.rs
@@ -135,7 +135,93 @@ impl Metadata {
guard.clone()
}

-    pub fn leader_for(&self, _table_bucket: &TableBucket) -> Option<&ServerNode> {
-        todo!()
+    pub fn leader_for(&self, table_bucket: &TableBucket) -> Option<ServerNode> {
+        let cluster = self.cluster.read();
+        cluster.leader_for(table_bucket).cloned()
     }
}

#[cfg(test)]
impl Metadata {
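    /// Test-only constructor: injects a prebuilt `Cluster` alongside a fresh
    /// `RpcClient` and an empty bootstrap address, so tests can seed metadata
    /// state directly without touching the network.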
pub(crate) fn new_for_test(cluster: Arc<Cluster>) -> Self {
Metadata {
cluster: RwLock::new(cluster),
connections: Arc::new(RpcClient::new()),
bootstrap: Arc::from(""),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::cluster::{BucketLocation, Cluster, ServerNode, ServerType};
use crate::metadata::{DataField, DataTypes, Schema, TableDescriptor, TableInfo, TablePath};
use std::collections::HashMap;

fn build_table_info(table_path: TablePath, table_id: i64) -> TableInfo {
let row_type = DataTypes::row(vec![DataField::new(
"id".to_string(),
DataTypes::int(),
None,
)]);
let mut schema_builder = Schema::builder().with_row_type(&row_type);
let schema = schema_builder.build().expect("schema build");
let table_descriptor = TableDescriptor::builder()
.schema(schema)
.distributed_by(Some(1), vec![])
.build()
.expect("descriptor build");
TableInfo::of(table_path, table_id, 1, table_descriptor, 0, 0)
}

fn build_cluster(table_path: &TablePath, table_id: i64) -> Arc<Cluster> {
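        // Minimal topology: one tablet server (id 1) that leads bucket 0 of
        // the given table.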
let server = ServerNode::new(1, "127.0.0.1".to_string(), 9092, ServerType::TabletServer);
let table_bucket = TableBucket::new(table_id, 0);
let bucket_location =
BucketLocation::new(table_bucket.clone(), Some(server.clone()), table_path.clone());

let mut servers = HashMap::new();
servers.insert(server.id(), server);

let mut locations_by_path = HashMap::new();
locations_by_path.insert(table_path.clone(), vec![bucket_location.clone()]);

let mut locations_by_bucket = HashMap::new();
locations_by_bucket.insert(table_bucket, bucket_location);

let mut table_id_by_path = HashMap::new();
table_id_by_path.insert(table_path.clone(), table_id);

let mut table_info_by_path = HashMap::new();
        table_info_by_path.insert(
            table_path.clone(),
            build_table_info(table_path.clone(), table_id),
        );

Arc::new(Cluster::new(
None,
servers,
locations_by_path,
locations_by_bucket,
table_id_by_path,
table_info_by_path,
))
}

#[test]
fn leader_for_returns_server() {
let table_path = TablePath::new("db".to_string(), "tbl".to_string());
let cluster = build_cluster(&table_path, 1);
let metadata = Metadata::new_for_test(cluster);
let leader = metadata.leader_for(&TableBucket::new(1, 0)).expect("leader");
assert_eq!(leader.id(), 1);
}

#[test]
fn invalidate_server_removes_leader() {
let table_path = TablePath::new("db".to_string(), "tbl".to_string());
let cluster = build_cluster(&table_path, 1);
let metadata = Metadata::new_for_test(cluster);
metadata.invalidate_server(&1, vec![1]);
let cluster = metadata.get_cluster();
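        // After invalidation, server 1 must be absent from the new snapshot.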
assert!(cluster.get_tablet_server(1).is_none());
}
}
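One design note worth recording next to this diff: `leader_for` changed from returning `Option<&ServerNode>` to `Option<ServerNode>`. That is forced by the lock, not a style choice — a borrowed return would reference the read guard, which is dropped when the method returns. A minimal standalone illustration (using `std::sync::RwLock` here; the crate's `read()` without `unwrap()` suggests a parking_lot-style lock, which is an assumption):

```rust
use std::sync::RwLock;

struct Registry {
    nodes: RwLock<Vec<String>>,
}

impl Registry {
    // Returning Option<&String> would not compile: the reference would borrow
    // from `guard`, which is dropped at the end of this method. Cloning the
    // value out lets the read lock be released immediately.
    fn first_node(&self) -> Option<String> {
        let guard = self.nodes.read().unwrap();
        guard.first().cloned()
    }
}

fn main() {
    let registry = Registry {
        nodes: RwLock::new(vec!["tablet-1".to_string()]),
    };
    assert_eq!(registry.first_node(), Some("tablet-1".to_string()));
}
```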