@@ -86,14 +86,18 @@ def is_sse_event_line(chunk_str: str) -> bool:
8686 @staticmethod
8787 def remove_chunk_prefix (chunk_str : str , field_mapping : FieldMapping ) -> str :
8888 """Remove prefix from chunk string based on field mapping configuration."""
89+ result = chunk_str .strip ()
8990 if field_mapping .end_prefix :
90- return chunk_str . replace (field_mapping .end_prefix , "" ) .strip ()
91+ result = result [ len (field_mapping .end_prefix ) :] .strip ()
9192 elif field_mapping .stream_prefix and chunk_str .startswith (
9293 field_mapping .stream_prefix
9394 ):
94- return chunk_str [len (field_mapping .stream_prefix ) :].strip ()
95- else :
96- return chunk_str .strip ()
95+ result = result [len (field_mapping .stream_prefix ) :].strip ()
96+ elif chunk_str .startswith ("data: " ):
97+ result = result [len ("data: " ) :].strip ()
98+ elif chunk_str .startswith ("event: " ):
99+ result = result [len ("event: " ) :].strip ()
100+ return result
97101
98102 @staticmethod
99103 def check_stop_flag (processed_chunk : str , field_mapping : FieldMapping ) -> bool :
@@ -269,9 +273,6 @@ def process_stream_chunk(
269273 if not chunk_str :
270274 return False , None , metrics
271275
272- if StreamProcessor .is_sse_event_line (chunk_str ):
273- return False , None , metrics
274-
275276 # Remove prefix if present
276277 processed_chunk = StreamProcessor .remove_chunk_prefix (chunk_str , field_mapping )
277278
@@ -282,6 +283,8 @@ def process_stream_chunk(
282283 if StreamProcessor .check_stop_flag (processed_chunk , field_mapping ):
283284 return True , None , metrics # Normal stream end
284285
286+ if StreamProcessor .is_sse_event_line (chunk_str ):
287+ return False , None , metrics
285288 # Check if data format is JSON
286289 if field_mapping .data_format == "json" :
287290 try :
0 commit comments