88
99from structlog import get_logger
1010
11- from unblob .extractor import is_safe_path
12-
13- from ...file_utils import Endian , InvalidInputFormat , read_until_past , round_up
14- from ...models import Extractor , File , HexString , StructHandler , ValidChunk
11+ from ...file_utils import (
12+ Endian ,
13+ FileSystem ,
14+ InvalidInputFormat ,
15+ read_until_past ,
16+ round_up ,
17+ )
18+ from ...models import (
19+ Extractor ,
20+ ExtractResult ,
21+ File ,
22+ HexString ,
23+ StructHandler ,
24+ ValidChunk ,
25+ )
1526
1627logger = get_logger ()
1728
@@ -164,12 +175,12 @@ class RomFSHeader:
164175 file : File
165176 end_offset : int
166177 inodes : Dict [int , "FileHeader" ]
167- extract_root : Path
178+ fs : FileSystem
168179
169180 def __init__ (
170181 self ,
171182 file : File ,
172- extract_root : Path ,
183+ fs : FileSystem ,
173184 ):
174185 self .file = file
175186 self .file .seek (0 , io .SEEK_END )
@@ -186,7 +197,7 @@ def __init__(
186197 self .header_end_offset = self .file .tell ()
187198 self .inodes = {}
188199
189- self .extract_root = extract_root
200+ self .fs = fs
190201
191202 def valid_checksum (self ) -> bool :
192203 current_position = self .file .tell ()
@@ -242,113 +253,64 @@ def walk_dir(self, addr: int, parent: Optional[FileHeader] = None):
242253 self .inodes [addr ] = file_header
243254 return file_header .next_filehdr
244255
245- def create_symlink (self , extract_root : Path , output_path : Path , inode : FileHeader ):
246- target = inode .content .decode ("utf-8" ).lstrip ("/" )
247-
248- if target .startswith (".." ):
249- target_path = extract_root .joinpath (output_path .parent , target ).resolve ()
250- else :
251- target_path = extract_root .joinpath (target ).resolve ()
256+ def create_symlink (self , output_path : Path , inode : FileHeader ):
257+ target_path = Path (inode .content .decode ("utf-8" ))
258+ self .fs .create_symlink (src = target_path , dst = output_path )
252259
253- if not is_safe_path (extract_root , target_path ):
254- logger .warning (
255- "Path traversal attempt through symlink." , target_path = target_path
256- )
257- return
258- # we create relative paths to make the output directory portable
259- output_path .symlink_to (os .path .relpath (target_path , start = output_path .parent ))
260-
261- def create_hardlink (self , extract_root : Path , link_path : Path , inode : FileHeader ):
260+ def create_hardlink (self , output_path : Path , inode : FileHeader ):
262261 if inode .spec_info in self .inodes :
263- target = str (self .inodes [inode .spec_info ].path ).lstrip ("/" )
264- target_path = extract_root .joinpath (target ).resolve ()
265-
266- # we don't need to check for potential traversal given that, if the inode
267- # is in self.inodes, it already got verified in create_inode.
268- try :
269- os .link (target_path , link_path )
270- except FileNotFoundError :
271- logger .warning (
272- "Hard link target does not exist, discarding." ,
273- target_path = target_path ,
274- link_path = link_path ,
275- )
276- except PermissionError :
277- logger .warning (
278- "Not enough privileges to create hardlink to block/char device, discarding." ,
279- target_path = target_path ,
280- link_path = link_path ,
281- )
262+ target_path = self .inodes [inode .spec_info ].path
263+ self .fs .create_hardlink (dst = output_path , src = target_path )
282264 else :
283265 logger .warning ("Invalid hard link target" , inode_key = inode .spec_info )
284266
285- def create_inode (self , extract_root : Path , inode : FileHeader ):
286- output_path = extract_root .joinpath (inode .path ).resolve ()
287- if not is_safe_path (extract_root , inode .path ):
288- logger .warning (
289- "Path traversal attempt, discarding." , output_path = output_path
290- )
291- return
267+ def create_inode (self , inode : FileHeader ):
268+ output_path = inode .path
292269 logger .info ("dumping inode" , inode = inode , output_path = str (output_path ))
293270
294271 if inode .fs_type == FSType .HARD_LINK :
295- self .create_hardlink (extract_root , output_path , inode )
272+ self .create_hardlink (output_path , inode )
296273 elif inode .fs_type == FSType .SYMLINK :
297- self .create_symlink (extract_root , output_path , inode )
274+ self .create_symlink (output_path , inode )
298275 elif inode .fs_type == FSType .DIRECTORY :
299- output_path . mkdir (mode = inode .mode , exist_ok = True )
276+ self . fs . mkdir (output_path , mode = inode .mode , exist_ok = True )
300277 elif inode .fs_type == FSType .FILE :
301- with output_path .open ("wb" ) as f :
302- f .write (inode .content )
278+ self .fs .write_bytes (output_path , inode .content )
303279 elif inode .fs_type in [FSType .BLOCK_DEV , FSType .CHAR_DEV ]:
304- os . mknod (inode . path , inode .mode , inode .dev )
280+ self . fs . mknod (output_path , mode = inode .mode , device = inode .dev )
305281 elif inode .fs_type == FSType .FIFO :
306- os . mkfifo (output_path , inode .mode )
282+ self . fs . mkfifo (output_path , mode = inode .mode )
307283
308284 def dump_fs (self ):
309- # first we create files and directories
310- fd_inodes = {
311- k : v
312- for k , v in self .inodes .items ()
313- if v .fs_type in [FSType .FILE , FSType .DIRECTORY , FSType .FIFO , FSType .SOCKET ]
314- }
315- for inode in sorted (fd_inodes .values (), key = lambda inode : inode .path ):
316- self .create_inode (self .extract_root , inode )
317-
318- if os .geteuid () != 0 :
319- logger .warning (
320- "root privileges are required to create block and char devices, skipping."
285+ def inodes (* inode_types ):
286+ return sorted (
287+ (v for v in self .inodes .values () if v .fs_type in inode_types ),
288+ key = lambda inode : inode .path ,
321289 )
322- else :
323- # then we create devices if we have enough privileges
324- dev_inodes = {
325- k : v
326- for k , v in self .inodes .items ()
327- if v .fs_type in [FSType .BLOCK_DEV , FSType .CHAR_DEV ]
328- }
329- for inode in sorted (dev_inodes .values (), key = lambda inode : inode .path ):
330- self .create_inode (self .extract_root , inode )
331-
332- # then we create links
333- links_inodes = {
334- k : v
335- for k , v in self .inodes .items ()
336- if v .fs_type in [FSType .SYMLINK , FSType .HARD_LINK ]
337- }
338- for inode in sorted (links_inodes .values (), key = lambda inode : inode .path ):
339- self .create_inode (self .extract_root , inode )
290+
291+ # order of file object creation is important
292+ sorted_inodes = (
293+ inodes (FSType .FILE , FSType .DIRECTORY , FSType .FIFO , FSType .SOCKET )
294+ + inodes (FSType .BLOCK_DEV , FSType .CHAR_DEV )
295+ + inodes (FSType .SYMLINK , FSType .HARD_LINK )
296+ )
297+
298+ for inode in sorted_inodes :
299+ self .create_inode (inode )
340300
341301 def __str__ (self ):
342302 return f"signature: { self .signature } \n full_size: { self .full_size } \n checksum: { self .checksum } \n volume_name: { self .volume_name } "
343303
344304
345305class RomfsExtractor (Extractor ):
346306 def extract (self , inpath : Path , outdir : Path ):
307+ fs = FileSystem (outdir )
347308 with File .from_path (inpath ) as f :
348- header = RomFSHeader (f , outdir )
309+ header = RomFSHeader (f , fs )
349310 header .validate ()
350311 header .recursive_walk (header .header_end_offset , None )
351312 header .dump_fs ()
313+ return ExtractResult (reports = list (fs .problems ))
352314
353315
354316class RomFSFSHandler (StructHandler ):
0 commit comments