Commit 6e9d465

update processing (#7743)
* update processing
* modify pdf
* modify pdf
* modify pdf
* modify pdf
* update pdf
1 parent b055be6 commit 6e9d465

2 files changed (+125, -84 lines)

pipelines/pipelines/nodes/file_converter/pdf.py

Lines changed: 13 additions & 53 deletions
@@ -40,9 +40,13 @@ def extract_pages(page_list, file_path):
     end = page_list[1]
     page_text = []
     pdf = pypdf.PdfReader(file_path)
-    for page in pdf.pages[start:end]:
-        paragraphs = page.extract_text()
-        page_text.append(paragraphs)
+    for index, page in enumerate(pdf.pages[start:end]):
+        try:
+            paragraphs = page.extract_text()
+            paragraphs = paragraphs.encode("UTF-8", "ignore").decode("UTF-8")
+            page_text.append(paragraphs)
+        except Exception as e:
+            logger.warning("Page %d of the file cannot be parsed correctly %s" % (index + start + 1, str(e)))
     return page_text
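The rewritten loop trades strictness for robustness: a page that pypdf cannot parse is now logged and skipped instead of aborting the whole file, and the UTF-8 round trip drops characters that would break downstream encoders. A self-contained sketch of the same pattern (logging setup and usage are illustrative, not part of the commit):

    import logging

    import pypdf

    logger = logging.getLogger(__name__)

    def extract_pages(page_list, file_path):
        # page_list holds zero-based (start, end) page indices.
        start, end = page_list[0], page_list[1]
        page_text = []
        pdf = pypdf.PdfReader(file_path)
        for index, page in enumerate(pdf.pages[start:end]):
            try:
                paragraphs = page.extract_text()
                # Drop anything that cannot survive a UTF-8 round trip.
                page_text.append(paragraphs.encode("UTF-8", "ignore").decode("UTF-8"))
            except Exception as e:
                # One corrupt page no longer fails the whole document.
                logger.warning("Page %d of the file cannot be parsed correctly %s", index + start + 1, e)
        return page_text

    # Hypothetical usage: text of the first three pages of sample.pdf
    # print(extract_pages((0, 3), "sample.pdf"))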

@@ -109,58 +113,14 @@ def convert(
         not one of the valid languages, then it might likely be encoding error resulting
         in garbled text.
         """
-        pages = self._read_pdf(file_path, layout=False, process_num=process_num)
-        if remove_numeric_tables is None:
-            remove_numeric_tables = self.remove_numeric_tables
-        if valid_languages is None:
-            valid_languages = self.valid_languages
-        if language is None:
-            language = self.language
-        cleaned_pages = []
+        pages = self._read_pdf(file_path, process_num=process_num)
+        documents = []
         for page in pages:
-            # pdftotext tool provides an option to retain the original physical layout of a PDF page. This behaviour
-            # can be toggled by using the layout param.
-            #  layout=True
-            #      + table structures get retained better
-            #      - multi-column pages(eg, research papers) gets extracted with text from multiple columns on same line
-            #  layout=False
-            #      + keeps strings in content stream order, hence multi column layout works well
-            #      - cells of tables gets split across line
-            #
-            # Here, as a "safe" default, layout is turned off.
-            lines = page.splitlines()
-
-            cleaned_lines = []
-            for line in lines:
-                if self.language == "chinese":
-                    words = list(line)
-                else:
-                    words = line.split()
-                digits = [word for word in words if any(i.isdigit() for i in word)]
-
-                # remove lines having > 40% of words as digits AND not ending with a period(.)
-                if remove_numeric_tables:
-                    if words and len(digits) / len(words) > 0.4 and not line.strip().endswith("."):
-                        logger.debug(f"Removing line '{line}' from {file_path}")
-                        continue
-                cleaned_lines.append(line)
-
-            page = "\n".join(cleaned_lines)
-            cleaned_pages.append(page)
-
-        if valid_languages:
-            document_text = "".join(cleaned_pages)
-            if not self.validate_language(document_text, valid_languages):
-                logger.warning(
-                    f"The language for {file_path} is not one of {valid_languages}. The file may not have "
-                    f"been decoded in the correct text format."
-                )
-
-        text = "\f".join(cleaned_pages)
-        document = {"content": text, "content_type": "text", "meta": meta}
-        return [document]
+            document = {"content": page, "content_type": "text", "meta": meta}
+            documents.append(document)
+        return documents

-    def _read_pdf(self, file_path: Path, layout: bool, process_num: int) -> List[str]:
+    def _read_pdf(self, file_path: Path, process_num: int) -> List[str]:
         """
         Extract pages from the pdf file at file_path.
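After this hunk, convert() returns one document dict per PDF page rather than a single dict whose pages were joined with form feeds, and the numeric-table filtering and language validation that used to live here are gone. A hedged sketch of the new output shape (the converter class name is an assumption, it is not visible in this diff):

    from pathlib import Path

    converter = PDFToTextConverter()  # assumed class name for the converter this file defines
    docs = converter.convert(file_path=Path("sample.pdf"), meta=None)
    # docs == [
    #     {"content": "<text of page 1>", "content_type": "text", "meta": None},
    #     {"content": "<text of page 2>", "content_type": "text", "meta": None},
    #     ...
    # ]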

pipelines/pipelines/utils/preprocessing.py

Lines changed: 112 additions & 31 deletions
@@ -12,7 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import functools
 import logging
+import multiprocessing
+import os
 import re
 from pathlib import Path
 from typing import Callable, Dict, List, Optional
@@ -35,6 +38,85 @@
 )

 logger = logging.getLogger(__name__)
+import copy
+
+
+def document_rough_split(document_list, max_token=4500):
+    document_index_rough = []
+    for item in document_list:
+        if len(item["content"]) < max_token or "\n" in item:
+            document_index_rough.append(item)
+        else:
+            all_token = len(item["content"])
+            token_index = [i for i in range(0, all_token + 1, max_token)]
+            if all_token > token_index[-1]:
+                token_index.append(all_token)
+            token_index_combine = [item["content"][start:end] for start, end in zip(token_index, token_index[1:])]
+            for txt in token_index_combine:
+                txt_split = copy.deepcopy(item)
+                txt_split["content"] = txt
+                document_index_rough.append(txt_split)
+    return document_index_rough
+
+
+def split_document(document_index, all_document, split_text, split_paragraphs: bool, clean_func, path, split_answers):
+    start = document_index[0]
+    end = document_index[1]
+    documents = []
+    for item in all_document[start:end]:
+        text = item["content"]
+        if clean_func:
+            text = clean_func(text)
+        if split_paragraphs is True:
+            text_splits = split_text.split_text(text)
+            for txt in text_splits:
+                if not txt.strip():  # skip empty paragraphs
+                    continue
+                if split_answers:
+                    query, answer = txt.split("\t")
+                    meta_data = {"name": path.name, "answer": answer}
+                    # Add image list parsed from docx into meta
+                    if item["meta"] is not None and "images" in item["meta"]:
+                        meta_data["images"] = item["meta"]["images"]
+                    documents.append({"content": query, "meta": meta_data})
+                else:
+                    meta_data = {
+                        "name": path.name,
+                    }
+                    # Add image list parsed from docx into meta
+                    if item["meta"] is not None and "images" in item["meta"]:
+                        meta_data["images"] = item["meta"]["images"]
+                    documents.append({"content": txt, "meta": meta_data})
+        else:
+            documents.append({"content": text, "meta": item["meta"] if "meta" in item else {"name": path.name}})
+    return documents
+
+
+def run_process(
+    document_combination_index,
+    list_documents,
+    split_text,
+    process_num,
+    split_paragraphs,
+    clean_func,
+    path,
+    split_answers,
+):
+    process_num = min(os.cpu_count(), process_num)
+    pool = multiprocessing.Pool(process_num)
+    split_document_c = functools.partial(
+        split_document,
+        all_document=list_documents,
+        split_text=split_text,
+        split_paragraphs=split_paragraphs,
+        clean_func=clean_func,
+        path=path,
+        split_answers=split_answers,
+    )
+    result = pool.map_async(split_document_c, document_combination_index)
+    pool.close()
+    pool.join()
+    return result.get()


 def convert_files_to_dicts(
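document_rough_split caps every document at max_token characters before parallel splitting, so no single worker receives an arbitrarily long string, and run_process fans the resulting (start, end) shards out over a multiprocessing.Pool through functools.partial. (Note that the check "\n" in item looks at the dict's keys, not at item["content"].) The slicing arithmetic, worked through with hypothetical numbers:

    # A 10,000-character document with the default max_token=4500:
    all_token = 10_000
    token_index = list(range(0, all_token + 1, 4500))  # [0, 4500, 9000]
    if all_token > token_index[-1]:
        token_index.append(all_token)                  # [0, 4500, 9000, 10000]
    slices = list(zip(token_index, token_index[1:]))
    # -> [(0, 4500), (4500, 9000), (9000, 10000)]; the tail chunk is simply shorter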
@@ -43,6 +125,7 @@ def convert_files_to_dicts(
     split_paragraphs: bool = False,
     split_answers: bool = False,
     encoding: Optional[str] = None,
+    process_num: int = 20,
 ) -> List[dict]:
     """
     Convert all files(.txt, .pdf, .docx) in the sub-directories of the given path to Python dicts that can be written to a
@@ -136,6 +219,7 @@ def convert_files_to_dicts_splitter(
     chunk_size: int = 300,
     chunk_overlap: int = 0,
     language: str = "chinese",
+    process_num: int = 10,
 ) -> List[dict]:
     """
     Convert all files(.txt, .pdf, .docx) in the sub-directories of the given path to Python dicts that can be written to a
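Both entry points gain a process_num knob (default 20 for convert_files_to_dicts, 10 for the splitter variant); run_process later caps it at os.cpu_count(). A hypothetical call, where every argument name not shown in this diff is assumed rather than quoted:

    dicts = convert_files_to_dicts_splitter(
        dir_path="corpus/",   # assumed name of the input-directory parameter
        split_paragraphs=True,
        chunk_size=300,
        language="chinese",
        process_num=4,        # spawn at most 4 worker processes
    )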
@@ -184,6 +268,9 @@ def convert_files_to_dicts_splitter(
         docx_splitter = SpacyTextSplitter(
             separator=separator, filters=filters, chunk_size=chunk_size, chunk_overlap=chunk_overlap
         )
+        pdf_splitter = SpacyTextSplitter(
+            separator=separator, chunk_size=chunk_size, chunk_overlap=chunk_overlap, filters=filters
+        )
     else:
         docx_splitter = SpacyTextSplitter(
             separator=separator,
@@ -192,12 +279,13 @@ def convert_files_to_dicts_splitter(
             chunk_overlap=chunk_overlap,
             pipeline="en_core_web_sm",
         )
+        pdf_splitter = SpacyTextSplitter(
+            separator=separator, chunk_size=chunk_size, chunk_overlap=chunk_overlap, filters=filters
+        )
     text_splitter = CharacterTextSplitter(
         separator=separator, chunk_size=chunk_size, chunk_overlap=chunk_overlap, filters=filters
     )
-    pdf_splitter = CharacterTextSplitter(
-        separator=separator, chunk_size=chunk_size, chunk_overlap=chunk_overlap, filters=filters
-    )
+
     imgage_splitter = CharacterTextSplitter(
         separator=separator, chunk_size=chunk_size, chunk_overlap=chunk_overlap, filters=filters
     )
@@ -230,34 +318,27 @@ def convert_files_to_dicts_splitter(
                 encoding=encoding,
                 language=language,
             )
-            for document in list_documents:
-                text = document["content"]
-                if clean_func:
-                    text = clean_func(text)
-                if split_paragraphs is True:
-                    text_splits = suffix2splitter[suffix].split_text(text)
-                    for txt in text_splits:
-                        if not txt.strip():  # skip empty paragraphs
-                            continue
-                        if split_answers:
-                            query, answer = txt.split("\t")
-                            meta_data = {"name": path.name, "answer": answer}
-                            # Add image list parsed from docx into meta
-                            if document["meta"] is not None and "images" in document["meta"]:
-                                meta_data["images"] = document["meta"]["images"]
-                            documents.append({"content": query, "meta": meta_data})
-                        else:
-                            meta_data = {
-                                "name": path.name,
-                            }
-                            # Add image list parsed from docx into meta
-                            if document["meta"] is not None and "images" in document["meta"]:
-                                meta_data["images"] = document["meta"]["images"]
-                            documents.append({"content": txt, "meta": meta_data})
-                else:
-                    documents.append(
-                        {"content": text, "meta": document["meta"] if "meta" in document else {"name": path.name}}
-                    )
+            list_documents = document_rough_split(list_documents)
+            document_number = len(list_documents)
+            split_len = document_number // process_num
+            if split_len == 0:
+                split_len = document_number
+            document_list = [i for i in range(0, document_number, split_len)]
+            if document_number > document_list[-1]:
+                document_list.append(document_number)
+            document_combination_index = [(start, end) for start, end in zip(document_list, document_list[1:])]
+            document_mul = run_process(
+                document_combination_index=document_combination_index,
+                list_documents=list_documents,
+                split_text=suffix2splitter[suffix],
+                process_num=process_num,
+                split_paragraphs=split_paragraphs,
+                clean_func=clean_func,
+                path=path,
+                split_answers=split_answers,
+            )
+            for item in document_mul:
+                documents.extend(item)
     if filters is not None and len(filters) > 0:
         documents = clean(documents, filters)
     return documents
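The per-suffix loop now shards list_documents into contiguous (start, end) slices, one batch per worker, instead of splitting documents serially in place. The partitioning, traced with hypothetical numbers:

    # 7 rough-split documents across process_num=3 workers:
    document_number, process_num = 7, 3
    split_len = document_number // process_num                   # 2
    document_list = list(range(0, document_number, split_len))   # [0, 2, 4, 6]
    if document_number > document_list[-1]:
        document_list.append(document_number)                    # [0, 2, 4, 6, 7]
    pairs = list(zip(document_list, document_list[1:]))
    # -> [(0, 2), (2, 4), (4, 6), (6, 7)]; every document is covered exactly once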
