44import shutil
55import uuid
66import hashlib
7- from typing import Iterable , Tuple , Optional
7+ from typing import Iterable , Tuple , Optional , Union
88
99option = parser .options ("file" )
1010print ("[syncFile] is imported.\n " )
@@ -25,10 +25,11 @@ class Component:
2525 """
2626
2727 def __init__ (self , path = "" , top_path = None ):
28- if top_path :
29- self .root_path = self .ajoin (top_path , path )
30- else :
31- self .root_path = self .ajoin (option ["path" ], path )
28+ if not top_path :
29+ top_path = option ["path" ]
30+
31+ self .root_path = self .ajoin (top_path , path )
32+ self .rel_root_path = self .join (top_path , path )
3233
3334 self .write_chunk = self .read_chunk = 65536
3435 self .write_fp = self .read_fp = None
@@ -59,7 +60,18 @@ def join(*args) -> str:
5960 return os .path .normpath (os .path .join (* args ))
6061
6162 @staticmethod
62- def ensure_dir (path : str ) -> bool :
63+ def touch (path : str ) -> None :
64+ """
65+
66+ 快速创建一个空文件
67+
68+ :param path: 文件路径
69+ :return: None
70+
71+ """
72+ open (path , "wb" ).close ()
73+
74+ def ensure_dir (self , path : Union [str ] = None ) -> bool :
6375 """
6476
6577 确保文件夹存在,不存在则创建它
@@ -68,24 +80,15 @@ def ensure_dir(path: str) -> bool:
6880 :return: 返回判断之前是否存在该文件
6981
7082 """
83+ if path is None :
84+ path = self .root_path
85+
7186 if os .path .exists (path ):
7287 return True
7388 else :
7489 os .makedirs (path )
7590 return False
7691
77- @staticmethod
78- def touch (path : str ) -> None :
79- """
80-
81- 快速创建一个空文件
82-
83- :param path: 文件路径
84- :return: None
85-
86- """
87- open (path , "wb" ).close ()
88-
8992 def get_safe_path (self , * args ) -> str :
9093 """
9194
@@ -246,7 +249,7 @@ def receive_file(self, files, arg_name="image", is_only=True, file_name=None) ->
246249 :param arg_name: 参数名称
247250 :param is_only: 是否生成唯一ID文件名
248251 :param file_name: 当is_only=False时,提供的自定义文件名称,如果不提供则来自上传的文件名,拓展名以上传文件为准无需附加
249- :return: (文件名称[不携带拓展名],文件保存本地地址, 文件拓展名)
252+ :return: (文件名称[不携带拓展名],文件拓展名,文件相对服务器保存路径 )
250253
251254 """
252255 file_list = files .get (arg_name , None )
@@ -273,4 +276,56 @@ def receive_file(self, files, arg_name="image", is_only=True, file_name=None) ->
273276 save_path = self .get_safe_path (name + source_ext )
274277 self .write (save_path , source_file .body )
275278
276- return name , source_ext , save_path
279+ return name , source_ext , self .join (self .rel_root_path , name + source_ext )
280+
281+ @staticmethod
282+ def rollback_receive_file (path : str ) -> None :
283+ """
284+
285+ 有时接收文件后将接收的文件名传入到数据库中,如果这时数据库错误需要回退操作,那么你可以使用该方法删除接收的文件,实现回退
286+
287+ :param path: 文件路径
288+ :return: None
289+
290+ """
291+ try :
292+ os .remove (path )
293+ except FileNotFoundError :
294+ pass
295+
296+
297+ def insert2fp (file_path , offset , content , per_size = 2048 ):
298+ """
299+
300+ 允许你在文件指定位置进行内容插入
301+
302+ :param file_path: 文件路径
303+ :param offset: 文件偏移位置
304+ :param content: 插入的内容
305+ :param per_size: 每片读取大小限制
306+ :return: None
307+
308+ """
309+ copies = offset // per_size
310+
311+ f_dir , f_name = os .path .split (file_path )
312+ temp_path = os .path .join (f_dir , f_name + ".temp" )
313+
314+ with open (temp_path , "w" ) as w_fp :
315+ with open (file_path , "r" ) as fp :
316+ fp .seek (0 )
317+
318+ for c in range (1 , copies + 1 + int (offset % per_size > 0 )):
319+ if c * per_size >= offset :
320+ result = fp .read (offset - fp .tell ())
321+ else :
322+ result = fp .read (per_size )
323+ w_fp .write (result )
324+
325+ w_fp .write (content )
326+
327+ for c in fp :
328+ w_fp .write (c )
329+
330+ os .remove (file_path )
331+ os .rename (temp_path , file_path )
0 commit comments