Skip to content

Commit f109e63

Browse files
haipham23igorlukanin
authored andcommitted
chore: use crate
1 parent 753cf52 commit f109e63

File tree

1 file changed

+48
-76
lines changed
  • rust/cubestore/cubestore/src/remotefs

1 file changed

+48
-76
lines changed

rust/cubestore/cubestore/src/remotefs/s3.rs

Lines changed: 48 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ use tokio::fs;
2121
use tokio::fs::File;
2222
use tokio::io::AsyncWriteExt;
2323
use tokio::sync::Mutex;
24+
use tokio::spawn;
25+
use aws_sdk_sts::{Client, Config, Region};
26+
use aws_sdk_sts::model::{AssumeRoleRequest, Credentials};
2427

2528
pub struct S3RemoteFs {
2629
dir: PathBuf,
@@ -42,53 +45,30 @@ impl fmt::Debug for S3RemoteFs {
4245
}
4346

4447
impl S3RemoteFs {
45-
pub fn new(
48+
pub async fn new(
4649
dir: PathBuf,
4750
region: String,
4851
bucket_name: String,
4952
sub_path: Option<String>,
5053
) -> Result<Arc<Self>, CubeError> {
51-
let role_name = env::var("CUBESTORE_AWS_IAM_ROLE").ok();
54+
let region = region.parse::<Region>().map_err(|err| {
55+
CubeError::internal(format!(
56+
"Failed to parse Region '{}': {}",
57+
region,
58+
err.to_string()
59+
))
60+
})?;
61+
62+
let role_name = env::var("CUBESTORE_AWS_ROLE").ok();
5263
let (access_key, secret_key) = match role_name {
5364
Some(role_name) => {
54-
let region = env::var("CUBESTORE_AWS_REGION").expect("CUBESTORE_AWS_REGION must be set");
55-
let account_id = Command::new("aws")
56-
.args(&["sts", "get-caller-identity", "--query", "Account", "--output", "text"])
57-
.output()
58-
.expect("Failed to get account ID")
59-
.stdout
60-
.trim()
61-
.to_string();
62-
63-
let assume_role_output = Command::new("aws")
64-
.args(&[
65-
"sts",
66-
"assume-role",
67-
"--role-arn",
68-
&format!("arn:aws:iam::{}:role/{}", account_id, role_name),
69-
"--role-session-name",
70-
&format!("session-{}", role_name),
71-
"--query",
72-
"Credentials",
73-
"--output",
74-
"text",
75-
])
76-
.output()
77-
.expect("Failed to assume role")
78-
.stdout
79-
.trim()
80-
.to_string();
81-
82-
let access_key = assume_role_output.split_whitespace().nth(1).unwrap().trim_matches('"');
83-
let secret_key = assume_role_output.split_whitespace().nth(3).unwrap().trim_matches('"');
84-
85-
(Some(access_key), Some(secret_key))
65+
assume_role(&role_name, &region.to_string()).await
8666
}
8767
None => (
8868
env::var("CUBESTORE_AWS_ACCESS_KEY_ID").ok(),
8969
env::var("CUBESTORE_AWS_SECRET_ACCESS_KEY").ok(),
9070
),
91-
};
71+
}?;
9272

9373
let credentials = Credentials::new(
9474
access_key.as_deref(),
@@ -97,19 +77,12 @@ impl S3RemoteFs {
9777
None,
9878
None,
9979
)
100-
.map_err(|err| {
80+
.map_err(|err| {
10181
CubeError::internal(format!(
10282
"Failed to create S3 credentials: {}",
10383
err.to_string()
10484
))
10585
})?;
106-
let region = region.parse::<Region>().map_err(|err| {
107-
CubeError::internal(format!(
108-
"Failed to parse Region '{}': {}",
109-
region,
110-
err.to_string()
111-
))
112-
})?;
11386
let bucket = Bucket::new(&bucket_name, region.clone(), credentials)?;
11487
let fs = Arc::new(Self {
11588
dir,
@@ -130,6 +103,34 @@ impl S3RemoteFs {
130103
}
131104
}
132105

106+
async fn assume_role(
107+
role_or_access_key: &str,
108+
region: &str,
109+
) -> Result<(String, String), CubeError> {
110+
let account_id = Client::new(Config::builder().region(Region::new(region)).build())
111+
.get_caller_identity()
112+
.send()
113+
.await
114+
.map_err(|e| CubeError::internal(format!("Failed to get account ID: {}", e)))?
115+
.account;
116+
117+
let assume_role_output = Client::new(Config::builder().region(Region::new(region)).build())
118+
.assume_role(AssumeRoleRequest::builder()
119+
.role_arn(format!("arn:aws:iam::{}:role/{}", account_id, role_or_access_key))
120+
.duration_seconds(28800)
121+
.build())
122+
.send()
123+
.await
124+
.map_err(|e| CubeError::internal(format!("Failed to assume role: {}", e)))?
125+
.credentials
126+
.ok_or_else(|| CubeError::internal("Failed to get credentials".to_string()))?;
127+
128+
let access_key = assume_role_output.access_key_id.ok_or_else(|| CubeError::internal("Failed to get access key".to_string()))?;
129+
let secret_key = assume_role_output.secret_access_key.ok_or_else(|| CubeError::internal("Failed to get secret key".to_string()))?;
130+
131+
Ok((access_key, secret_key))
132+
}
133+
133134
fn spawn_creds_refresh_loop(
134135
role_or_access_key: Option<String>,
135136
is_role: bool,
@@ -144,49 +145,20 @@ fn spawn_creds_refresh_loop(
144145
}
145146

146147
let fs = Arc::downgrade(fs);
147-
std::thread::spawn(move || {
148+
spawn(async move {
148149
log::debug!("Started S3 credentials refresh loop");
149150
loop {
150-
std::thread::sleep(refresh_every);
151+
tokio::time::sleep(refresh_every).await;
151152
let fs = match fs.upgrade() {
152153
None => {
153154
log::debug!("Stopping S3 credentials refresh loop");
154155
return;
155156
}
156157
Some(fs) => fs,
157158
};
158-
let (access_key, secret_key) = if is_role {
159-
let region = env::var("CUBESTORE_AWS_REGION").expect("CUBESTORE_AWS_REGION must be set");
160-
let account_id = Command::new("aws")
161-
.args(&["sts", "get-caller-identity", "--query", "Account", "--output", "text"])
162-
.output()
163-
.expect("Failed to get account ID")
164-
.stdout
165-
.trim()
166-
.to_string();
167-
168-
let assume_role_output = Command::new("aws")
169-
.args(&[
170-
"sts",
171-
"assume-role",
172-
"--role-arn",
173-
&format!("arn:aws:iam::{}:role/{}", account_id, role_or_access_key.as_ref().unwrap()),
174-
"--role-session-name",
175-
&format!("session-{}", role_or_access_key.as_ref().unwrap()),
176-
"--query",
177-
"Credentials",
178-
"--output",
179-
"text",
180-
])
181-
.output()
182-
.expect("Failed to assume role")
183-
.stdout
184-
.trim()
185-
.to_string();
186-
187-
let access_key = assume_role_output.split_whitespace().nth(1).unwrap().trim_matches('"');
188-
let secret_key = assume_role_output.split_whitespace().nth(3).unwrap().trim_matches('"');
189159

160+
let (access_key, secret_key) = if is_role {
161+
let (access_key, secret_key) = assume_role(&role_or_access_key.as_ref().unwrap(), &region.to_string()).await;
190162
(Some(access_key), Some(secret_key))
191163
} else {
192164
let secret_key = env::var("CUBESTORE_AWS_SECRET_ACCESS_KEY").ok();

0 commit comments

Comments
 (0)