
Commit bf88cbd

z275748353zhanglongbin and zhanglongbin authored
Fix the bug where tasks get stuck when the trunk parameter configuration is unreasonable. (#65)
* Fix the bug of dataflow with ID #34
* Fix the bug of dataflow with ID #57 #59 #60 #62
* Fix the bug of dataflow with ID #57 #59 #60 #62
* Fix the bug of dataflow with ID #57 #59 #60 #62
* Fix the bug where tasks get stuck when the trunk parameter configuration is unreasonable.

---------

Co-authored-by: zhanglongbin <[email protected]>
1 parent 3fce539 commit bf88cbd
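In short, the commit replaces silent or missing parameter handling with validate-warn-continue behavior, so an unreasonable overlap/chunk-size configuration can no longer stall a task. A rough standalone sketch of the rule (hypothetical helper, not code from this repo; the real checks live in each mapper's __init__, shown in the diffs below):

from loguru import logger

def check_overlap(chunk_size: int, overlap: int) -> int:
    """Hypothetical helper mirroring the validation both mappers now
    perform in __init__; returns the (possibly adjusted) overlap."""
    if overlap >= chunk_size:
        logger.warning(f"overlap ({overlap}) >= chunk_size ({chunk_size}); auto-adjusting to 0")
        return 0
    if overlap > chunk_size / 2:
        logger.warning(f"overlap ({overlap}) > chunk_size/2; task continues, efficiency may drop")
    return overlap

assert check_overlap(512, 600) == 0    # adjusted, task keeps running
assert check_overlap(512, 300) == 300  # warned but unchanged
assert check_overlap(512, 100) == 100  # silently accepted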

File tree

4 files changed (+193, -67 lines)

data_engine/ops/mapper/md_to_jsonl_chunk_mapper.py

Lines changed: 38 additions & 7 deletions
@@ -1,4 +1,5 @@
 from jsonargparse.typing import PositiveInt, NonNegativeInt
+from loguru import logger
 
 from data_engine.utils.availability_utils import AvailabilityChecking
 from data_engine.utils.model_utils import get_model, prepare_model
@@ -38,10 +39,25 @@ def __init__(self,
         super().__init__(*args, **kwargs)
         self.chunk_size = chunk_size
         self.overlap = overlap
-        # Ensure overlap is less than chunk_size
+        self.hf_tokenizer = hf_tokenizer
+
+        # Validate parameters and warn if unreasonable
         if self.overlap >= self.chunk_size:
+            logger.warning(
+                f"Unreasonable parameter: overlap ({self.overlap}) >= chunk_size ({self.chunk_size}). "
+                f"This will cause overlap to be larger than chunk itself. "
+                f"Auto-adjusting overlap to 0. "
+                f"Recommendation: Set overlap < chunk_size (ideally < chunk_size/2)."
+            )
             self.overlap = 0
-        self.hf_tokenizer = hf_tokenizer
+        elif self.overlap > self.chunk_size / 2:
+            logger.warning(
+                f"Unreasonable parameter: overlap ({self.overlap}) > chunk_size/2 ({self.chunk_size/2:.0f}). "
+                f"Large overlap ratio may reduce processing efficiency. "
+                f"Task will continue with current settings. "
+                f"Recommendation: Consider reducing overlap to < {self.chunk_size/2:.0f} for better efficiency."
+            )
+
         self.model_key = prepare_model(
             model_type='huggingface',
             pretrained_model_name_or_path=hf_tokenizer,
@@ -58,8 +74,16 @@ def _split_text_by_tokens(self, text, tokenizer):
         if not text or not text.strip():
             return []
 
+        # Ensure text is properly encoded as UTF-8 string (cross-platform compatible)
+        if isinstance(text, bytes):
+            text = text.decode('utf-8', errors='replace')
+
         # Tokenize the entire text
-        tokens = tokenizer.encode(text, add_special_tokens=False)
+        try:
+            tokens = tokenizer.encode(text, add_special_tokens=False)
+        except Exception as e:
+            # If tokenization fails, return original text
+            return [text]
 
         chunks = []
         # Calculate step size: if overlap > 0, each chunk moves forward by (chunk_size - overlap)
@@ -68,21 +92,28 @@ def _split_text_by_tokens(self, text, tokenizer):
 
         for i in range(0, len(tokens), step_size):
             chunk_tokens = tokens[i:i + self.chunk_size]
-            # Decode tokens back to text with error handling
+            # Decode tokens back to text with robust error handling for cross-platform compatibility
             try:
                 chunk_text = tokenizer.decode(
                     chunk_tokens,
                     skip_special_tokens=True,
                     clean_up_tokenization_spaces=False
                 )
                 # Ensure valid UTF-8 encoding (replace broken characters instead of ignoring)
-                # This preserves more content while fixing encoding issues
-                chunk_text = chunk_text.encode('utf-8', errors='replace').decode('utf-8', errors='replace')
+                # This is critical for cross-platform compatibility (Windows/Linux)
+                if isinstance(chunk_text, bytes):
+                    chunk_text = chunk_text.decode('utf-8', errors='replace')
+                else:
+                    # Re-encode and decode to fix any encoding issues
+                    chunk_text = chunk_text.encode('utf-8', errors='replace').decode('utf-8', errors='replace')
             except Exception:
                 # Fallback: if decode fails, try standard decode
                 try:
                     chunk_text = tokenizer.decode(chunk_tokens, skip_special_tokens=True)
-                    chunk_text = chunk_text.encode('utf-8', errors='replace').decode('utf-8', errors='replace')
+                    if isinstance(chunk_text, bytes):
+                        chunk_text = chunk_text.decode('utf-8', errors='replace')
+                    else:
+                        chunk_text = chunk_text.encode('utf-8', errors='replace').decode('utf-8', errors='replace')
                 except Exception:
                     # If still fails, skip this chunk
                     continue
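A quick sketch of why the new guard matters for this mapper: the window advances by step_size = chunk_size - overlap, so without the adjustment an overlap >= chunk_size yields a non-positive step (range() raises on a step of 0 and produces nothing on a negative one). Hypothetical values, not code from the repository:

# Standalone sketch of the step-size arithmetic in _split_text_by_tokens;
# names mirror the diff above, values are made up.
chunk_size, overlap = 100, 120

step_size = chunk_size - overlap    # -20: range(0, n, -20) yields no chunks
assert step_size <= 0               # and a step of exactly 0 raises ValueError

# With the commit's auto-adjustment, chunking proceeds normally.
if overlap >= chunk_size:
    overlap = 0
step_size = chunk_size - overlap if overlap > 0 else chunk_size
tokens = list(range(250))           # stand-in for tokenizer output
chunks = [tokens[i:i + chunk_size] for i in range(0, len(tokens), step_size)]
print([len(c) for c in chunks])     # -> [100, 100, 50]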

data_engine/ops/mapper/md_to_jsonl_sentence_chunk_mapper.py

Lines changed: 96 additions & 9 deletions
@@ -1,5 +1,6 @@
 from jsonargparse.typing import PositiveInt, NonNegativeInt
 import regex as re
+from loguru import logger
 
 from data_engine.utils.availability_utils import AvailabilityChecking
 from data_engine.utils.model_utils import get_model, prepare_model
@@ -53,12 +54,34 @@ def __init__(self,
         super().__init__(*args, **kwargs)
         self.chunk_size = chunk_size
         self.chunk_overlap = chunk_overlap
-        # Ensure overlap is less than chunk_size
-        if self.chunk_overlap >= self.chunk_size:
-            self.chunk_overlap = 0
         self.min_sentences_per_chunk = min_sentences_per_chunk
         self.hf_tokenizer = hf_tokenizer
 
+        # Validate parameters and warn if unreasonable
+        if self.chunk_overlap >= self.chunk_size:
+            logger.warning(
+                f"Unreasonable parameter: chunk_overlap ({self.chunk_overlap}) >= chunk_size ({self.chunk_size}). "
+                f"This will cause overlap to be larger than chunk itself. "
+                f"Auto-adjusting chunk_overlap to 0. "
+                f"Recommendation: Set chunk_overlap < chunk_size (ideally < chunk_size/2)."
+            )
+            self.chunk_overlap = 0
+        elif self.chunk_overlap > self.chunk_size / 2:
+            logger.warning(
+                f"Unreasonable parameter: chunk_overlap ({self.chunk_overlap}) > chunk_size/2 ({self.chunk_size/2:.0f}). "
+                f"Large overlap may cause frequent chunk overflow in sentence-based chunking. "
+                f"Task will continue, but expect more ERROR logs about chunk overflow. "
+                f"Recommendation: Reduce chunk_overlap to < {self.chunk_size/2:.0f} or increase chunk_size."
+            )
+
+        if self.min_sentences_per_chunk < 1:
+            logger.warning(
+                f"Unreasonable parameter: min_sentences_per_chunk ({self.min_sentences_per_chunk}) < 1. "
+                f"Auto-adjusting to 1. "
+                f"Each chunk must contain at least 1 sentence."
+            )
+            self.min_sentences_per_chunk = 1
+
         # Prepare tokenizer model
         self.tokenizer_model_key = prepare_model(
             model_type='huggingface',
@@ -77,16 +100,28 @@ def _split_text_by_sentences(self, text, tokenizer):
         if not text or not text.strip():
             return []
 
+        # Ensure text is properly encoded as UTF-8 string (cross-platform compatible)
+        if isinstance(text, bytes):
+            text = text.decode('utf-8', errors='replace')
+
         # First, split text into sentences using mixed Chinese-English segmentation
         sentences = split_sentence_mixed(text)
 
         if not sentences:
             return [text] if text.strip() else []
 
-        # Calculate token count for each sentence
+        # Calculate token count for each sentence with error handling
         sentence_tokens = []
         for sentence in sentences:
-            tokens = tokenizer.encode(sentence, add_special_tokens=False)
+            try:
+                # Ensure sentence is UTF-8 string before tokenization
+                if isinstance(sentence, bytes):
+                    sentence = sentence.decode('utf-8', errors='replace')
+                tokens = tokenizer.encode(sentence, add_special_tokens=False)
+            except Exception:
+                # If tokenization fails, use length as approximation
+                tokens = list(range(len(sentence)))
+
             sentence_tokens.append({
                 'text': sentence,
                 'tokens': tokens,
@@ -104,6 +139,15 @@ def create_chunk():
             # Use empty string join for better Chinese support
             chunk_text = ''.join(current_chunk_sentences)
             if chunk_text.strip():
+                # Log if chunk exceeds chunk_size
+                if current_chunk_token_count > self.chunk_size:
+                    overflow_amount = current_chunk_token_count - self.chunk_size
+                    logger.warning(
+                        f"Chunk overflow detected: Created chunk has {current_chunk_token_count} tokens, "
+                        f"exceeds chunk_size ({self.chunk_size}) by {overflow_amount} tokens. "
+                        f"Contains {len(current_chunk_sentences)} sentence(s). "
+                        f"Operation: Preserving full chunk without truncation."
+                    )
                 chunks.append(chunk_text.strip())
                 return True
             return False
@@ -134,8 +178,15 @@ def reset_chunk_with_overlap():
             current_chunk_sentences[:] = overlap_sentences
             if overlap_sentences:
                 overlap_text = ''.join(overlap_sentences)
-                current_chunk_tokens = tokenizer.encode(overlap_text, add_special_tokens=False)
-                current_chunk_token_count = len(current_chunk_tokens)
+                # Ensure UTF-8 encoding before tokenization
+                if isinstance(overlap_text, bytes):
+                    overlap_text = overlap_text.decode('utf-8', errors='replace')
+                try:
+                    current_chunk_tokens = tokenizer.encode(overlap_text, add_special_tokens=False)
+                    current_chunk_token_count = len(current_chunk_tokens)
+                except Exception:
+                    current_chunk_tokens = []
+                    current_chunk_token_count = 0
             else:
                 current_chunk_tokens = []
                 current_chunk_token_count = 0
@@ -154,11 +205,37 @@ def reset_chunk_with_overlap():
                 if len(current_chunk_sentences) >= self.min_sentences_per_chunk:
                     if create_chunk():
                         reset_chunk_with_overlap()
-                    # Don't increment i, retry with current sentence
-                    continue
+                    # After reset, check if overlap is too large to fit current sentence
+                    # If overlap + current sentence still exceeds chunk_size, clear overlap to avoid infinite loop
+                    if current_chunk_token_count + sent_info['token_count'] > self.chunk_size:
+                        # Log the situation and clear overlap
+                        actual_total = current_chunk_token_count + sent_info['token_count']
+                        logger.warning(
+                            f"Chunk overflow detected: Current sentence ({sent_info['token_count']} tokens) + "
+                            f"overlap ({current_chunk_token_count} tokens) = {actual_total} tokens exceeds "
+                            f"chunk_size ({self.chunk_size}). "
+                            f"Operation: Clearing overlap and preserving full sentence as new chunk. "
+                            f"Suggestion: Reduce chunk_overlap or increase chunk_size."
+                        )
+                        # Clear overlap and force add current sentence to break the loop
+                        current_chunk_sentences.clear()
+                        current_chunk_tokens = []
+                        current_chunk_token_count = 0
+                    else:
+                        # Overlap is small enough, retry with current sentence
+                        continue
                 else:
                     # If we don't have enough sentences yet, force add this sentence
                     # (even if it exceeds chunk_size) to meet min_sentences_per_chunk requirement
+                    if sent_info['token_count'] > self.chunk_size:
+                        overflow_amount = sent_info['token_count'] - self.chunk_size
+                        logger.warning(
+                            f"Chunk overflow detected: Single sentence has {sent_info['token_count']} tokens, "
+                            f"exceeds chunk_size ({self.chunk_size}) by {overflow_amount} tokens. "
+                            f"Operation: Preserving full sentence as oversized chunk (will not truncate). "
+                            f"Sentence preview: '{sent_info['text'][:100]}...' "
+                            f"Suggestion: Increase chunk_size parameter to accommodate long sentences."
+                        )
                     current_chunk_sentences.append(sent_info['text'])
                     current_chunk_tokens.extend(sent_info['tokens'])
                     current_chunk_token_count += sent_info['token_count']
@@ -190,6 +267,16 @@ def reset_chunk_with_overlap():
         if len(current_chunk_sentences) >= self.min_sentences_per_chunk:
             chunk_text = ''.join(current_chunk_sentences)
             if chunk_text.strip():
+                # Check if final chunk exceeds chunk_size and log it
+                if current_chunk_token_count > self.chunk_size:
+                    overflow_amount = current_chunk_token_count - self.chunk_size
+                    logger.warning(
+                        f"Final chunk overflow detected: Final chunk has {current_chunk_token_count} tokens, "
+                        f"exceeds chunk_size ({self.chunk_size}) by {overflow_amount} tokens. "
+                        f"Contains {len(current_chunk_sentences)} sentence(s). "
+                        f"Operation: Preserving full chunk without truncation. "
+                        f"Suggestion: Increase chunk_size or reduce chunk_overlap."
+                    )
                 chunks.append(chunk_text.strip())
         # If remaining sentences don't meet min requirement, they are discarded
         # (or could be merged with previous chunk if needed, but current logic discards them)
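To see why the old sentence path could hang: after create_chunk() and reset_chunk_with_overlap(), the loop retried the same sentence via a bare continue, and if the carried overlap plus that sentence could never fit in chunk_size, the index never advanced. A toy reproduction under simplified assumptions (sentence token lengths only; hypothetical function, not the repo's code):

def pack_sentences(token_counts, chunk_size, overlap_sentences):
    """Toy packer over sentence token lengths (heavily simplified
    from the diff above)."""
    chunks, current, i = [], [], 0
    while i < len(token_counts):
        n = token_counts[i]
        if current and sum(current) + n > chunk_size:
            chunks.append(list(current))
            # Carry the last `overlap_sentences` sentences into the next chunk.
            current = current[-overlap_sentences:] if overlap_sentences else []
            if sum(current) + n > chunk_size:
                current = []   # the fix: overlap can never fit, so drop it
            else:
                continue       # safe retry: overlap fits, progress is guaranteed
        current.append(n)
        i += 1
    if current:
        chunks.append(current)
    return chunks

print(pack_sentences([6, 7], chunk_size=10, overlap_sentences=1))  # -> [[6], [7]]
# Without the cleared-overlap branch, the bare `continue` retries index 1
# forever: the carried [6] plus the 7-token sentence never fits in 10 tokens.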
