@@ -186,31 +186,33 @@ def _read_csv_iterator(
186186 end -= 1 # Range is inclusive, contrary to Python's List
187187 bytes_range = "bytes={}-{}" .format (ini , end )
188188 logger .debug (f"bytes_range: { bytes_range } " )
189- body = client_s3 .get_object (Bucket = bucket_name , Key = key_path , Range = bytes_range )[ "Body" ]\
190- . read ()\
191- . decode ( "utf-8" )
189+ body = client_s3 .get_object (Bucket = bucket_name ,
190+ Key = key_path ,
191+ Range = bytes_range )[ "Body" ]. read ( )
192192 chunk_size = len (body )
193- logger .debug (f"chunk_size: { chunk_size } " )
193+ logger .debug (f"chunk_size (bytes) : { chunk_size } " )
194194
195195 if count == 1 : # first chunk
196196 last_char = Pandas ._find_terminator (
197197 body = body ,
198+ sep = sep ,
198199 quoting = quoting ,
199200 quotechar = quotechar ,
200201 lineterminator = lineterminator )
201- forgotten_bytes = len (body [last_char :]. encode ( "utf-8" ) )
202+ forgotten_bytes = len (body [last_char :])
202203 elif count == bounders_len : # Last chunk
203204 last_char = chunk_size
204205 else :
205206 last_char = Pandas ._find_terminator (
206207 body = body ,
208+ sep = sep ,
207209 quoting = quoting ,
208210 quotechar = quotechar ,
209211 lineterminator = lineterminator )
210- forgotten_bytes = len (body [last_char :]. encode ( "utf-8" ) )
212+ forgotten_bytes = len (body [last_char :])
211213
212214 df = pandas .read_csv (
213- StringIO (body [:last_char ]),
215+ StringIO (body [:last_char ]. decode ( "utf-8" ) ),
214216 header = header ,
215217 names = names ,
216218 sep = sep ,
@@ -229,57 +231,95 @@ def _read_csv_iterator(
229231 header = None
230232
@staticmethod
def _extract_terminator_profile(body, sep, quotechar, lineterminator,
                                last_index):
    """
    Backward parser for quoted CSV lines.

    Walks ``body[:last_index]`` from its last byte towards its first one.
    The first line terminator met becomes the "suspect". From there the
    scan keeps moving backward, counting quote and separator bytes, and
    stops at the first byte that is neither quote, separator nor
    terminator, or at a terminator sitting immediately before the suspect.

    :param body: Bytes-like chunk (indexing it must yield integers)
    :param sep: Same as pandas.read_csv()
    :param quotechar: Same as pandas.read_csv()
    :param lineterminator: Same as pandas.read_csv()
    :param last_index: Exclusive upper bound of the scan (None scans
        the whole body)
    :return: Dict with the profile. Keys:
        "last_terminator_suspect_index" (int or None),
        "first_non_special_byte_index" (int or None),
        "sep_counter" (int) and "quote_counter" (int)
    """
    # Fold every delimiter into the integer form that indexing a bytes
    # object yields, so the loop compares int to int.
    # NOTE(review): a delimiter that encodes to more than one UTF-8 byte
    # becomes a value that a single body byte is not expected to equal.
    sep_int = int.from_bytes(sep.encode(encoding="utf-8"),
                             byteorder="big")  # b"," -> 44
    quote_int = int.from_bytes(quotechar.encode(encoding="utf-8"),
                               byteorder="big")  # b'"' -> 34
    terminator_int = int.from_bytes(
        lineterminator.encode(encoding="utf-8"),
        byteorder="big")  # b"\n" -> 10
    logger.debug(f"sep_int: {sep_int}")
    logger.debug(f"quote_int: {quote_int}")
    logger.debug(f"terminator_int: {terminator_int}")

    last_terminator_suspect_index = None
    first_non_special_byte_index = None
    sep_counter = 0
    quote_counter = 0

    # Slicing the range (not the body) applies the exact same slice
    # semantics to the indexes without copying the chunk.
    for i in reversed(range(len(body))[:last_index]):
        b = body[i]
        suspect_found = last_terminator_suspect_index is not None
        if suspect_found:
            if b == quote_int:
                quote_counter += 1
            elif b == sep_int:
                sep_counter += 1
            elif b != terminator_int:
                # First ordinary byte before the suspect: profile is done.
                first_non_special_byte_index = i
                break
        if b == terminator_int:
            if not suspect_found:
                last_terminator_suspect_index = i
            elif last_terminator_suspect_index - 1 == i:
                # Two terminators back to back also close the profile.
                first_non_special_byte_index = i
                break

    logger.debug(
        f"last_terminator_suspect_index: {last_terminator_suspect_index}")
    logger.debug(
        f"first_non_special_byte_index: {first_non_special_byte_index}")
    logger.debug(f"sep_counter: {sep_counter}")
    logger.debug(f"quote_counter: {quote_counter}")
    return {
        "last_terminator_suspect_index": last_terminator_suspect_index,
        "first_non_special_byte_index": first_non_special_byte_index,
        "sep_counter": sep_counter,
        "quote_counter": quote_counter,
    }
288+
@staticmethod
def _find_terminator(body, sep, quoting, quotechar, lineterminator):
    """
    Find the last plausible line terminator in a chunk (from end to start).

    When ``quoting`` is not ``csv.QUOTE_ALL`` the last occurrence of the
    encoded line terminator is returned. With ``csv.QUOTE_ALL`` each
    candidate is profiled by ``Pandas._extract_terminator_profile``: a
    candidate preceded by an odd number of quote bytes is accepted, one
    preceded by an even number (zero included) is rejected and the search
    resumes before it.
    NOTE(review): the odd/even rule presumably separates a closing quote
    from terminators embedded in quoted fields - confirm with the author.

    :param body: Bytes chunk to inspect
    :param sep: Same as pandas.read_csv()
    :param quoting: Same as pandas.read_csv()
    :param quotechar: Same as pandas.read_csv()
    :param lineterminator: Same as pandas.read_csv()
    :return: The index of the suspect line terminator
    :raises LineTerminatorNotFound: If no acceptable terminator exists
    """
    try:
        if quoting != csv.QUOTE_ALL:
            return body.rindex(lineterminator.encode(encoding="utf-8"))

        last_index = None
        while True:
            profile = Pandas._extract_terminator_profile(
                body=body,
                sep=sep,
                quotechar=quotechar,
                lineterminator=lineterminator,
                last_index=last_index)
            suspect = profile["last_terminator_suspect_index"]
            # Compare against None explicitly: 0 is a valid byte index
            # and must not be mistaken for "nothing found".
            if (suspect is None
                    or profile["first_non_special_byte_index"] is None):
                raise LineTerminatorNotFound()
            if profile["quote_counter"] % 2 == 1:
                return suspect
            # Rejected candidate: look again strictly before it, so the
            # search window shrinks on every pass.
            last_index = suspect
    except ValueError as err:
        raise LineTerminatorNotFound() from err
0 commit comments