@@ -22,7 +22,7 @@ import sys
2222# Python imports
2323import bson
2424import numpy as np
25- from pyarrow import timestamp
25+ from pyarrow import timestamp, default_memory_pool
2626
2727from pymongoarrow.errors import InvalidBSON
2828from pymongoarrow.types import ObjectIdType, Decimal128Type as Decimal128Type_, BinaryType, CodeType
@@ -62,22 +62,24 @@ cdef class BuilderManager:
6262 uint64_t count
6363 bint has_schema
6464 object tzinfo
65+ object pool
6566
6667 def __cinit__ (self , dict schema_map , bint has_schema , object tzinfo ):
6768 self .has_schema = has_schema
6869 self .tzinfo = tzinfo
6970 self .count = 0
7071 self .builder_map = {}
72+ self .pool = default_memory_pool()
7173 # Unpack the schema map.
7274 for fname, (ftype, arrow_type) in schema_map.items():
7375 name = fname.encode(' utf-8' )
7476 # special-case initializing builders for parameterized types
7577 if ftype == BSON_TYPE_DATE_TIME:
7678 if tzinfo is not None and arrow_type.tz is None :
7779 arrow_type = timestamp(arrow_type.unit, tz = tzinfo) # noqa: PLW2901
78- self .builder_map[name] = DatetimeBuilder(dtype = arrow_type)
80+ self .builder_map[name] = DatetimeBuilder(dtype = arrow_type, memory_pool = self .pool )
7981 elif ftype == BSON_TYPE_BINARY:
80- self .builder_map[name] = BinaryBuilder(arrow_type.subtype)
82+ self .builder_map[name] = BinaryBuilder(arrow_type.subtype, memory_pool = self .pool )
8183 else :
8284 # We only use the doc_iter for binary arrays, which are handled already.
8385 self .get_builder(name, ftype, < bson_iter_t * > nullptr)
@@ -100,39 +102,39 @@ cdef class BuilderManager:
100102 if value_t == BSON_TYPE_DATE_TIME:
101103 if self .tzinfo is not None :
102104 arrow_type = timestamp(' ms' , tz = self .tzinfo)
103- builder = DatetimeBuilder(dtype = arrow_type)
105+ builder = DatetimeBuilder(dtype = arrow_type, memory_pool = self .pool )
104106 else :
105- builder = DatetimeBuilder()
107+ builder = DatetimeBuilder(memory_pool = self .pool )
106108 elif value_t == BSON_TYPE_DOCUMENT:
107109 builder = DocumentBuilder()
108110 elif value_t == BSON_TYPE_ARRAY:
109- builder = ListBuilder()
111+ builder = ListBuilder(memory_pool = self .pool )
110112 elif value_t == BSON_TYPE_BINARY:
111113 if doc_iter == NULL :
112114 raise ValueError (' Did not pass a doc_iter!' )
113115 bson_iter_binary (doc_iter, & subtype,
114116 & val_buf_len, & val_buf)
115- builder = BinaryBuilder(subtype)
117+ builder = BinaryBuilder(subtype, memory_pool = self .pool )
116118 elif value_t == ARROW_TYPE_DATE32:
117- builder = Date32Builder()
119+ builder = Date32Builder(memory_pool = self .pool )
118120 elif value_t == ARROW_TYPE_DATE64:
119- builder = Date64Builder()
121+ builder = Date64Builder(memory_pool = self .pool )
120122 elif value_t == BSON_TYPE_INT32:
121- builder = Int32Builder()
123+ builder = Int32Builder(memory_pool = self .pool )
122124 elif value_t == BSON_TYPE_INT64:
123- builder = Int64Builder()
125+ builder = Int64Builder(memory_pool = self .pool )
124126 elif value_t == BSON_TYPE_DOUBLE:
125- builder = DoubleBuilder()
127+ builder = DoubleBuilder(memory_pool = self .pool )
126128 elif value_t == BSON_TYPE_OID:
127- builder = ObjectIdBuilder()
129+ builder = ObjectIdBuilder(memory_pool = self .pool )
128130 elif value_t == BSON_TYPE_UTF8:
129- builder = StringBuilder()
131+ builder = StringBuilder(memory_pool = self .pool )
130132 elif value_t == BSON_TYPE_BOOL:
131- builder = BoolBuilder()
133+ builder = BoolBuilder(memory_pool = self .pool )
132134 elif value_t == BSON_TYPE_DECIMAL128:
133- builder = Decimal128Builder()
135+ builder = Decimal128Builder(memory_pool = self .pool )
134136 elif value_t == BSON_TYPE_CODE:
135- builder = CodeBuilder()
137+ builder = CodeBuilder(memory_pool = self .pool )
136138
137139 self .builder_map[key] = builder
138140 return builder
@@ -175,12 +177,14 @@ cdef class BuilderManager:
175177 # For lists, the nulls are stored in the parent.
176178 if parent_type != BSON_TYPE_ARRAY:
177179 if count > builder.length():
178- builder.append_nulls(count - builder.length())
180+ status = builder.append_nulls_raw(count - builder.length())
181+ if not status.ok():
182+ raise ValueError (" Failed to append nulls to" , full_key.decode(' utf8' ))
179183
180184 # Append the next value.
181185 status = builder.append_raw(doc_iter, value_t)
182186 if not status.ok():
183- raise ValueError (" Could not append raw value" )
187+ raise ValueError (" Could not append raw value to " , full_key.decode( ' utf8 ' ) )
184188
185189 # Recurse into documents.
186190 if value_t == BSON_TYPE_DOCUMENT:
@@ -218,6 +222,7 @@ cdef class BuilderManager:
218222 cdef dict return_map = {}
219223 cdef bytes key
220224 cdef str field
225+ cdef CStatus status
221226 cdef _ArrayBuilderBase value
222227
223228 # Move the builders to a new dict with string keys.
@@ -227,15 +232,17 @@ cdef class BuilderManager:
227232 # Insert null fields.
228233 for field in list (return_map):
229234 if return_map[field] is None :
230- return_map[field] = NullBuilder(self .count )
235+ return_map[field] = NullBuilder(memory_pool = self .pool )
231236
232237 # Pad fields as needed.
233238 for field, value in return_map.items():
234239 # If it isn't a list item, append nulls as needed.
235240 # For lists, the nulls are stored in the parent.
236241 if not field.endswith(' []' ):
237242 if value.length() < self .count:
238- value.append_nulls(self .count - value.length())
243+ status = value.append_nulls_raw(self .count - value.length())
244+ if not status.ok():
245+ raise ValueError (" Failed to append nulls to" , field)
239246
240247 return return_map
241248
@@ -281,21 +288,36 @@ cdef class _ArrayBuilderBase:
281288 def __len__ (self ):
282289 return self .length()
283290
284- cpdef void append_null(self ):
285- self .get_builder().get().AppendNull()
291+ cpdef append_null(self ):
292+ cdef CStatus status = self .append_null_raw()
293+ if not status.ok():
294+ raise ValueError (" Could not append null value" )
286295
287296 cpdef void append_nulls(self , uint64_t count):
288297 for _ in range (count):
289298 self .append_null()
290299
300+ cdef CStatus append_null_raw(self ):
301+ return self .get_builder().get().AppendNull()
302+
303+ cdef CStatus append_nulls_raw(self , uint64_t count):
304+ cdef CStatus status
305+ for _ in range (count):
306+ status = self .append_null_raw()
307+ if not status.ok():
308+ return status
309+
291310 cpdef uint64_t length(self ):
292311 return self .get_builder().get().length()
293312
294313 def finish (self ):
295314 cdef shared_ptr[CArray] out
315+ cdef CStatus status
296316 cdef shared_ptr[CArrayBuilder] builder = self .get_builder()
297317 with nogil:
298- builder.get().Finish(& out)
318+ status = builder.get().Finish(& out)
319+ if not status.ok():
320+ raise ValueError (" Failed to convert value to array" )
299321 return pyarrow_wrap_array(out)
300322
301323
@@ -526,13 +548,10 @@ cdef class Date32Builder(_ArrayBuilderBase):
526548cdef class NullBuilder(_ArrayBuilderBase):
527549 cdef shared_ptr[CArrayBuilder] builder
528550
529- def __cinit__ (self , uint64_t count , MemoryPool memory_pool = None ):
551+ def __cinit__ (self , MemoryPool memory_pool = None ):
530552 cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
531- cdef uint64_t i
532553 self .builder.reset(new CNullBuilder(pool))
533554 self .type_marker = ARROW_TYPE_NULL
534- for i in range (count):
535- self .append_null()
536555
537556 cdef CStatus append_raw(self , bson_iter_t * doc_iter, bson_type_t value_t):
538557 return self .builder.get().AppendNull()
@@ -596,11 +615,12 @@ cdef class Decimal128Builder(_ArrayBuilderBase):
596615cdef class BinaryBuilder(_ArrayBuilderBase):
597616 cdef:
598617 uint8_t _subtype
599- shared_ptr[CBinaryBuilder ] builder
618+ shared_ptr[CStringBuilder ] builder
600619
601- def __cinit__ (self , uint8_t subtype ):
620+ def __cinit__ (self , uint8_t subtype , MemoryPool memory_pool = None ):
621+ cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
602622 self ._subtype = subtype
603- self .builder.reset(new CBinaryBuilder( ))
623+ self .builder.reset(new CStringBuilder(pool ))
604624 self .type_marker = BSON_TYPE_BINARY
605625
606626 @property
@@ -630,7 +650,7 @@ cdef class DocumentBuilder(_ArrayBuilderBase):
630650 """ The document builder stores a map of field names that can be retrieved as a set."""
631651 cdef:
632652 dict field_map
633- int32_t count
653+ int64_t count
634654
635655 def __cinit__ (self ):
636656 self .type_marker = BSON_TYPE_DOCUMENT
@@ -643,8 +663,9 @@ cdef class DocumentBuilder(_ArrayBuilderBase):
643663 cpdef uint64_t length(self ):
644664 return self .count
645665
646- cpdef void append_null (self ):
666+ cdef CStatus append_null_raw (self ):
647667 self .count += 1
668+ return CStatus_OK()
648669
649670 cpdef void add_field(self , cstring field_name):
650671 self .field_map[field_name] = 1
@@ -657,7 +678,7 @@ cdef class DocumentBuilder(_ArrayBuilderBase):
657678cdef class ListBuilder(_ArrayBuilderBase):
658679 """ The list builder stores an int32 list of offsets and a counter with the current value."""
659680 cdef:
660- int32_t count
681+ int64_t count
661682 shared_ptr[CInt32Builder] builder
662683
663684 def __cinit__ (self , MemoryPool memory_pool = None ):
@@ -672,8 +693,8 @@ cdef class ListBuilder(_ArrayBuilderBase):
672693 cpdef void append_count(self ):
673694 self .count += 1
674695
675- cpdef void append_null (self ):
676- self .builder.get().Append(self .count)
696+ cdef CStatus append_null_raw (self ):
697+ return self .builder.get().Append(self .count)
677698
678699 cdef shared_ptr[CArrayBuilder] get_builder(self ):
679700 return < shared_ptr[CArrayBuilder]> self .builder
0 commit comments