11"""
22Helper functions related to get files content or storages info.
33"""
4+ from fnmatch import fnmatch
45from os import environ , path
56from pathlib import Path
6- from typing import TypedDict
7+ from typing import Literal , Optional , TypedDict
78
9+ from . import mimetype
810from .config import CONFIG
911from .db_requests import (
1012 get_directory_list ,
11- get_mimetype_id ,
13+ get_fileid_info ,
14+ get_fileids_info ,
1215 get_non_direct_access_filesize_limit ,
1316 get_paths_by_ids ,
1417 get_storages_info ,
@@ -21,7 +24,8 @@ class FsNodeInfo(TypedDict):
2124 id : int
2225 is_dir : bool
2326 is_local : bool
24- mimetype : str
27+ mimetype : int
28+ mimepart : int
2529 name : str
2630 internal_path : str
2731 abs_path : str
@@ -37,52 +41,92 @@ class FsNodeInfo(TypedDict):
3741 direct_access : bool
3842
3943
44+ FsNodeInfoField = Literal ["is_dir" , "is_local" , "mimetype" , "mimepart" , "name" , "direct_access" ]
45+
46+
4047USER_ID = environ .get ("USER_ID" , "" )
41- DIR_MIMETYPE = get_mimetype_id ("'httpd/unix-directory'" )
4248STORAGES_INFO = get_storages_info ()
4349ND_ACCESS_LIMIT = get_non_direct_access_filesize_limit ()
4450"""A value from the config that defines the maximum file size allowed to be requested from php."""
4551
4652
47- def list_directory (file_id : int , user_id = USER_ID ) -> list [FsNodeInfo ]:
53+ def fs_get_obj_info (file_id : int ) -> Optional [FsNodeInfo ]:
54+ raw_result = get_fileid_info (file_id )
55+ if raw_result :
56+ return db_record_to_fs_node (raw_result )
57+ return None
58+
59+
60+ def fs_get_objs_info (file_ids : list [int ]) -> list [FsNodeInfo ]:
61+ raw_result = get_fileids_info (file_ids )
62+ return [db_record_to_fs_node (i ) for i in raw_result ]
63+
64+
65+ def fs_list_directory (file_id : int , user_id = USER_ID ) -> list [FsNodeInfo ]:
4866 _ = user_id # noqa # will be used in 0.4.0 version
4967 dir_info = get_paths_by_ids ([file_id ])
5068 file_mounts = []
5169 if dir_info :
5270 file_mounts = get_mounts_to (dir_info [0 ]["storage" ], dir_info [0 ]["path" ])
5371 raw_result = get_directory_list (file_id , file_mounts )
54- result : list [FsNodeInfo ] = []
55- for i in raw_result :
56- result .append (
57- {
58- "id" : i ["fileid" ],
59- "is_dir" : i ["mimetype" ] == DIR_MIMETYPE ,
60- "is_local" : is_local_storage (i ["storage" ]),
61- "mimetype" : i ["mimetype" ],
62- "name" : i ["name" ],
63- "internal_path" : i ["path" ],
64- "abs_path" : get_file_full_path (i ["storage" ], i ["path" ]),
65- "size" : i ["size" ],
66- "permissions" : i ["permissions" ],
67- "mtime" : i ["mtime" ],
68- "checksum" : i ["checksum" ],
69- "encrypted" : i ["encrypted" ],
70- "etag" : i ["etag" ],
71- "ownerName" : get_storage_user_id (i ["storage" ]),
72- "storageId" : i ["storage" ],
73- "mountId" : get_storage_root_id (i ["storage" ]),
74- "direct_access" : can_directly_access_file (i ),
75- }
76- )
77- return result
78-
79-
80- def get_file_data (file_info : FsNodeInfo ) -> bytes :
72+ return [db_record_to_fs_node (i ) for i in raw_result ]
73+
74+
75+ def fs_apply_exclude_lists (fs_objs : list [FsNodeInfo ], excl_file_ids : list [int ], excl_mask : list [str ]) -> None :
76+ """Purge all records according to exclude_(mask/fileid) from `where_to_purge`(or from fs_records)."""
77+
78+ indexes_to_purge = []
79+ for index , fs_obj in enumerate (fs_objs ):
80+ if fs_obj ["id" ] in excl_file_ids :
81+ indexes_to_purge .append (index )
82+ elif is_path_in_exclude (fs_obj ["internal_path" ], excl_mask ):
83+ indexes_to_purge .append (index )
84+ for index in reversed (indexes_to_purge ):
85+ del fs_objs [index ]
86+
87+
88+ def fs_extract_sub_dirs (fs_objs : list [FsNodeInfo ]) -> list [FsNodeInfo ]:
89+ sub_dirs = []
90+ indexes_to_purge = []
91+ for index , fs_obj in enumerate (fs_objs ):
92+ if fs_obj ["mimetype" ] == mimetype .DIR :
93+ sub_dirs .append (fs_obj )
94+ indexes_to_purge .append (index )
95+ for index in reversed (indexes_to_purge ):
96+ del fs_objs [index ]
97+ return sub_dirs
98+
99+
100+ def fs_apply_ignore_flags (fs_objs : list [FsNodeInfo ]) -> None :
101+ ignore_flag = any (fs_obj ["name" ] in (".noimage" , ".nomedia" ) for fs_obj in fs_objs )
102+ if ignore_flag :
103+ fs_filter_by (fs_objs , "mimepart" , [mimetype .IMAGE , mimetype .VIDEO ], reverse_filter = True )
104+ fs_apply_exclude_lists (fs_objs , [], [".noimage" , ".nomedia" ])
105+
106+
107+ def fs_filter_by (fs_objs : list [FsNodeInfo ], field : FsNodeInfoField , values : list , reverse_filter = False ) -> None :
108+ indexes_to_purge = []
109+ if reverse_filter :
110+ for index , fs_obj in enumerate (fs_objs ):
111+ if fs_obj [field ] in values :
112+ indexes_to_purge .append (index )
113+ else :
114+ for index , fs_obj in enumerate (fs_objs ):
115+ if fs_obj [field ] not in values :
116+ indexes_to_purge .append (index )
117+ for index in reversed (indexes_to_purge ):
118+ del fs_objs [index ]
119+
120+
121+ def fs_sort_by_id (fs_objs : list [FsNodeInfo ]) -> list [FsNodeInfo ]:
122+ return sorted (fs_objs , key = lambda i : i ["id" ])
123+
124+
125+ def fs_get_file_data (file_info : FsNodeInfo ) -> bytes :
81126 if file_info ["direct_access" ]:
82127 try :
83128 with open (file_info ["abs_path" ], "rb" ) as h_file :
84- data = h_file .read ()
85- return data
129+ return h_file .read ()
86130 except Exception : # noqa # pylint: disable=broad-except
87131 log .exception ("Exception during reading %s" , file_info ["abs_path" ])
88132 return request_file_from_php (file_info )
@@ -172,3 +216,36 @@ def get_mounts_to(storage_id: int, dir_path: str) -> list[int]:
172216 if mount_point_with_dir_path == str (Path (storage_info ["mount_point" ]).parent ):
173217 return_list .append (storage_info ["root_id" ])
174218 return return_list
219+
220+
221+ def db_record_to_fs_node (fs_record : dict ) -> FsNodeInfo :
222+ return {
223+ "id" : fs_record ["fileid" ],
224+ "is_dir" : fs_record ["mimetype" ] == mimetype .DIR ,
225+ "is_local" : is_local_storage (fs_record ["storage" ]),
226+ "mimetype" : fs_record ["mimetype" ],
227+ "mimepart" : fs_record ["mimepart" ],
228+ "name" : fs_record ["name" ],
229+ "internal_path" : fs_record ["path" ],
230+ "abs_path" : get_file_full_path (fs_record ["storage" ], fs_record ["path" ]),
231+ "size" : fs_record ["size" ],
232+ "permissions" : fs_record ["permissions" ],
233+ "mtime" : fs_record ["mtime" ],
234+ "checksum" : fs_record ["checksum" ],
235+ "encrypted" : fs_record ["encrypted" ],
236+ "etag" : fs_record ["etag" ],
237+ "ownerName" : get_storage_user_id (fs_record ["storage" ]),
238+ "storageId" : fs_record ["storage" ],
239+ "mountId" : get_storage_root_id (fs_record ["storage" ]),
240+ "direct_access" : can_directly_access_file (fs_record ),
241+ }
242+
243+
244+ def is_path_in_exclude (fs_path : str , exclude_patterns : list [str ]) -> bool :
245+ """Checks with fnmatch if `path` is in `exclude_patterns`. Returns ``True`` if yes."""
246+
247+ name = path .basename (fs_path )
248+ for pattern in exclude_patterns :
249+ if fnmatch (name , pattern ):
250+ return True
251+ return False
0 commit comments