Skip to content

Commit 4dfa0e9

Browse files
committed
0.0.188
1 parent e8c8dac commit 4dfa0e9

File tree

5 files changed

+277
-21
lines changed

5 files changed

+277
-21
lines changed

orso/compute/compiled.pyx

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ cimport numpy as cnp
3030
from numpy cimport ndarray
3131
from libc.stdint cimport int32_t, int64_t
3232
from cpython.dict cimport PyDict_GetItem
33+
from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM
3334

3435
cnp.import_array()
3536

@@ -174,3 +175,139 @@ def process_table(table, row_factory, int max_chunksize) -> list:
174175
rows[i] = row_factory(row)
175176
i += 1
176177
return rows
178+
179+
180+
181+
# cython: language_level=3
182+
# cython: boundscheck=False
183+
# cython: wraparound=False
184+
# cython: nonecheck=False
185+
# cython: cdivision=True
186+
# cython: initializedcheck=False
187+
# cython: infer_types=True
188+
189+
import pyarrow
190+
cimport cython
191+
from libc.stdint cimport int64_t, uint8_t, int32_t
192+
from libc.stdint cimport int32_t, int64_t, uint8_t, uint64_t, uintptr_t
193+
from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM
194+
195+
# cython: language_level=3
196+
# cython: boundscheck=False
197+
# cython: wraparound=False
198+
# cython: nonecheck=False
199+
# cython: cdivision=True
200+
# cython: initializedcheck=False
201+
# cython: infer_types=True
202+
203+
import pyarrow
204+
import struct
205+
cimport cython
206+
from libc.stdint cimport int32_t, int64_t, uint8_t
207+
208+
cpdef list _process_table(table, object row_factory, int max_chunksize):
    """
    Converts a PyArrow table into a list of transformed rows.

    Parameters:
        table: PyArrow Table
            The input table to process.
        row_factory: function
            A function applied to each row (receives a list of cell values).
        max_chunksize: int
            The batch size to process at a time.

    Returns:
        A list of transformed rows, one entry per row of the input table.
    """
    cdef list result = []
    cdef Py_ssize_t num_cols = table.num_columns
    cdef Py_ssize_t row_idx, col_idx
    cdef Py_ssize_t batch_num_rows
    cdef object chunk, buffers
    cdef const uint8_t* validity
    cdef const int32_t* int_offsets
    cdef const char* data
    cdef const char* num_data
    cdef const uint8_t* bool_data
    cdef Py_ssize_t str_start, str_end
    cdef Py_ssize_t item_size
    cdef bytes value
    cdef list row_values
    cdef uint8_t null_mask
    cdef Py_ssize_t byte_offset, bit_index

    for batch in table.to_batches(max_chunksize):
        batch_cols = batch.columns
        batch_num_rows = batch.num_rows

        # Preallocate row storage for this batch.
        batch_result = [None] * batch_num_rows

        for row_idx in range(batch_num_rows):
            # Build each row in a mutable list. PyTuple_SET_ITEM must never
            # be applied to a list object — it corrupts reference counts.
            row_values = [None] * num_cols

            # Bit-packed offsets are needed by both the validity check and
            # the boolean branch, so compute them unconditionally (the old
            # code left them stale when a column had no validity buffer).
            byte_offset = row_idx // 8
            bit_index = row_idx % 8

            for col_idx in range(num_cols):
                chunk = batch_cols[col_idx]
                buffers = chunk.buffers()

                # NOTE(review): buffer addresses are not adjusted for
                # chunk.offset — this assumes unsliced (zero-offset)
                # arrays; confirm against the callers.
                validity = <const uint8_t*><uintptr_t>buffers[0].address if buffers[0] else NULL

                if validity:
                    null_mask = validity[byte_offset] & (1 << bit_index)
                else:
                    null_mask = 1  # no validity buffer => all values valid

                if null_mask == 0:
                    continue  # NULL value: leave None in place

                if pyarrow.types.is_string(chunk.type) or pyarrow.types.is_binary(chunk.type):
                    int_offsets = <const int32_t*><uintptr_t>buffers[1].address
                    data = <const char*><uintptr_t>buffers[2].address if len(buffers) > 2 else NULL

                    str_start = int_offsets[row_idx]
                    str_end = int_offsets[row_idx + 1]

                    if str_start < str_end and data:
                        value = data[str_start:str_end]
                        row_values[col_idx] = value.decode()
                    else:
                        row_values[col_idx] = ""

                elif pyarrow.types.is_integer(chunk.type):
                    # Read the fixed-width value out of the data buffer.
                    num_data = <const char*><uintptr_t>buffers[1].address
                    item_size = chunk.type.bit_width // 8
                    value = num_data[row_idx * item_size:(row_idx + 1) * item_size]

                    if item_size == 8:  # int64
                        row_values[col_idx] = struct.unpack_from("<q", value, 0)[0]
                    elif item_size == 4:  # int32
                        row_values[col_idx] = struct.unpack_from("<i", value, 0)[0]
                    elif item_size == 2:  # int16
                        row_values[col_idx] = struct.unpack_from("<h", value, 0)[0]
                    elif item_size == 1:  # int8
                        row_values[col_idx] = struct.unpack_from("<b", value, 0)[0]

                elif pyarrow.types.is_floating(chunk.type):
                    # Fall back to PyArrow's scalar conversion for floats.
                    row_values[col_idx] = chunk[row_idx].as_py()

                elif pyarrow.types.is_boolean(chunk.type):
                    # Booleans are bit-packed in the data buffer.
                    bool_data = <const uint8_t*><uintptr_t>buffers[1].address if buffers[1] else NULL
                    row_values[col_idx] = bool((bool_data[byte_offset] & (1 << bit_index)) != 0)

                # Unsupported types are left as None.

            # Apply the caller's transformation and keep the row — the old
            # code printed the row and discarded it, always returning [].
            batch_result[row_idx] = row_factory(row_values)

        result.extend(batch_result)

    return result

orso/schema.py

Lines changed: 95 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
5252
"""
5353

54+
import re
5455
from collections import defaultdict
5556
from dataclasses import _MISSING_TYPE
5657
from dataclasses import asdict
@@ -123,6 +124,46 @@ def load(cls: Type["Expectation"], serialized: Union[Dict[str, Any], str]) -> "E
123124
)
124125

125126

127+
def _parse_type(type_str: str) -> Union[str, Tuple[str, Tuple[int, ...]]]:
128+
"""
129+
Parses a SQL type string into its base type and optional parameters.
130+
131+
Parameters:
132+
type_str (str): The type definition string (e.g., 'DECIMAL(10,2)', 'VARCHAR[255]', 'ARRAY<VARCHAR>').
133+
134+
Returns:
135+
Union[str, Tuple[str, Tuple[int, ...]]]:
136+
- Just the base type (e.g., "INTEGER", "TEXT").
137+
- A tuple with the base type and a tuple of integer parameters if applicable (e.g., ("DECIMAL", (10, 2))).
138+
"""
139+
140+
# Match ARRAY<TYPE>
141+
array_match = re.match(r"ARRAY<([\w\s]+)>", type_str)
142+
if array_match:
143+
return "ARRAY", (array_match.group(1),)
144+
145+
# Match DECIMAL(p,s)
146+
decimal_match = re.match(r"DECIMAL\((\d+),\s*(\d+)\)", type_str)
147+
if decimal_match:
148+
precision, scale = map(int, decimal_match.groups())
149+
return "DECIMAL", (precision, scale)
150+
151+
# Match VARCHAR[n]
152+
varchar_match = re.match(r"VARCHAR\[(\d+)\]", type_str)
153+
if varchar_match:
154+
length = int(varchar_match.group(1))
155+
return "VARCHAR", (length,)
156+
157+
# Match BLOB[n]
158+
blob_match = re.match(r"BLOB\[(\d+)\]", type_str)
159+
if blob_match:
160+
size = int(blob_match.group(1))
161+
return "BLOB", (size,)
162+
163+
# If no parameters, return base type as a string
164+
return type_str.upper()
165+
166+
126167
@dataclass(init=False)
127168
class FlatColumn:
128169
"""
@@ -134,12 +175,14 @@ class FlatColumn:
134175
name: str
135176
default: Optional[Any] = None
136177
type: OrsoTypes = OrsoTypes._MISSING_TYPE
178+
subtype: Optional[OrsoTypes] = None
137179
description: Optional[str] = None
138180
disposition: Optional[ColumnDisposition] = None
139181
aliases: Optional[List[str]] = field(default_factory=list) # type: ignore
140182
nullable: bool = True
141183
expectations: Optional[Expectation] = field(default_factory=list)
142184
identity: str = field(default_factory=random_string)
185+
length: Optional[int] = None
143186
precision: Optional[int] = None
144187
scale: Optional[int] = None
145188
origin: Optional[List[str]] = field(default_factory=list)
@@ -178,25 +221,58 @@ def __init__(self, **kwargs):
178221
# map literals to OrsoTypes
179222
if self.type.__class__ is not OrsoTypes:
180223
type_name = str(self.type).upper()
181-
if type_name in OrsoTypes.__members__:
182-
self.type = OrsoTypes[type_name]
183-
elif type_name == "LIST":
184-
warn("Column type LIST will be deprecated in a future version, use ARRAY instead.")
185-
self.type = OrsoTypes.ARRAY
186-
elif type_name == "NUMERIC":
187-
warn(
188-
"Column type NUMERIC will be deprecated in a future version, use DECIMAL, DOUBLE or INTEGER instead. Mapped to DOUBLE, this may not be compatible with all values NUMERIC was compatible with."
189-
)
190-
self.type = OrsoTypes.DOUBLE
191-
elif type_name == "BSON":
192-
warn("Column type BSON will be deprecated in a future version, use JSONB instead.")
193-
self.type = OrsoTypes.JSONB
194-
elif type_name == "STRING":
195-
raise ValueError(
196-
f"Unknown column type '{self.type}' for column '{self.name}'. Did you mean 'VARCHAR'?"
197-
)
198-
elif self.type != 0:
224+
parsed_types = _parse_type(type_name)
225+
if isinstance(parsed_types, str):
226+
if parsed_types == "ARRAY":
227+
warn("Column type ARRAY without subtype, defaulting to VARCHAR.")
228+
self.type = OrsoTypes.ARRAY
229+
self.subtype = OrsoTypes.VARCHAR
230+
elif parsed_types in OrsoTypes.__members__:
231+
self.type = OrsoTypes[parsed_types]
232+
elif parsed_types == "LIST":
233+
warn(
234+
"Column type LIST will be deprecated in a future version, use ARRAY instead."
235+
)
236+
self.type = OrsoTypes.ARRAY
237+
elif parsed_types == "NUMERIC":
238+
warn(
239+
"Column type NUMERIC will be deprecated in a future version, use DECIMAL, DOUBLE or INTEGER instead. Mapped to DOUBLE, this may not be compatible with all values NUMERIC was compatible with."
240+
)
241+
self.type = OrsoTypes.DOUBLE
242+
elif parsed_types == "BSON":
243+
warn(
244+
"Column type BSON will be deprecated in a future version, use JSONB instead."
245+
)
246+
self.type = OrsoTypes.JSONB
247+
elif parsed_types == "STRING":
248+
raise ValueError(
249+
f"Unknown column type '{self.type}' for column '{self.name}'. Did you mean 'VARCHAR'?"
250+
)
251+
elif type_name == "0":
252+
self.type = 0
253+
else:
254+
raise ValueError(f"Unknown column type '{self.type}' for column '{self.name}'.")
255+
elif parsed_types[0] == "ARRAY":
256+
subtype = parsed_types[1][0]
257+
if subtype in ("ARRAY", "LIST", "NUMERIC", "BSON", "STRING"):
258+
raise ValueError(f"Invalid subtype '{subtype}' for ARRAY type.")
259+
if subtype in OrsoTypes.__members__:
260+
self.type = OrsoTypes.ARRAY
261+
self.subtype = OrsoTypes[subtype]
262+
else:
263+
raise ValueError(f"Unknown column type '{subtype}' for column '{self.name}'.")
264+
elif parsed_types[0] == "DECIMAL":
265+
self.type = OrsoTypes.DECIMAL
266+
self.precision, self.scale = parsed_types[1]
267+
elif parsed_types[0] == "VARCHAR":
268+
self.type = OrsoTypes.VARCHAR
269+
self.length = parsed_types[1][0]
270+
elif parsed_types[0] == "BLOB":
271+
self.type = OrsoTypes.BLOB
272+
self.length = parsed_types[1][0]
273+
else:
199274
raise ValueError(f"Unknown column type '{self.type}' for column '{self.name}'.")
275+
# validate decimal properties
200276

201277
if self.type == OrsoTypes.DECIMAL and self.precision is None:
202278
from decimal import getcontext
@@ -271,6 +347,7 @@ def to_flatcolumn(self) -> "FlatColumn":
271347
aliases=self.aliases,
272348
identity=self.identity,
273349
type=self.type,
350+
subtype=self.subtype,
274351
nullable=self.nullable,
275352
scale=self.scale,
276353
precision=self.precision,

orso/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
1212

13-
__version__: str = "0.0.187"
13+
__version__: str = "0.0.188"
1414
__author__: str = "@joocer"

tests/test_schema.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,49 @@ def test_parsers():
335335
assert isinstance(parsed, str), type(parsed)
336336
assert parsed == "1718530754", parsed
337337

338-
338+
def test_type_name_parsing():
    # Parameter-free type names resolve directly to OrsoTypes members.
    for type_name, expected in (
        ("INTEGER", OrsoTypes.INTEGER),
        ("VARCHAR", OrsoTypes.VARCHAR),
        ("BLOB", OrsoTypes.BLOB),
        ("DOUBLE", OrsoTypes.DOUBLE),
        ("DECIMAL", OrsoTypes.DECIMAL),
        ("BOOLEAN", OrsoTypes.BOOLEAN),
        ("TIMESTAMP", OrsoTypes.TIMESTAMP),
    ):
        column = FlatColumn(name="col", type=type_name)
        assert column.type == expected, column.type
        if type_name in ("VARCHAR", "BLOB"):
            # Unparameterized sized types carry no length.
            assert column.length is None

    # A bare ARRAY defaults its element subtype to VARCHAR.
    column = FlatColumn(name="col", type="ARRAY")
    assert column.type == OrsoTypes.ARRAY, column.type
    assert column.subtype == OrsoTypes.VARCHAR, column.subtype

    # ARRAY<...> carries an explicit element subtype.
    for subtype_name, expected_subtype in (
        ("INTEGER", OrsoTypes.INTEGER),
        ("VARCHAR", OrsoTypes.VARCHAR),
    ):
        column = FlatColumn(name="col", type=f"ARRAY<{subtype_name}>")
        assert column.type == OrsoTypes.ARRAY, column.type
        assert column.subtype == expected_subtype, column.subtype

    # Malformed or unsupported array subtypes are rejected.
    for bad_type in ("ARRAY<A", "ARRAY<BIT>", "ARRAY<ARRAY>"):
        with pytest.raises(ValueError):
            FlatColumn(name="col", type=bad_type)

    # DECIMAL(p,s) captures precision and scale.
    column = FlatColumn(name="col", type="DECIMAL(10,2)")
    assert column.type == OrsoTypes.DECIMAL, column.type
    assert column.precision == 10, column.precision
    assert column.scale == 2, column.scale

    # VARCHAR[n] and BLOB[n] capture a length.
    for sized_type in ("VARCHAR", "BLOB"):
        column = FlatColumn(name="col", type=f"{sized_type}[12]")
        assert column.type == OrsoTypes[sized_type], column.type
        assert column.length == 12, column.length
380+
339381
if __name__ == "__main__":  # pragma: nocover
340382
from tests import run_tests
341383

tests/test_schema_columns.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ def test_column_type_mapping():
135135
assert fc.type == OrsoTypes.DOUBLE, fc.type
136136

137137
fc = FlatColumn(name="athled", type=0)
138-
assert fc.type == 0
138+
assert fc.type == 0, fc.type
139139

140140
with pytest.raises(ValueError):
141141
FlatColumn(name="able", type="LEFT")

0 commit comments

Comments
 (0)