Skip to content

Commit 7020ecc

Browse files
authored
Merge pull request #2853 from mabel-dev/improve-jsonl-performance-2
improve jsonl performance
2 parents be49a57 + ed5ac01 commit 7020ecc

File tree

6 files changed

+270
-80
lines changed

6 files changed

+270
-80
lines changed

opteryx/__version__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
# THIS FILE IS AUTOMATICALLY UPDATED DURING THE BUILD PROCESS
22
# DO NOT EDIT THIS FILE DIRECTLY
33

4-
__build__ = 1660
4+
__build__ = 1664
55
__author__ = "@joocer"
6-
__version__ = "0.26.0-beta.1660"
6+
__version__ = "0.26.0-beta.1664"
77

88
# Store the version here so:
99
# 1) we don't load dependencies by storing it in __init__.py

opteryx/compiled/structures/jsonl_decoder.pyx

Lines changed: 111 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,35 @@
1010
Fast JSONL decoder using Cython for performance-critical operations.
1111
"""
1212

13-
from libc.string cimport memchr, memcmp
13+
from libc.string cimport memcmp
1414
from libc.stddef cimport size_t
1515
from cpython.bytes cimport PyBytes_AS_STRING, PyBytes_GET_SIZE
1616
from cpython.mem cimport PyMem_Malloc, PyMem_Free
1717

1818
from opteryx.third_party.fastfloat.fast_float cimport c_parse_fast_float
1919

2020
import orjson as json
21+
import platform
22+
23+
cdef extern from "simd_search.h":
24+
size_t neon_count(const char* data, size_t length, char target)
25+
size_t avx_count(const char* data, size_t length, char target)
26+
int neon_search(const char* data, size_t length, char target)
27+
int avx_search(const char* data, size_t length, char target)
28+
29+
30+
# Detect architecture at module initialization and select the appropriate SIMD function
31+
cdef size_t (*simd_count)(const char*, size_t, char)
32+
cdef int (*simd_search)(const char*, size_t, char)
33+
34+
# Detect CPU architecture once at module load
35+
_arch = platform.machine().lower()
36+
if _arch in ('arm64', 'aarch64'):
37+
simd_count = neon_count
38+
simd_search = neon_search
39+
else:
40+
simd_count = avx_count
41+
simd_search = avx_search
2142

2243

2344
cdef enum ColumnType:
@@ -100,27 +121,30 @@ cdef inline const char* find_key_value(const char* line, Py_ssize_t line_len, co
100121
cdef int bracket_count
101122
cdef int backslash_run
102123
cdef Py_ssize_t remaining
124+
cdef int quote_offset
103125

104126
# Search for the key pattern: "key":
105127
while pos < end:
106128
# Find opening quote of a key
107129
remaining = end - pos
108130
if remaining <= 0:
109131
return NULL
110-
key_pos = <const char*>memchr(pos, b'"', <size_t>remaining)
111-
if key_pos == NULL:
132+
133+
# Use SIMD search to find the quote character
134+
quote_offset = simd_search(pos, <size_t>remaining, 34) # '"'
135+
if quote_offset == -1:
112136
return NULL
113137

138+
key_pos = pos + quote_offset
114139
key_pos += 1 # Move past the opening quote
115140

116141
# Check if this matches our key
117-
if (end - key_pos >= key_len and memcmp(key_pos, key, <size_t>key_len) == 0 and key_pos[key_len] == b'"'):
118-
142+
if (end - key_pos >= key_len and memcmp(key_pos, key, <size_t>key_len) == 0 and key_pos[key_len] == 34): # '"'
119143
# Found the key, now find the colon
120144
value_pos = key_pos + key_len + 1 # Skip closing quote
121145

122146
# Skip whitespace and colon
123-
while value_pos < end and (value_pos[0] == b' ' or value_pos[0] == b'\t' or value_pos[0] == b':'):
147+
while value_pos < end and (value_pos[0] == 32 or value_pos[0] == 9 or value_pos[0] == 58): # ' ', '\t', ':'
124148
value_pos += 1
125149

126150
if value_pos >= end:
@@ -130,84 +154,84 @@ cdef inline const char* find_key_value(const char* line, Py_ssize_t line_len, co
130154
value_start[0] = value_pos - line
131155

132156
# Determine value type and find end
133-
if first_char == b'"':
157+
if first_char == 34: # '"'
134158
# String value - find closing quote, handling escapes
135159
quote_start = value_pos + 1
136160
quote_end = quote_start
137161
backslash_run = 0
138162
while quote_end < end:
139-
if quote_end[0] == b'\\':
163+
if quote_end[0] == 92: # '\\'
140164
backslash_run += 1
141165
else:
142-
if quote_end[0] == b'"' and (backslash_run & 1) == 0:
166+
if quote_end[0] == 34 and (backslash_run & 1) == 0: # '"'
143167
# Found unescaped quote
144168
value_len[0] = (quote_end + 1) - value_pos
145169
return value_pos
146170
backslash_run = 0
147171
quote_end += 1
148172
return NULL
149173

150-
elif first_char == b'{':
174+
elif first_char == 123: # '{'
151175
# Object - count braces
152176
brace_count = 1
153177
quote_end = value_pos + 1
154178
while quote_end < end and brace_count > 0:
155-
if quote_end[0] == b'{':
179+
if quote_end[0] == 123: # '{'
156180
brace_count += 1
157-
elif quote_end[0] == b'}':
181+
elif quote_end[0] == 125: # '}'
158182
brace_count -= 1
159-
elif quote_end[0] == b'"':
183+
elif quote_end[0] == 34: # '"'
160184
# Skip string contents to avoid premature brace counting
161185
quote_end += 1
162186
while quote_end < end:
163-
if quote_end[0] == b'\\':
187+
if quote_end[0] == 92: # '\'
164188
quote_end += 2
165189
continue
166-
if quote_end[0] == b'"':
190+
if quote_end[0] == 34: # '"'
167191
break
168192
quote_end += 1
169193
quote_end += 1
170194
value_len[0] = quote_end - value_pos
171195
return value_pos
172196

173-
elif first_char == b'[':
197+
elif first_char == 91: # '['
174198
# Array - count brackets
175199
bracket_count = 1
176200
quote_end = value_pos + 1
177201
while quote_end < end and bracket_count > 0:
178-
if quote_end[0] == b'[':
202+
if quote_end[0] == 91: # '['
179203
bracket_count += 1
180-
elif quote_end[0] == b']':
204+
elif quote_end[0] == 93: # ']'
181205
bracket_count -= 1
182-
elif quote_end[0] == b'"':
206+
elif quote_end[0] == 34: # '"'
183207
# Skip string contents inside arrays
184208
quote_end += 1
185209
while quote_end < end:
186-
if quote_end[0] == b'\\':
210+
if quote_end[0] == 92:
187211
quote_end += 2
188212
continue
189-
if quote_end[0] == b'"':
213+
if quote_end[0] == 34: # '"'
190214
break
191215
quote_end += 1
192216
quote_end += 1
193217
value_len[0] = quote_end - value_pos
194218
return value_pos
195219

196-
elif first_char == b'n':
220+
elif first_char == 110: # 'n'
197221
# null
198222
if end - value_pos >= 4 and memcmp(value_pos, b"null", 4) == 0:
199223
value_len[0] = 4
200224
return value_pos
201225
return NULL
202226

203-
elif first_char == b't':
227+
elif first_char == 116: # 't'
204228
# true
205229
if end - value_pos >= 4 and memcmp(value_pos, b"true", 4) == 0:
206230
value_len[0] = 4
207231
return value_pos
208232
return NULL
209233

210-
elif first_char == b'f':
234+
elif first_char == 102: # 'f'
211235
# false
212236
if end - value_pos >= 5 and memcmp(value_pos, b"false", 5) == 0:
213237
value_len[0] = 5
@@ -219,7 +243,7 @@ cdef inline const char* find_key_value(const char* line, Py_ssize_t line_len, co
219243
quote_end = value_pos + 1
220244
while quote_end < end:
221245
# Check for delimiter characters
222-
if quote_end[0] == b' ' or quote_end[0] == b',' or quote_end[0] == b'}' or quote_end[0] == b']' or quote_end[0] == b'\t' or quote_end[0] == b'\n':
246+
if quote_end[0] == 32 or quote_end[0] == 44 or quote_end[0] == 125 or quote_end[0] == 93 or quote_end[0] == 9 or quote_end[0] == 10: # ' ', ',', '}', ']', '\t', '\n'
223247
value_len[0] = quote_end - value_pos
224248
return value_pos
225249
quote_end += 1
@@ -259,7 +283,6 @@ cpdef fast_jsonl_decode_columnar(bytes buffer, list column_names, dict column_ty
259283
cdef Py_ssize_t key_len
260284
cdef str col_type
261285
cdef dict result = {}
262-
cdef Py_ssize_t num_lines = 0
263286
cdef Py_ssize_t i
264287
cdef Py_ssize_t num_cols = len(column_names)
265288
cdef list column_lists = []
@@ -270,10 +293,12 @@ cpdef fast_jsonl_decode_columnar(bytes buffer, list column_names, dict column_ty
270293
cdef int type_code
271294
cdef list col_list
272295
cdef bytes value_bytes
273-
cdef str value_str
274296
cdef object parsed
275297
cdef Py_ssize_t remaining
276-
cdef const char* newline_pos
298+
cdef size_t line_count
299+
cdef size_t estimated_lines
300+
cdef Py_ssize_t line_index = 0
301+
cdef int newline_offset
277302

278303
result = {}
279304

@@ -303,25 +328,41 @@ cpdef fast_jsonl_decode_columnar(bytes buffer, list column_names, dict column_ty
303328
col_type = column_types.get(col, 'str')
304329
type_codes[i] = _column_type_code(col_type)
305330

331+
# Pre-count lines to preallocate arrays (using architecture-appropriate SIMD function)
332+
line_count = simd_count(data, data_len, 10) # Count '\n' (ASCII 10)
333+
if data_len > 0 and data[data_len - 1] != 10: # Doesn't end with '\n'
334+
estimated_lines = line_count + 1
335+
else:
336+
estimated_lines = line_count
337+
338+
# Preallocate column lists
339+
for i in range(num_cols):
340+
col_list = [None] * estimated_lines
341+
column_lists[i] = col_list
342+
col = column_names[i]
343+
result[col] = col_list
344+
306345
while pos < end:
307346
line_start = pos
308347
remaining = end - line_start
309348
if remaining <= 0:
310349
break
311-
newline_pos = <const char*>memchr(line_start, b'\n', <size_t>remaining)
312-
if newline_pos == NULL:
350+
351+
# Use SIMD search to find newline
352+
newline_offset = simd_search(line_start, <size_t>remaining, 10) # '\n'
353+
if newline_offset == -1:
313354
line_end = end
314355
pos = end
315356
else:
316-
line_end = newline_pos
317-
pos = newline_pos + 1
357+
line_end = line_start + newline_offset
358+
pos = line_end + 1
318359

319360
line_len = line_end - line_start
320-
num_lines += 1
361+
# num_lines += 1 # Removed, using line_index instead
321362

322363
if line_len == 0:
323-
for i in range(num_cols):
324-
(<list>column_lists[i]).append(None)
364+
# Already pre-filled with None
365+
line_index += 1
325366
continue
326367

327368
for i in range(num_cols):
@@ -333,59 +374,53 @@ cpdef fast_jsonl_decode_columnar(bytes buffer, list column_names, dict column_ty
333374
value_ptr = find_key_value(line_start, line_len, key_ptr, key_len, &value_start, &value_len)
334375

335376
if value_ptr == NULL:
336-
col_list.append(None)
377+
# Already None
337378
continue
338379

339-
if type_code == COL_BOOL:
380+
if value_len == 4 and memcmp(value_ptr, b"null", 4) == 0:
381+
# Already None
382+
continue
383+
384+
elif type_code == COL_BOOL:
340385
if value_len == 4 and memcmp(value_ptr, b"true", 4) == 0:
341-
col_list.append(True)
386+
col_list[line_index] = True
342387
elif value_len == 5 and memcmp(value_ptr, b"false", 5) == 0:
343-
col_list.append(False)
344-
else:
345-
col_list.append(None)
388+
col_list[line_index] = False
389+
# else already None
346390

347391
elif type_code == COL_INT:
348-
if value_len == 4 and memcmp(value_ptr, b"null", 4) == 0:
349-
col_list.append(None)
350-
else:
351-
col_list.append(fast_atoll(value_ptr, value_len))
392+
col_list[line_index] = fast_atoll(value_ptr, value_len)
352393

353394
elif type_code == COL_FLOAT:
354-
if value_len == 4 and memcmp(value_ptr, b"null", 4) == 0:
355-
col_list.append(None)
356-
else:
357-
value_bytes = PyBytes_FromStringAndSize(value_ptr, value_len)
358-
col_list.append(c_parse_fast_float(value_bytes))
395+
value_bytes = PyBytes_FromStringAndSize(value_ptr, value_len)
396+
col_list[line_index] = c_parse_fast_float(value_bytes)
359397

360398
elif type_code == COL_STR:
361-
if value_len == 4 and memcmp(value_ptr, b"null", 4) == 0:
362-
col_list.append(None)
363-
elif value_ptr[0] == b'"' and value_len >= 2:
364-
value_bytes = PyBytes_FromStringAndSize(value_ptr + 1, value_len - 2)
365-
try:
366-
value_str = value_bytes.decode('utf-8')
367-
value_str = value_str.replace('\\n', '\n').replace('\\t', '\t').replace('\\"', '"').replace('\\\\', '\\')
368-
col_list.append(value_str)
369-
except UnicodeDecodeError:
370-
col_list.append(None)
371-
else:
372-
col_list.append(None)
399+
value_bytes = PyBytes_FromStringAndSize(value_ptr, value_len)
400+
try:
401+
parsed = json.loads(value_bytes)
402+
if isinstance(parsed, str):
403+
col_list[line_index] = parsed
404+
# else already None
405+
except (json.JSONDecodeError, UnicodeDecodeError):
406+
# Already None
407+
pass
373408

374409
else:
375-
if value_len == 4 and memcmp(value_ptr, b"null", 4) == 0:
376-
col_list.append(None)
377-
else:
378-
value_bytes = PyBytes_FromStringAndSize(value_ptr, value_len)
379-
try:
380-
parsed = json.loads(value_bytes)
381-
if isinstance(parsed, dict):
382-
col_list.append(json.dumps(parsed))
383-
else:
384-
col_list.append(parsed)
385-
except (json.JSONDecodeError, UnicodeDecodeError):
386-
col_list.append(None)
387-
388-
return (num_lines, num_cols, result)
410+
value_bytes = PyBytes_FromStringAndSize(value_ptr, value_len)
411+
try:
412+
parsed = json.loads(value_bytes)
413+
if isinstance(parsed, dict):
414+
col_list[line_index] = json.dumps(parsed)
415+
else:
416+
col_list[line_index] = parsed
417+
except (json.JSONDecodeError, UnicodeDecodeError):
418+
# Already None
419+
pass
420+
421+
line_index += 1
422+
423+
return (estimated_lines, num_cols, result)
389424
finally:
390425
if type_codes != NULL:
391426
PyMem_Free(type_codes)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "opteryx"
3-
version = "0.26.0-beta.1660"
3+
version = "0.26.0-beta.1664"
44
description = "Query your data, where it lives"
55
requires-python = '>=3.11'
66
readme = {file = "README.md", content-type = "text/markdown"}

setup.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,10 @@ def rust_build(setup_kwargs: Dict[str, Any]) -> None:
251251
),
252252
Extension(
253253
name="opteryx.compiled.structures.jsonl_decoder",
254-
sources=["opteryx/compiled/structures/jsonl_decoder.pyx"],
254+
sources=[
255+
"opteryx/compiled/structures/jsonl_decoder.pyx",
256+
"src/cpp/simd_search.cpp"
257+
],
255258
include_dirs=include_dirs + ["third_party/fastfloat/fast_float"],
256259
language="c++",
257260
extra_compile_args=CPP_COMPILE_FLAGS,

0 commit comments

Comments
 (0)