Skip to content

Commit 214470e

Browse files
authored
update tools (#16)
1 parent b604aa8 commit 214470e

File tree

2 files changed

+608
-0
lines changed

2 files changed

+608
-0
lines changed

tools/classify_image.py

Lines changed: 382 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,382 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
"""
4+
根据文件创建日期自动分类文件的工具
5+
支持按 YYYY/MM 格式将文件分类到目标目录
6+
"""
7+
8+
import os
9+
import sys
10+
import shutil
11+
import argparse
12+
from pathlib import Path
13+
from datetime import datetime
14+
from typing import Optional, Tuple
15+
import logging
16+
17+
# 配置日志
18+
logging.basicConfig(
19+
level=logging.INFO,
20+
format='%(asctime)s - %(levelname)s - %(message)s',
21+
datefmt='%Y-%m-%d %H:%M:%S'
22+
)
23+
logger = logging.getLogger(__name__)
24+
25+
26+
def get_file_creation_time(file_path: Path) -> Optional[datetime]:
27+
"""
28+
获取文件的创建时间
29+
30+
在 macOS 上使用 st_birthtime
31+
在其他系统上尝试使用 st_ctime
32+
33+
Args:
34+
file_path: 文件路径
35+
36+
Returns:
37+
文件创建时间的 datetime 对象,如果获取失败则返回 None
38+
"""
39+
try:
40+
stat_info = os.stat(file_path)
41+
# macOS 使用 st_birthtime 获取创建时间
42+
if hasattr(stat_info, 'st_birthtime'):
43+
timestamp = stat_info.st_birthtime
44+
else:
45+
# 其他系统使用 st_ctime(在某些系统上是创建时间,某些是修改时间)
46+
timestamp = stat_info.st_ctime
47+
return datetime.fromtimestamp(timestamp)
48+
except (OSError, IOError) as e:
49+
logger.warning(f"无法获取文件 {file_path} 的创建时间: {e}")
50+
return None
51+
52+
53+
def get_target_path(root_dir: Path, file_date: datetime, filename: str) -> Path:
54+
"""
55+
根据文件创建日期生成目标路径
56+
57+
Args:
58+
root_dir: 目标根目录
59+
file_date: 文件创建日期
60+
filename: 文件名
61+
62+
Returns:
63+
完整的目标路径 (root_dir/YYYY/MM/filename)
64+
"""
65+
year = file_date.strftime('%Y')
66+
month = file_date.strftime('%m')
67+
return root_dir / year / month / filename
68+
69+
70+
def resolve_conflict(target_path: Path) -> Path:
71+
"""
72+
解决目标路径文件名冲突
73+
74+
如果目标文件已存在,在文件名后添加数字后缀
75+
76+
Args:
77+
target_path: 原始目标路径
78+
79+
Returns:
80+
解决冲突后的路径
81+
"""
82+
if not target_path.exists():
83+
return target_path
84+
85+
base = target_path.stem
86+
suffix = target_path.suffix
87+
parent = target_path.parent
88+
counter = 1
89+
90+
while True:
91+
new_name = f"{base}_{counter}{suffix}"
92+
new_path = parent / new_name
93+
if not new_path.exists():
94+
return new_path
95+
counter += 1
96+
97+
98+
def scan_directory(source_dir: Path, extensions: Optional[Tuple[str, ...]] = None) -> list[Path]:
99+
"""
100+
扫描目录获取所有文件
101+
102+
Args:
103+
source_dir: 源目录
104+
extensions: 可选的文件扩展名过滤,如 ('.jpg', '.png', '.heic')
105+
106+
Returns:
107+
文件路径列表
108+
"""
109+
files = []
110+
111+
if not source_dir.exists():
112+
logger.error(f"源目录不存在: {source_dir}")
113+
return files
114+
115+
if not source_dir.is_dir():
116+
logger.error(f"源路径不是目录: {source_dir}")
117+
return files
118+
119+
logger.info(f"开始扫描目录: {source_dir}")
120+
121+
for item in source_dir.rglob('*'):
122+
if item.is_file():
123+
if extensions is None or item.suffix.lower() in extensions:
124+
files.append(item)
125+
126+
logger.info(f"找到 {len(files)} 个文件")
127+
return files
128+
129+
130+
def classify_files(
131+
source_dir: Path,
132+
target_dir: Path,
133+
extensions: Optional[Tuple[str, ...]] = None,
134+
dry_run: bool = False,
135+
copy: bool = False,
136+
skip_existing: bool = False
137+
) -> dict:
138+
"""
139+
分类文件到目标目录
140+
141+
Args:
142+
source_dir: 源目录
143+
target_dir: 目标目录
144+
extensions: 文件扩展名过滤
145+
dry_run: 预览模式,不实际移动文件
146+
copy: 复制而非移动文件
147+
skip_existing: 跳过已存在的文件
148+
149+
Returns:
150+
统计信息字典
151+
"""
152+
# 扫描源目录
153+
files = scan_directory(source_dir, extensions)
154+
155+
stats = {
156+
'total': len(files),
157+
'processed': 0,
158+
'skipped': 0,
159+
'failed': 0,
160+
'errors': []
161+
}
162+
163+
for file_path in files:
164+
# 获取创建时间
165+
creation_time = get_file_creation_time(file_path)
166+
if creation_time is None:
167+
stats['skipped'] += 1
168+
continue
169+
170+
# 生成目标路径
171+
target_path = get_target_path(target_dir, creation_time, file_path.name)
172+
173+
# 检查目标文件是否已存在
174+
if target_path.exists():
175+
if skip_existing:
176+
logger.debug(f"文件已存在,跳过: {target_path}")
177+
stats['skipped'] += 1
178+
continue
179+
else:
180+
target_path = resolve_conflict(target_path)
181+
182+
# 创建目标目录
183+
target_parent = target_path.parent
184+
if not dry_run:
185+
target_parent.mkdir(parents=True, exist_ok=True)
186+
187+
# 执行操作
188+
action = "复制" if copy else "移动"
189+
mode_str = "[预览] " if dry_run else ""
190+
191+
logger.info(f"{mode_str}{action}: {file_path} -> {target_path}")
192+
193+
if not dry_run:
194+
try:
195+
if copy:
196+
shutil.copy2(file_path, target_path)
197+
else:
198+
shutil.move(str(file_path), str(target_path))
199+
stats['processed'] += 1
200+
except Exception as e:
201+
logger.error(f"处理文件失败 {file_path}: {e}")
202+
stats['failed'] += 1
203+
stats['errors'].append(str(file_path))
204+
else:
205+
stats['processed'] += 1
206+
207+
return stats
208+
209+
210+
def parse_arguments() -> argparse.Namespace:
211+
"""
212+
解析命令行参数
213+
214+
支持的格式:
215+
1. 位置参数:python classify_image.py <source_dir> <target_dir>
216+
2. 配置文件:python classify_image.py --config config.json
217+
3. 完整参数:python classify_image.py --source /path/to/source --target /path/to/target ...
218+
"""
219+
parser = argparse.ArgumentParser(
220+
description='根据文件创建日期自动分类文件',
221+
formatter_class=argparse.RawDescriptionHelpFormatter,
222+
epilog="""
223+
示例:
224+
# 预览模式(推荐先运行)
225+
python classify_image.py /Volumes/SD Card/DCIM ~/Pictures/Sorted --dry-run
226+
227+
# 移动文件
228+
python classify_image.py ~/Downloads/Photos ~/Pictures/Sorted
229+
230+
# 只处理图片文件
231+
python classify_image.py ~/Downloads/Photos ~/Pictures/Sorted --extensions .jpg .png .heic
232+
233+
# 复制而非移动
234+
python classify_image.py ~/Downloads/Photos ~/Pictures/Sorted --copy
235+
236+
# 跳过已存在的文件
237+
python classify_image.py ~/Downloads/Photos ~/Pictures/Sorted --skip-existing
238+
"""
239+
)
240+
241+
parser.add_argument(
242+
'source',
243+
nargs='?',
244+
help='源目录路径'
245+
)
246+
247+
parser.add_argument(
248+
'target',
249+
nargs='?',
250+
help='目标目录路径'
251+
)
252+
253+
parser.add_argument(
254+
'--source-dir',
255+
type=str,
256+
help='源目录路径(与位置参数二选一)'
257+
)
258+
259+
parser.add_argument(
260+
'--target-dir',
261+
type=str,
262+
help='目标目录路径(与位置参数二选一)'
263+
)
264+
265+
parser.add_argument(
266+
'--extensions',
267+
nargs='+',
268+
default=None,
269+
help='要处理的文件扩展名(如 .jpg .png .heic),默认处理所有文件'
270+
)
271+
272+
parser.add_argument(
273+
'--dry-run',
274+
action='store_true',
275+
help='预览模式,不实际移动/复制文件'
276+
)
277+
278+
parser.add_argument(
279+
'--copy',
280+
action='store_true',
281+
help='复制文件而非移动'
282+
)
283+
284+
parser.add_argument(
285+
'--skip-existing',
286+
action='store_true',
287+
help='跳过目标目录中已存在的文件'
288+
)
289+
290+
parser.add_argument(
291+
'-v', '--verbose',
292+
action='store_true',
293+
help='详细输出'
294+
)
295+
296+
return parser.parse_args()
297+
298+
299+
def main():
300+
"""主函数"""
301+
args = parse_arguments()
302+
303+
# 设置日志级别
304+
if args.verbose:
305+
logging.getLogger().setLevel(logging.DEBUG)
306+
307+
# 获取源目录和目标目录
308+
source_dir = Path(args.source or args.source_dir)
309+
target_dir = Path(args.target or args.target_dir)
310+
311+
# 验证参数
312+
if not source_dir or not target_dir:
313+
logger.error("必须指定源目录和目标目录")
314+
logger.error("使用 --help 查看帮助信息")
315+
sys.exit(1)
316+
317+
# 转换为绝对路径,支持 ~ 和外接硬盘
318+
source_dir = source_dir.expanduser().resolve()
319+
target_dir = target_dir.expanduser().resolve()
320+
321+
# 验证源目录
322+
if not source_dir.exists():
323+
logger.error(f"源目录不存在: {source_dir}")
324+
sys.exit(1)
325+
326+
# 处理扩展名参数
327+
extensions = None
328+
if args.extensions:
329+
extensions = tuple(ext if ext.startswith('.') else f'.{ext}' for ext in args.extensions)
330+
logger.info(f"只处理以下扩展名的文件: {', '.join(extensions)}")
331+
332+
# 显示配置
333+
logger.info("=" * 50)
334+
logger.info("文件分类工具")
335+
logger.info("=" * 50)
336+
logger.info(f"源目录: {source_dir}")
337+
logger.info(f"目标目录: {target_dir}")
338+
logger.info(f"模式: {'预览' if args.dry_run else '执行'}")
339+
logger.info(f"操作: {'复制' if args.copy else '移动'}")
340+
if args.skip_existing:
341+
logger.info(f"已存在文件: 跳过")
342+
logger.info("=" * 50)
343+
344+
# 确认执行(非预览模式)
345+
if not args.dry_run:
346+
response = input(f"确定要{'复制' if args.copy else '移动'}文件吗?(y/N): ")
347+
if response.lower() != 'y':
348+
logger.info("操作已取消")
349+
sys.exit(0)
350+
351+
# 执行分类
352+
stats = classify_files(
353+
source_dir=source_dir,
354+
target_dir=target_dir,
355+
extensions=extensions,
356+
dry_run=args.dry_run,
357+
copy=args.copy,
358+
skip_existing=args.skip_existing
359+
)
360+
361+
# 显示统计
362+
logger.info("=" * 50)
363+
logger.info("统计信息")
364+
logger.info("=" * 50)
365+
logger.info(f"总文件数: {stats['total']}")
366+
logger.info(f"已处理: {stats['processed']}")
367+
logger.info(f"已跳过: {stats['skipped']}")
368+
logger.info(f"失败: {stats['failed']}")
369+
370+
if stats['errors']:
371+
logger.warning("失败的文件:")
372+
for error in stats['errors']:
373+
logger.warning(f" - {error}")
374+
375+
logger.info("=" * 50)
376+
377+
if args.dry_run:
378+
logger.info("预览模式完成,使用 --dry-run=false 实际执行")
379+
380+
381+
if __name__ == '__main__':
382+
main()

0 commit comments

Comments
 (0)