1010Fast JSONL decoder using Cython for performance-critical operations.
1111"""
1212
13- from libc.string cimport memchr, memcmp
13+ from libc.string cimport memcmp
1414from libc.stddef cimport size_t
1515from cpython.bytes cimport PyBytes_AS_STRING, PyBytes_GET_SIZE
1616from cpython.mem cimport PyMem_Malloc, PyMem_Free
@@ -23,17 +23,22 @@ import platform
2323cdef extern from " simd_search.h" :
2424 size_t neon_count(const char * data, size_t length, char target)
2525 size_t avx_count(const char * data, size_t length, char target)
26+ int neon_search(const char * data, size_t length, char target)
27+ int avx_search(const char * data, size_t length, char target)
2628
2729
2830# Detect architecture at module initialization and select the appropriate SIMD function
2931cdef size_t (* simd_count)(const char * , size_t, char )
32+ cdef int (* simd_search)(const char * , size_t, char )
3033
3134# Detect CPU architecture once at module load
3235_arch = platform.machine().lower()
3336if _arch in (' arm64' , ' aarch64' ):
3437 simd_count = neon_count
38+ simd_search = neon_search
3539else :
3640 simd_count = avx_count
41+ simd_search = avx_search
3742
3843
3944cdef enum ColumnType:
@@ -116,17 +121,21 @@ cdef inline const char* find_key_value(const char* line, Py_ssize_t line_len, co
116121 cdef int bracket_count
117122 cdef int backslash_run
118123 cdef Py_ssize_t remaining
124+ cdef int quote_offset
119125
120126 # Search for the key pattern: "key":
121127 while pos < end:
122128 # Find opening quote of a key
123129 remaining = end - pos
124130 if remaining <= 0 :
125131 return NULL
126- key_pos = < const char * > memchr(pos, 34 , < size_t> remaining) # '"'
127- if key_pos == NULL :
132+
133+ # Use SIMD search to find the quote character
134+ quote_offset = simd_search(pos, < size_t> remaining, 34 ) # '"'
135+ if quote_offset == - 1 :
128136 return NULL
129137
138+ key_pos = pos + quote_offset
130139 key_pos += 1 # Move past the opening quote
131140
132141 # Check if this matches our key
@@ -274,7 +283,6 @@ cpdef fast_jsonl_decode_columnar(bytes buffer, list column_names, dict column_ty
274283 cdef Py_ssize_t key_len
275284 cdef str col_type
276285 cdef dict result = {}
277- cdef Py_ssize_t num_lines
278286 cdef Py_ssize_t i
279287 cdef Py_ssize_t num_cols = len (column_names)
280288 cdef list column_lists = []
@@ -287,10 +295,10 @@ cpdef fast_jsonl_decode_columnar(bytes buffer, list column_names, dict column_ty
287295 cdef bytes value_bytes
288296 cdef object parsed
289297 cdef Py_ssize_t remaining
290- cdef const char * newline_pos
291298 cdef size_t line_count
292299 cdef size_t estimated_lines
293300 cdef Py_ssize_t line_index = 0
301+ cdef int newline_offset
294302
295303 result = {}
296304
@@ -329,21 +337,25 @@ cpdef fast_jsonl_decode_columnar(bytes buffer, list column_names, dict column_ty
329337
330338 # Preallocate column lists
331339 for i in range (num_cols):
332- col_list = column_lists[i]
333- column_lists[i] = [None ] * estimated_lines
340+ col_list = [None ] * estimated_lines
341+ column_lists[i] = col_list
342+ col = column_names[i]
343+ result[col] = col_list
334344
335345 while pos < end:
336346 line_start = pos
337347 remaining = end - line_start
338348 if remaining <= 0 :
339349 break
340- newline_pos = < const char * > memchr(line_start, 10 , < size_t> remaining) # '\n'
341- if newline_pos == NULL :
350+
351+ # Use SIMD search to find newline
352+ newline_offset = simd_search(line_start, < size_t> remaining, 10 ) # '\n'
353+ if newline_offset == - 1 :
342354 line_end = end
343355 pos = end
344356 else :
345- line_end = newline_pos
346- pos = newline_pos + 1
357+ line_end = line_start + newline_offset
358+ pos = line_end + 1
347359
348360 line_len = line_end - line_start
349361 # num_lines += 1 # Removed, using line_index instead
@@ -365,63 +377,50 @@ cpdef fast_jsonl_decode_columnar(bytes buffer, list column_names, dict column_ty
365377 # Already None
366378 continue
367379
368- if type_code == COL_BOOL:
380+ if value_len == 4 and memcmp(value_ptr, b" null" , 4 ) == 0 :
381+ # Already None
382+ continue
383+
384+ elif type_code == COL_BOOL:
369385 if value_len == 4 and memcmp(value_ptr, b" true" , 4 ) == 0 :
370386 col_list[line_index] = True
371387 elif value_len == 5 and memcmp(value_ptr, b" false" , 5 ) == 0 :
372388 col_list[line_index] = False
373389 # else already None
374390
375391 elif type_code == COL_INT:
376- if value_len == 4 and memcmp(value_ptr, b" null" , 4 ) == 0 :
377- # Already None
378- pass
379- else :
380- col_list[line_index] = fast_atoll(value_ptr, value_len)
392+ col_list[line_index] = fast_atoll(value_ptr, value_len)
381393
382394 elif type_code == COL_FLOAT:
383- if value_len == 4 and memcmp(value_ptr, b" null" , 4 ) == 0 :
384- # Already None
385- pass
386- else :
387- value_bytes = PyBytes_FromStringAndSize(value_ptr, value_len)
388- col_list[line_index] = c_parse_fast_float(value_bytes)
395+ value_bytes = PyBytes_FromStringAndSize(value_ptr, value_len)
396+ col_list[line_index] = c_parse_fast_float(value_bytes)
389397
390398 elif type_code == COL_STR:
391- if value_len == 4 and memcmp(value_ptr, b" null" , 4 ) == 0 :
399+ value_bytes = PyBytes_FromStringAndSize(value_ptr, value_len)
400+ try :
401+ parsed = json.loads(value_bytes)
402+ if isinstance (parsed, str ):
403+ col_list[line_index] = parsed
404+ # else already None
405+ except (json.JSONDecodeError, UnicodeDecodeError ):
392406 # Already None
393407 pass
394- else :
395- value_bytes = PyBytes_FromStringAndSize(value_ptr, value_len)
396- try :
397- parsed = json.loads(value_bytes)
398- if isinstance (parsed, str ):
399- col_list[line_index] = parsed
400- # else already None
401- except (json.JSONDecodeError, UnicodeDecodeError ):
402- # Already None
403- pass
404408
405409 else :
406- if value_len == 4 and memcmp(value_ptr, b" null" , 4 ) == 0 :
410+ value_bytes = PyBytes_FromStringAndSize(value_ptr, value_len)
411+ try :
412+ parsed = json.loads(value_bytes)
413+ if isinstance (parsed, dict ):
414+ col_list[line_index] = json.dumps(parsed)
415+ else :
416+ col_list[line_index] = parsed
417+ except (json.JSONDecodeError, UnicodeDecodeError ):
407418 # Already None
408419 pass
409- else :
410- value_bytes = PyBytes_FromStringAndSize(value_ptr, value_len)
411- try :
412- parsed = json.loads(value_bytes)
413- if isinstance (parsed, dict ):
414- col_list[line_index] = json.dumps(parsed)
415- else :
416- col_list[line_index] = parsed
417- except (json.JSONDecodeError, UnicodeDecodeError ):
418- # Already None
419- pass
420420
421421 line_index += 1
422422
423- num_lines = line_index
424- return (num_lines, num_cols, result)
423+ return (estimated_lines, num_cols, result)
425424 finally :
426425 if type_codes != NULL :
427426 PyMem_Free(type_codes)
0 commit comments