-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdocument_loader.py
More file actions
473 lines (391 loc) · 16.1 KB
/
document_loader.py
File metadata and controls
473 lines (391 loc) · 16.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
"""Document loading and chunking.

Parses PDF / DOCX / XLSX / Markdown / text / image files into text chunks
annotated with document, page and paragraph metadata.
"""
import hashlib
import logging
import os
from typing import List, Dict, Any
import pandas as pd
import pytesseract
from PIL import Image
# Text-splitting utility
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
    TextLoader,
    PyMuPDFLoader,
    Docx2txtLoader,
    UnstructuredMarkdownLoader
)
from config import CHUNK_SIZE, CHUNK_OVERLAP, SUPPORTED_EXTS, MAX_FILE_SIZE_MB
# Optional parsing libraries: left as None when not installed.
try:
    import fitz  # PyMuPDF
except ImportError:
    fitz = None
try:
    from docx import Document
except ImportError:
    Document = None
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_loader(file_path: str):
    """Pick a langchain document loader for *file_path* by extension.

    Returns None for types handled by dedicated parsers (.xlsx and images)
    or when loader construction fails; unknown extensions fall back to a
    plain UTF-8 text loader.
    """
    ext = os.path.splitext(file_path)[-1].lower()
    try:
        # These types are parsed elsewhere, not via a langchain loader.
        if ext in ('.xlsx', '.png', '.jpg', '.jpeg'):
            return None
        factories = {
            '.txt': lambda: TextLoader(file_path, encoding='utf-8'),
            '.pdf': lambda: PyMuPDFLoader(file_path),
            '.docx': lambda: Docx2txtLoader(file_path),
            '.md': lambda: UnstructuredMarkdownLoader(file_path),
        }
        # Anything unrecognized is treated as plain UTF-8 text.
        make = factories.get(ext, lambda: TextLoader(file_path, encoding='utf-8'))
        return make()
    except Exception as e:
        logger.error(f"创建加载器失败 {file_path}: {e}")
        return None
def ocr_image(file_path: str) -> str:
    """OCR an image file and return the extracted text, stripped.

    Uses simplified-Chinese + English language packs. Returns "" on any
    failure (missing file, unreadable image, tesseract error).
    """
    try:
        # Context manager closes the underlying file handle; the original
        # Image.open without close leaked it.
        with Image.open(file_path) as image:
            text = pytesseract.image_to_string(image, lang='chi_sim+eng')
        return text.strip()
    except Exception as e:
        logger.error(f"OCR处理失败 {file_path}: {e}")
        return ""
def parse_excel(file_path: str) -> str:
    """Flatten every sheet of an Excel workbook into one text blob.

    Each sheet is rendered as a labeled table; sheets are separated by a
    blank line. Returns "" on any failure.
    """
    try:
        workbook = pd.ExcelFile(file_path)
        rendered = []
        for name in workbook.sheet_names:
            frame = pd.read_excel(file_path, sheet_name=name)
            # Label the sheet, then dump the whole table as text.
            rendered.append(f"工作表: {name}\n" + frame.to_string(index=False, na_rep=''))
        return '\n\n'.join(rendered)
    except Exception as e:
        logger.error(f"Excel解析失败 {file_path}: {e}")
        return ""
def generate_document_id(file_path: str) -> str:
    """Derive a stable hex ID for a document from its path, mtime and size.

    The same file (unchanged on disk) always yields the same ID; editing
    the file changes its mtime/size and therefore the ID.
    """
    info = os.stat(file_path)
    fingerprint = f"{file_path}_{info.st_mtime}_{info.st_size}"
    return hashlib.md5(fingerprint.encode()).hexdigest()
def create_chunks_with_metadata(text: str, file_path: str, page_paragraphs: List[Dict] = None) -> List[Dict[str, Any]]:
    """Split *text* into chunks and attach document/page/paragraph metadata.

    Args:
        text: Full document text (paragraphs joined by blank lines).
        file_path: Source file path; used for the document ID and base meta.
        page_paragraphs: Optional list of dicts with 'start_pos', 'end_pos',
            'page_num' and 'paragraph_num' keys describing where each
            paragraph sits inside *text*.

    Returns:
        A list of {'content', 'chunk_index', 'meta'} dicts; empty list when
        *text* is blank.
    """
    if not text.strip():
        return []
    # Recursive splitting: tries earlier separators first (paragraph break,
    # newline, CJK sentence punctuation, ASCII punctuation, space, anywhere).
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n", "\n", "。", "!", "?", ".", "!", "?", " ", ""]
    )
    chunks = splitter.split_text(text)
    # Document-level metadata shared (copied) into every chunk.
    document_id = generate_document_id(file_path)
    ext = os.path.splitext(file_path)[-1].lower()
    base_meta = {
        'document_id': document_id,
        'document_name': os.path.basename(file_path),
        'document_path': file_path,
        'file_size': os.path.getsize(file_path),
        'file_ext': ext
    }
    result = []
    for i, chunk in enumerate(chunks):
        if not chunk.strip():
            continue
        # Page/paragraph stay None when no positional info is available or
        # the chunk cannot be located in the original text.
        page_num = None
        paragraph_num = None
        if page_paragraphs:
            # Locate the chunk in the original text by its first 50 chars.
            chunk_start = text.find(chunk.strip()[:50])
            if chunk_start >= 0:
                # Exact containment wins and stops the scan; otherwise the
                # elif keeps overwriting, leaving the nearest preceding
                # paragraph (NOTE: relies on page_paragraphs being in
                # ascending start_pos order).
                for pp in page_paragraphs:
                    if pp['start_pos'] <= chunk_start <= pp['end_pos']:
                        page_num = pp['page_num']
                        paragraph_num = pp['paragraph_num']
                        break
                    elif chunk_start >= pp['start_pos']:
                        # No exact match - remember this candidate and keep going.
                        page_num = pp['page_num']
                        paragraph_num = pp['paragraph_num']
        chunk_meta = base_meta.copy()
        chunk_meta['page_num'] = page_num
        chunk_meta['paragraph_num'] = paragraph_num
        result.append({
            'content': chunk.strip(),
            'chunk_index': i,
            'meta': chunk_meta
        })
    return result
def parse_pdf_with_structure(file_path: str) -> List[Dict[str, Any]]:
    """Parse a PDF page by page, recording page/paragraph offsets.

    Paragraphs are blank-line separated runs of page text. Falls back to
    the plain-text parser when PyMuPDF fails.
    """
    try:
        import fitz  # PyMuPDF
        pdf = fitz.open(file_path)
        pieces = []           # paragraph texts, later joined with "\n\n"
        page_paragraphs = []
        offset = 0            # running char offset into the joined text
        for idx in range(len(pdf)):
            raw = pdf.load_page(idx).get_text()
            if not raw.strip():
                continue
            cleaned = (p.strip() for p in raw.split('\n\n') if p.strip())
            for para_no, para in enumerate(cleaned, 1):
                page_paragraphs.append({
                    'page_num': idx + 1,
                    'paragraph_num': para_no,
                    'start_pos': offset,
                    'end_pos': offset + len(para),
                    'content': para
                })
                pieces.append(para)
                offset += len(para) + 2  # account for the "\n\n" joiner
        pdf.close()
        full_text = "\n\n".join(pieces) + ("\n\n" if pieces else "")
        if not full_text.strip():
            logger.warning(f"PDF文档内容为空: {file_path}")
            return []
        return create_chunks_with_metadata(full_text, file_path, page_paragraphs)
    except Exception as e:
        logger.error(f"PDF解析失败 {file_path}: {e}")
        # Degrade gracefully to the plain-text parser.
        return parse_text_with_structure(file_path)
def parse_docx_with_structure(file_path: str) -> List[Dict[str, Any]]:
    """Parse a .docx file paragraph by paragraph.

    Word files carry no fixed pagination here, so page_num is always 1.
    Falls back to the plain-text parser on failure.
    """
    try:
        from docx import Document
        word_doc = Document(file_path)
        pieces = []           # non-empty paragraph texts
        page_paragraphs = []
        offset = 0            # running char offset into the joined text
        # Enumerate ALL paragraphs so numbering matches the document,
        # including blank ones that are skipped below.
        for para_no, para in enumerate(word_doc.paragraphs, 1):
            body = para.text.strip()
            if not body:
                continue
            page_paragraphs.append({
                'page_num': 1,  # no page concept for .docx; fixed at 1
                'paragraph_num': para_no,
                'start_pos': offset,
                'end_pos': offset + len(body),
                'content': body
            })
            pieces.append(body)
            offset += len(body) + 2  # account for the "\n\n" joiner
        full_text = "\n\n".join(pieces) + ("\n\n" if pieces else "")
        if not full_text.strip():
            logger.warning(f"Word文档内容为空: {file_path}")
            return []
        return create_chunks_with_metadata(full_text, file_path, page_paragraphs)
    except Exception as e:
        logger.error(f"Word文档解析失败 {file_path}: {e}")
        # Degrade gracefully to the plain-text parser.
        return parse_text_with_structure(file_path)
def parse_excel_with_structure(file_path: str) -> List[Dict[str, Any]]:
    """Parse an Excel workbook; each sheet becomes one 'paragraph'.

    The 1-based sheet index is stored as page_num. Returns [] on failure.
    """
    try:
        workbook = pd.ExcelFile(file_path)
        pieces = []           # rendered sheet texts
        page_paragraphs = []
        offset = 0            # running char offset into the joined text
        for sheet_no, name in enumerate(workbook.sheet_names, 1):
            frame = pd.read_excel(file_path, sheet_name=name)
            rendered = f"工作表: {name}\n" + frame.to_string(index=False, na_rep='')
            if rendered.strip():
                page_paragraphs.append({
                    'page_num': sheet_no,  # sheet index doubles as page number
                    'paragraph_num': 1,
                    'start_pos': offset,
                    'end_pos': offset + len(rendered),
                    'content': rendered
                })
                pieces.append(rendered)
                offset += len(rendered) + 2  # account for the "\n\n" joiner
        full_text = "\n\n".join(pieces) + ("\n\n" if pieces else "")
        if not full_text.strip():
            logger.warning(f"Excel文档内容为空: {file_path}")
            return []
        return create_chunks_with_metadata(full_text, file_path, page_paragraphs)
    except Exception as e:
        logger.error(f"Excel解析失败 {file_path}: {e}")
        return []
def parse_markdown_with_structure(file_path: str) -> List[Dict[str, Any]]:
    """Parse a Markdown file, splitting into sections at heading lines.

    A line whose stripped form starts with '#' begins a new section; each
    non-blank section becomes one 'paragraph' (page_num fixed at 1).
    Returns [] on failure or empty input.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as fh:
            raw = fh.read()
        if not raw.strip():
            logger.warning(f"Markdown文档内容为空: {file_path}")
            return []
        full_text = ""
        page_paragraphs = []
        section = ""
        count = 0

        def flush(tail: str) -> None:
            # Record the pending section (if non-blank) and append it,
            # plus the given separator, to the accumulated text.
            nonlocal full_text, count
            if section.strip():
                count += 1
                page_paragraphs.append({
                    'page_num': 1,
                    'paragraph_num': count,
                    'start_pos': len(full_text),
                    'end_pos': len(full_text) + len(section),
                    'content': section.strip()
                })
                full_text += section + tail

        for line in raw.split('\n'):
            if line.strip().startswith('#'):
                flush("\n")          # heading starts a new section
                section = line + "\n"
            else:
                section += line + "\n"
        flush("")                    # trailing section gets no extra separator
        return create_chunks_with_metadata(full_text, file_path, page_paragraphs)
    except Exception as e:
        logger.error(f"Markdown解析失败 {file_path}: {e}")
        return []
def parse_text_with_structure(file_path: str) -> List[Dict[str, Any]]:
    """Parse a UTF-8 text file into blank-line-separated paragraphs.

    Each paragraph gets a 1-based number (page_num fixed at 1). Returns []
    on failure or empty input.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as fh:
            raw = fh.read()
        if not raw.strip():
            logger.warning(f"文本文档内容为空: {file_path}")
            return []
        pieces = []           # stripped, non-empty paragraphs
        page_paragraphs = []
        offset = 0            # running char offset into the joined text
        cleaned = (p.strip() for p in raw.split('\n\n'))
        for para_no, para in enumerate((p for p in cleaned if p), 1):
            page_paragraphs.append({
                'page_num': 1,
                'paragraph_num': para_no,
                'start_pos': offset,
                'end_pos': offset + len(para),
                'content': para
            })
            pieces.append(para)
            offset += len(para) + 2  # account for the "\n\n" joiner
        full_text = "\n\n".join(pieces) + ("\n\n" if pieces else "")
        return create_chunks_with_metadata(full_text, file_path, page_paragraphs)
    except Exception as e:
        logger.error(f"文本解析失败 {file_path}: {e}")
        return []
def parse_image_with_structure(file_path: str) -> List[Dict[str, Any]]:
    """OCR an image and wrap the text as a single one-page paragraph.

    Returns [] when OCR fails or produces no text.
    """
    try:
        extracted = ocr_image(file_path)
        if not extracted.strip():
            logger.warning(f"图片OCR内容为空: {file_path}")
            return []
        # The whole OCR result is treated as one paragraph on page 1.
        single = {
            'page_num': 1,
            'paragraph_num': 1,
            'start_pos': 0,
            'end_pos': len(extracted),
            'content': extracted
        }
        return create_chunks_with_metadata(extracted, file_path, [single])
    except Exception as e:
        logger.error(f"图片解析失败 {file_path}: {e}")
        return []
def parse_document(file_path: str) -> List[Dict[str, Any]]:
    """Parse one document into metadata-annotated chunks.

    Dispatches on file extension; missing, oversized, or failing files
    yield []. Unknown extensions are treated as plain text.
    """
    if not os.path.exists(file_path):
        logger.error(f"文件不存在: {file_path}")
        return []
    # Enforce the configured size ceiling before doing any parsing work.
    try:
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
    except OSError as e:
        logger.error(f"无法获取文件大小: {file_path} - {e}")
        return []
    if file_size_mb > MAX_FILE_SIZE_MB:
        logger.warning(f"跳过过大文件: {os.path.basename(file_path)} ({file_size_mb:.1f}MB > {MAX_FILE_SIZE_MB}MB)")
        return []
    ext = os.path.splitext(file_path)[-1].lower()
    parsers = {
        '.pdf': parse_pdf_with_structure,
        '.docx': parse_docx_with_structure,
        '.xlsx': parse_excel_with_structure,
        '.md': parse_markdown_with_structure,
        '.txt': parse_text_with_structure,
        '.png': parse_image_with_structure,
        '.jpg': parse_image_with_structure,
        '.jpeg': parse_image_with_structure,
    }
    try:
        # Unrecognized extensions fall back to the plain-text parser.
        handler = parsers.get(ext, parse_text_with_structure)
        return handler(file_path)
    except Exception as e:
        logger.error(f"解析文档失败 {file_path}: {e}")
        return []
def parse_directory(directory: str) -> List[Dict[str, Any]]:
    """Recursively parse every supported document under *directory*.

    Per-file failures are logged and counted, never raised. A file that
    parses but yields no chunks is counted as both processed and failed.
    Returns the concatenated chunk list from all files.
    """
    if not os.path.exists(directory):
        logger.error(f"目录不存在: {directory}")
        return []
    if not os.path.isdir(directory):
        logger.error(f"路径不是目录: {directory}")
        return []
    collected = []
    ok_count = 0
    bad_count = 0
    logger.info(f"开始解析目录: {directory}")
    for root, _, names in os.walk(directory):
        for name in names:
            # Skip anything whose extension is not configured as supported.
            if os.path.splitext(name)[-1].lower() not in SUPPORTED_EXTS:
                continue
            path = os.path.join(root, name)
            try:
                file_chunks = parse_document(path)
            except Exception as e:
                logger.error(f"处理文件失败 {name}: {e}")
                bad_count += 1
                continue
            collected.extend(file_chunks)
            ok_count += 1
            if file_chunks:
                logger.info(f"处理文件 {name}: {len(file_chunks)} 个分块")
            else:
                # Empty result: tallied as failed in addition to processed.
                logger.warning(f"文件无内容或处理失败: {name}")
                bad_count += 1
    logger.info(f"目录解析完成: 处理 {ok_count} 个文件, 失败 {bad_count} 个, 总共 {len(collected)} 个分块")
    return collected