Skip to content

Commit 9287f88

Browse files
authored
Support relative paths with home directory resolution (#282)
1 parent 7386e24 commit 9287f88

File tree

6 files changed

+215
-21
lines changed

6 files changed

+215
-21
lines changed

rust/src/client.rs

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::file::{FileReader, FileWriter};
1414
use crate::hdfs::protocol::NamenodeProtocol;
1515
use crate::hdfs::proxy::NameServiceProxy;
1616
use crate::proto::hdfs::hdfs_file_status_proto::FileType;
17+
use crate::security::user::User;
1718

1819
use crate::glob::{GlobPattern, expand_glob, get_path_components, unescape_component};
1920
use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
@@ -118,16 +119,46 @@ impl MountLink {
118119
struct MountTable {
119120
mounts: Vec<MountLink>,
120121
fallback: MountLink,
122+
home_dir: String,
121123
}
122124

123125
impl MountTable {
124126
fn resolve(&self, src: &str) -> (&MountLink, String) {
127+
let path = if src.starts_with('/') {
128+
src.to_string()
129+
} else {
130+
format!("{}/{}", self.home_dir, src)
131+
};
132+
125133
for link in self.mounts.iter() {
126-
if let Some(resolved) = link.resolve(src) {
134+
if let Some(resolved) = link.resolve(&path) {
127135
return (link, resolved);
128136
}
129137
}
130-
(&self.fallback, self.fallback.resolve(src).unwrap())
138+
(&self.fallback, self.fallback.resolve(&path).unwrap())
139+
}
140+
}
141+
142+
fn build_home_dir(
143+
scheme: &str,
144+
host: Option<&str>,
145+
config: &Configuration,
146+
username: &str,
147+
) -> String {
148+
let prefix = match scheme {
149+
"hdfs" => config.get("dfs.user.home.dir.prefix"),
150+
"viewfs" => {
151+
host.and_then(|host| config.get(&format!("fs.viewfs.mounttable.{host}.homedir")))
152+
}
153+
_ => None,
154+
}
155+
.unwrap_or("/user");
156+
157+
let prefix = prefix.trim_end_matches('/');
158+
if prefix.is_empty() {
159+
format!("/{username}")
160+
} else {
161+
format!("{prefix}/{username}")
131162
}
132163
}
133164

@@ -334,6 +365,19 @@ impl Client {
334365

335366
let rt_holder = RuntimeHolder::new(rt);
336367

368+
let user_info = User::get_user_info();
369+
let username = user_info
370+
.effective_user
371+
.as_deref()
372+
.or(user_info.real_user.as_deref())
373+
.expect("User info must include a username");
374+
let home_dir = build_home_dir(
375+
resolved_url.scheme(),
376+
resolved_url.host_str(),
377+
config.as_ref(),
378+
username,
379+
);
380+
337381
let mount_table = match url.scheme() {
338382
"hdfs" => {
339383
let proxy = NameServiceProxy::new(
@@ -346,13 +390,15 @@ impl Client {
346390
MountTable {
347391
mounts: Vec::new(),
348392
fallback: MountLink::new("/", "/", protocol),
393+
home_dir,
349394
}
350395
}
351396
"viewfs" => Self::build_mount_table(
352397
// Host is guaranteed to be present.
353398
resolved_url.host_str().expect("URL must have a host"),
354399
Arc::clone(&config),
355400
rt_holder.get_handle(),
401+
home_dir,
356402
)?,
357403
_ => {
358404
return Err(HdfsError::InvalidArgument(
@@ -372,6 +418,7 @@ impl Client {
372418
host: &str,
373419
config: Arc<Configuration>,
374420
handle: Handle,
421+
home_dir: String,
375422
) -> Result<MountTable> {
376423
let mut mounts: Vec<MountLink> = Vec::new();
377424
let mut fallback: Option<MountLink> = None;
@@ -408,7 +455,11 @@ impl Client {
408455
mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
409456
mounts.reverse();
410457

411-
Ok(MountTable { mounts, fallback })
458+
Ok(MountTable {
459+
mounts,
460+
fallback,
461+
home_dir,
462+
})
412463
} else {
413464
Err(HdfsError::InvalidArgument(
414465
"No viewfs fallback mount found".to_string(),
@@ -1180,6 +1231,7 @@ mod test {
11801231
let mount_table = MountTable {
11811232
mounts: vec![link1, link2, link3],
11821233
fallback,
1234+
home_dir: "/user/test".to_string(),
11831235
};
11841236

11851237
// Exact mount path resolves to the exact HDFS path
@@ -1204,6 +1256,39 @@ mod test {
12041256
let (link, resolved) = mount_table.resolve("/mount3/nested/file");
12051257
assert_eq!(link.viewfs_path, "/mount3/nested");
12061258
assert_eq!(resolved, "/path3/file");
1259+
1260+
let (link, resolved) = mount_table.resolve("file");
1261+
assert_eq!(link.viewfs_path, "");
1262+
assert_eq!(resolved, "/path4/user/test/file");
1263+
1264+
let (link, resolved) = mount_table.resolve("dir/subdir");
1265+
assert_eq!(link.viewfs_path, "");
1266+
assert_eq!(resolved, "/path4/user/test/dir/subdir");
1267+
1268+
let mount_table = MountTable {
1269+
mounts: vec![
1270+
MountLink::new(
1271+
"/mount1",
1272+
"/path1/nested",
1273+
create_protocol("hdfs://127.0.0.1:9000"),
1274+
),
1275+
MountLink::new(
1276+
"/mount2",
1277+
"/path2",
1278+
create_protocol("hdfs://127.0.0.1:9001"),
1279+
),
1280+
],
1281+
fallback: MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003")),
1282+
home_dir: "/mount1/user".to_string(),
1283+
};
1284+
1285+
let (link, resolved) = mount_table.resolve("file");
1286+
assert_eq!(link.viewfs_path, "/mount1");
1287+
assert_eq!(resolved, "/path1/nested/user/file");
1288+
1289+
let (link, resolved) = mount_table.resolve("dir/subdir");
1290+
assert_eq!(link.viewfs_path, "/mount1");
1291+
assert_eq!(resolved, "/path1/nested/user/dir/subdir");
12071292
}
12081293

12091294
#[test]

rust/src/minidfs.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,7 @@ impl MiniDfs {
9595

9696
if features.contains(&DfsFeatures::Security) {
9797
let krb_conf = output.next().unwrap().unwrap();
98-
let kdestroy_exec = which("kdestroy").expect("Failed to find kdestroy executable");
99-
Command::new(kdestroy_exec).spawn().unwrap().wait().unwrap();
98+
Self::kdestroy();
10099

101100
if !PathBuf::from("target/test/hdfs.keytab").exists() {
102101
panic!("Failed to find keytab");
@@ -147,6 +146,17 @@ impl MiniDfs {
147146
url: url.to_string(),
148147
}
149148
}
149+
150+
fn kdestroy() {
151+
let kdestroy_exec = which("kdestroy").expect("Failed to find kdestroy executable");
152+
Command::new(kdestroy_exec)
153+
.stdout(Stdio::null())
154+
.stderr(Stdio::null())
155+
.spawn()
156+
.unwrap()
157+
.wait()
158+
.unwrap();
159+
}
150160
}
151161

152162
impl Default for MiniDfs {
@@ -162,5 +172,7 @@ impl Drop for MiniDfs {
162172
stdin.write_all(b"\n").unwrap();
163173
self.process.kill().unwrap();
164174
self.process.wait().unwrap();
175+
176+
Self::kdestroy();
165177
}
166178
}

rust/src/security/gssapi.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,64 @@ impl Drop for GssName {
209209
}
210210
}
211211

212+
struct GssCred {
213+
cred: bindings::gss_cred_id_t,
214+
}
215+
216+
impl GssCred {
217+
fn acquire_default() -> crate::Result<Self> {
218+
let mut minor = 0;
219+
let mut cred = ptr::null_mut();
220+
let major = unsafe {
221+
libgssapi()?.gss_acquire_cred(
222+
&mut minor,
223+
ptr::null_mut(),
224+
bindings::_GSS_C_INDEFINITE,
225+
ptr::null_mut(),
226+
bindings::GSS_C_INITIATE as bindings::gss_cred_usage_t,
227+
&mut cred,
228+
ptr::null_mut(),
229+
ptr::null_mut(),
230+
)
231+
};
232+
check_gss_ok(major, minor)?;
233+
Ok(Self { cred })
234+
}
235+
236+
fn name(&self) -> crate::Result<GssName> {
237+
let mut minor = 0;
238+
let mut name = ptr::null_mut::<bindings::gss_name_struct>();
239+
let major = unsafe {
240+
libgssapi()?.gss_inquire_cred(
241+
&mut minor,
242+
self.cred,
243+
&mut name as *mut bindings::gss_name_t,
244+
ptr::null_mut(),
245+
ptr::null_mut(),
246+
ptr::null_mut(),
247+
)
248+
};
249+
check_gss_ok(major, minor)?;
250+
Ok(GssName { name })
251+
}
252+
}
253+
254+
impl Drop for GssCred {
255+
fn drop(&mut self) {
256+
if !self.cred.is_null() {
257+
let mut minor = bindings::GSS_S_COMPLETE;
258+
let major = unsafe {
259+
libgssapi()
260+
.unwrap()
261+
.gss_release_cred(&mut minor, &mut self.cred)
262+
};
263+
if let Err(e) = check_gss_ok(major, minor) {
264+
warn!("Failed to release GSSAPI credential: {:?}", e);
265+
}
266+
}
267+
}
268+
}
269+
212270
struct GssClientCtx {
213271
ctx: bindings::gss_ctx_id_t,
214272
target: GssName,
@@ -440,6 +498,12 @@ impl GssapiSession {
440498
let state = GssapiState::Pending(GssClientCtx::new(target));
441499
Ok(Self { state })
442500
}
501+
502+
/// Returns the display name of the principal attached to the default
/// GSSAPI credential.
///
/// # Errors
/// Propagates any error from acquiring the credential, inquiring its name,
/// or rendering that name for display.
pub(crate) fn get_default_principal() -> crate::Result<String> {
    // Both handles stay bound to locals so they are released only after the
    // display name has been produced.
    let default_cred = GssCred::acquire_default()?;
    let principal = default_cred.name()?;
    principal.display_name()
}
443507
}
444508

445509
impl SaslSession for GssapiSession {

rust/src/security/sasl.rs

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ fn select_method(
198198
auth.server_id().to_string(),
199199
token,
200200
);
201-
// let session = GSASLSession::new(auth.protocol(), auth.server_id(), token)?;
202201

203202
return Ok((auth.clone(), Some(Box::new(session))));
204203
}
@@ -261,7 +260,7 @@ impl SaslReader {
261260
}
262261

263262
pub(crate) async fn read_exact(&mut self, buf: &mut [u8]) -> Result<usize> {
264-
if self.session.is_some() {
263+
if let Some(session) = self.session.clone() {
265264
let read_len = buf.len();
266265
let mut bytes_remaining = read_len;
267266
while bytes_remaining > 0 {
@@ -271,14 +270,7 @@ impl SaslReader {
271270
todo!();
272271
}
273272

274-
// let mut writer = BytesMut::with_capacity(response.token().len()).writer();
275-
let decoded = self
276-
.session
277-
.as_ref()
278-
.unwrap()
279-
.lock()
280-
.unwrap()
281-
.decode(response.token())?;
273+
let decoded = session.lock().unwrap().decode(response.token())?;
282274
self.buffer = Bytes::from(decoded)
283275
}
284276
let copy_len = usize::min(bytes_remaining, self.buffer.remaining());
@@ -340,17 +332,13 @@ impl SaslWriter {
340332
}
341333

342334
pub(crate) async fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
343-
if self.session.is_some() {
335+
if let Some(session) = &self.session {
344336
let mut rpc_sasl = RpcSaslProto {
345337
state: SaslState::Wrap as i32,
346338
..Default::default()
347339
};
348340

349-
// let mut writer = Vec::with_capacity(buf.len()).writer();
350-
let encoded = self
351-
.session
352-
.as_ref()
353-
.unwrap()
341+
let encoded = session
354342
.lock()
355343
.unwrap()
356344
.encode(buf)

rust/src/security/user.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::proto::common::TokenProto;
1717
use crate::proto::hdfs::AccessModeProto;
1818
use crate::proto::hdfs::BlockTokenSecretProto;
1919
use crate::proto::hdfs::StorageTypeProto;
20+
use crate::security::gssapi::GssapiSession;
2021

2122
const HADOOP_USER_NAME: &str = "HADOOP_USER_NAME";
2223
const HADOOP_PROXY_USER: &str = "HADOOP_PROXY_USER";
@@ -381,6 +382,14 @@ impl User {
381382
}
382383
}
383384

385+
pub(crate) fn get_user_info() -> UserInfo {
386+
if let Ok(principal) = GssapiSession::get_default_principal() {
387+
return User::get_user_info_from_principal(&principal);
388+
}
389+
390+
User::get_simple_user()
391+
}
392+
384393
pub(crate) fn get_user_from_principal(principal: &str) -> String {
385394
// If there's a /, take the part before it.
386395
if let Some(index) = principal.find('/') {

0 commit comments

Comments
 (0)