3737import subprocess
3838import sys
3939import warnings
40+ from pathlib import Path
4041from typing import (
4142 Any ,
4243 Callable ,
@@ -1091,6 +1092,80 @@ def uname_attr(self, attribute: str) -> str:
10911092 """
10921093 return self ._uname_info .get (attribute , "" )
10931094
1095+ @staticmethod
1096+ def __abs_path_join (root_path : Path , abs_path : Path ) -> Path :
1097+ rel_path = os .path .splitdrive (abs_path )[1 ].lstrip (os .sep )
1098+ if os .altsep is not None :
1099+ rel_path = rel_path .lstrip (os .altsep )
1100+
1101+ return root_path / Path (rel_path )
1102+
1103+ def __resolve_path_relatively_to_chroot (self , path : str ) -> Path :
1104+ """
1105+ Resolves potential symlinks encountered in ``path`` against
1106+ ``self.root_dir`` if inside the chroot, else just return the original
1107+ path.
1108+ We're doing this check at a central place, to make calling code more
1109+ readable and to de-duplicate.
1110+ """
1111+ path_to_resolve = Path (path )
1112+
1113+ if self .root_dir is None :
1114+ return path_to_resolve
1115+
1116+ # resolve `self.root_dir`, once and for all.
1117+ chroot_path = Path (self .root_dir ).resolve ()
1118+
1119+ # consider non-absolute `path_to_resolve` relative to chroot
1120+ if not path_to_resolve .is_absolute ():
1121+ path_to_resolve = chroot_path / path_to_resolve
1122+
1123+ seen_paths = set ()
1124+ while True :
1125+ # although `path_to_resolve` _should_ be relative to chroot (either
1126+ # passed from trusted code or already resolved by previous loop
1127+ # iteration), we enforce this check as some input files are
1128+ # made available through API
1129+ try :
1130+ relative_parts = path_to_resolve .relative_to (chroot_path ).parts
1131+ except ValueError :
1132+ raise FileNotFoundError
1133+
1134+ for i , part in enumerate (relative_parts , start = 1 ):
1135+ if part == os .pardir :
1136+ # normalize path parts up to this segment (relatively to chroot)
1137+ path_to_resolve = self .__abs_path_join (
1138+ chroot_path ,
1139+ Path (os .path .normpath ("/" / Path (* relative_parts [:i ]))),
1140+ ) / Path (* relative_parts [i :])
1141+ break # restart path resolution
1142+
1143+ # attempt symbolic link resolution
1144+ symlink_candidate = chroot_path / Path (* relative_parts [:i ])
1145+ try :
1146+ symlink_resolved = Path (os .readlink (symlink_candidate ))
1147+ except OSError : # not a symlink, go to next path part
1148+ continue
1149+
1150+ # "bend" absolute resolved path inside the chroot
1151+ # consider non-absolute resolved path relatively to chroot
1152+ if symlink_resolved .is_absolute ():
1153+ path_to_resolve = self .__abs_path_join (
1154+ chroot_path , symlink_resolved
1155+ )
1156+ else :
1157+ path_to_resolve = symlink_candidate .parent / symlink_resolved
1158+ path_to_resolve /= Path (* relative_parts [i :])
1159+ break # restart path resolution
1160+ else :
1161+ # return final path as it can be considered resolved
1162+ return path_to_resolve
1163+
1164+ # prevent symlinks infinite loop by tracking successive resolutions
1165+ if path_to_resolve in seen_paths :
1166+ raise FileNotFoundError
1167+ seen_paths .add (path_to_resolve )
1168+
10941169 @cached_property
10951170 def _os_release_info (self ) -> Dict [str , str ]:
10961171 """
@@ -1099,10 +1174,14 @@ def _os_release_info(self) -> Dict[str, str]:
10991174 Returns:
11001175 A dictionary containing all information items.
11011176 """
1102- if os .path .isfile (self .os_release_file ):
1103- with open (self .os_release_file , encoding = "utf-8" ) as release_file :
1177+ try :
1178+ with open (
1179+ self .__resolve_path_relatively_to_chroot (self .os_release_file ),
1180+ encoding = "utf-8" ,
1181+ ) as release_file :
11041182 return self ._parse_os_release_content (release_file )
1105- return {}
1183+ except FileNotFoundError :
1184+ return {}
11061185
11071186 @staticmethod
11081187 def _parse_os_release_content (lines : TextIO ) -> Dict [str , str ]:
@@ -1225,7 +1304,10 @@ def _oslevel_info(self) -> str:
12251304 def _debian_version (self ) -> str :
12261305 try :
12271306 with open (
1228- os .path .join (self .etc_dir , "debian_version" ), encoding = "ascii"
1307+ self .__resolve_path_relatively_to_chroot (
1308+ os .path .join (self .etc_dir , "debian_version" )
1309+ ),
1310+ encoding = "ascii" ,
12291311 ) as fp :
12301312 return fp .readline ().rstrip ()
12311313 except FileNotFoundError :
@@ -1235,7 +1317,10 @@ def _debian_version(self) -> str:
12351317 def _armbian_version (self ) -> str :
12361318 try :
12371319 with open (
1238- os .path .join (self .etc_dir , "armbian-release" ), encoding = "ascii"
1320+ self .__resolve_path_relatively_to_chroot (
1321+ os .path .join (self .etc_dir , "armbian-release" )
1322+ ),
1323+ encoding = "ascii" ,
12391324 ) as fp :
12401325 return self ._parse_os_release_content (fp ).get ("version" , "" )
12411326 except FileNotFoundError :
@@ -1287,9 +1372,10 @@ def _distro_release_info(self) -> Dict[str, str]:
12871372 try :
12881373 basenames = [
12891374 basename
1290- for basename in os .listdir (self .etc_dir )
1375+ for basename in os .listdir (
1376+ self .__resolve_path_relatively_to_chroot (self .etc_dir )
1377+ )
12911378 if basename not in _DISTRO_RELEASE_IGNORE_BASENAMES
1292- and os .path .isfile (os .path .join (self .etc_dir , basename ))
12931379 ]
12941380 # We sort for repeatability in cases where there are multiple
12951381 # distro specific files; e.g. CentOS, Oracle, Enterprise all
@@ -1305,12 +1391,13 @@ def _distro_release_info(self) -> Dict[str, str]:
13051391 match = _DISTRO_RELEASE_BASENAME_PATTERN .match (basename )
13061392 if match is None :
13071393 continue
1308- filepath = os .path .join (self .etc_dir , basename )
1309- distro_info = self ._parse_distro_release_file (filepath )
1394+ # NOTE: _parse_distro_release_file below will be resolving for us
1395+ unresolved_filepath = os .path .join (self .etc_dir , basename )
1396+ distro_info = self ._parse_distro_release_file (unresolved_filepath )
13101397 # The name is always present if the pattern matches.
13111398 if "name" not in distro_info :
13121399 continue
1313- self .distro_release_file = filepath
1400+ self .distro_release_file = unresolved_filepath
13141401 break
13151402 else : # the loop didn't "break": no candidate.
13161403 return {}
@@ -1344,7 +1431,10 @@ def _parse_distro_release_file(self, filepath: str) -> Dict[str, str]:
13441431 A dictionary containing all information items.
13451432 """
13461433 try :
1347- with open (filepath , encoding = "utf-8" ) as fp :
1434+ with open (
1435+ self .__resolve_path_relatively_to_chroot (filepath ),
1436+ encoding = "utf-8" ,
1437+ ) as fp :
13481438 # Only parse the first line. For instance, on SLES there
13491439 # are multiple lines. We don't want them...
13501440 return self ._parse_distro_release_content (fp .readline ())
0 commit comments