@@ -22,7 +22,7 @@ import sys
2222# Python imports
2323import bson
2424import numpy as np
25- from pyarrow import timestamp
25+ from pyarrow import timestamp, default_memory_pool
2626
2727from pymongoarrow.errors import InvalidBSON
2828from pymongoarrow.types import ObjectIdType, Decimal128Type as Decimal128Type_, BinaryType, CodeType
@@ -62,22 +62,24 @@ cdef class BuilderManager:
6262 uint64_t count
6363 bint has_schema
6464 object tzinfo
65+ public object pool
6566
6667 def __cinit__ (self , dict schema_map , bint has_schema , object tzinfo ):
6768 self .has_schema = has_schema
6869 self .tzinfo = tzinfo
6970 self .count = 0
7071 self .builder_map = {}
72+ self .pool = default_memory_pool()
7173 # Unpack the schema map.
7274 for fname, (ftype, arrow_type) in schema_map.items():
7375 name = fname.encode(' utf-8' )
7476 # special-case initializing builders for parameterized types
7577 if ftype == BSON_TYPE_DATE_TIME:
7678 if tzinfo is not None and arrow_type.tz is None :
7779 arrow_type = timestamp(arrow_type.unit, tz = tzinfo) # noqa: PLW2901
78- self .builder_map[name] = DatetimeBuilder(dtype = arrow_type)
80+ self .builder_map[name] = DatetimeBuilder(dtype = arrow_type, memory_pool = self .pool )
7981 elif ftype == BSON_TYPE_BINARY:
80- self .builder_map[name] = BinaryBuilder(arrow_type.subtype)
82+ self .builder_map[name] = BinaryBuilder(arrow_type.subtype, memory_pool = self .pool )
8183 else :
8284 # We only use the doc_iter for binary arrays, which are handled already.
8385 self .get_builder(name, ftype, < bson_iter_t * > nullptr)
@@ -100,39 +102,39 @@ cdef class BuilderManager:
100102 if value_t == BSON_TYPE_DATE_TIME:
101103 if self .tzinfo is not None :
102104 arrow_type = timestamp(' ms' , tz = self .tzinfo)
103- builder = DatetimeBuilder(dtype = arrow_type)
105+ builder = DatetimeBuilder(dtype = arrow_type, memory_pool = self .pool )
104106 else :
105- builder = DatetimeBuilder()
107+ builder = DatetimeBuilder(memory_pool = self .pool )
106108 elif value_t == BSON_TYPE_DOCUMENT:
107109 builder = DocumentBuilder()
108110 elif value_t == BSON_TYPE_ARRAY:
109- builder = ListBuilder()
111+ builder = ListBuilder(memory_pool = self .pool )
110112 elif value_t == BSON_TYPE_BINARY:
111113 if doc_iter == NULL :
112114 raise ValueError (' Did not pass a doc_iter!' )
113115 bson_iter_binary (doc_iter, & subtype,
114116 & val_buf_len, & val_buf)
115- builder = BinaryBuilder(subtype)
117+ builder = BinaryBuilder(subtype, memory_pool = self .pool )
116118 elif value_t == ARROW_TYPE_DATE32:
117- builder = Date32Builder()
119+ builder = Date32Builder(memory_pool = self .pool )
118120 elif value_t == ARROW_TYPE_DATE64:
119- builder = Date64Builder()
121+ builder = Date64Builder(memory_pool = self .pool )
120122 elif value_t == BSON_TYPE_INT32:
121- builder = Int32Builder()
123+ builder = Int32Builder(memory_pool = self .pool )
122124 elif value_t == BSON_TYPE_INT64:
123- builder = Int64Builder()
125+ builder = Int64Builder(memory_pool = self .pool )
124126 elif value_t == BSON_TYPE_DOUBLE:
125- builder = DoubleBuilder()
127+ builder = DoubleBuilder(memory_pool = self .pool )
126128 elif value_t == BSON_TYPE_OID:
127- builder = ObjectIdBuilder()
129+ builder = ObjectIdBuilder(memory_pool = self .pool )
128130 elif value_t == BSON_TYPE_UTF8:
129- builder = StringBuilder()
131+ builder = StringBuilder(memory_pool = self .pool )
130132 elif value_t == BSON_TYPE_BOOL:
131- builder = BoolBuilder()
133+ builder = BoolBuilder(memory_pool = self .pool )
132134 elif value_t == BSON_TYPE_DECIMAL128:
133- builder = Decimal128Builder()
135+ builder = Decimal128Builder(memory_pool = self .pool )
134136 elif value_t == BSON_TYPE_CODE:
135- builder = CodeBuilder()
137+ builder = CodeBuilder(memory_pool = self .pool )
136138
137139 self .builder_map[key] = builder
138140 return builder
@@ -175,12 +177,15 @@ cdef class BuilderManager:
175177 # For lists, the nulls are stored in the parent.
176178 if parent_type != BSON_TYPE_ARRAY:
177179 if count > builder.length():
178- builder.append_nulls(count - builder.length())
180+ for _ in range (count - builder.length()):
181+ status = builder.append_null_raw()
182+ if not status.ok():
183+ raise ValueError (" Could not append nulls to" , full_key)
179184
180185 # Append the next value.
181186 status = builder.append_raw(doc_iter, value_t)
182187 if not status.ok():
183- raise ValueError (" Could not append raw value" )
188+ raise ValueError (" Could not append raw value to " , full_key, type (builder), self .count )
184189
185190 # Recurse into documents.
186191 if value_t == BSON_TYPE_DOCUMENT:
@@ -196,9 +201,6 @@ cdef class BuilderManager:
196201 if parent_type == BSON_TYPE_ARRAY:
197202 (< ListBuilder> self .builder_map[base_key]).append_count()
198203
199- # Update our count for top level documents.
200- if parent_type == 0 :
201- self .count += 1
202204
203205 cpdef void process_bson_stream(self , const uint8_t* bson_stream, size_t length):
204206 """ Process a bson byte stream."""
@@ -212,6 +214,7 @@ cdef class BuilderManager:
212214 break
213215 if not bson_iter_init(& doc_iter, doc):
214216 raise InvalidBSON(" Could not read BSON document" )
217+ self .count += 1
215218 self .parse_document(& doc_iter, b" " , 0 )
216219 finally :
217220 bson_reader_destroy(stream_reader)
@@ -221,6 +224,7 @@ cdef class BuilderManager:
221224 cdef dict return_map = {}
222225 cdef bytes key
223226 cdef str field
227+ cdef CStatus status
224228 cdef _ArrayBuilderBase value
225229
226230 # Move the builders to a new dict with string keys.
@@ -230,15 +234,17 @@ cdef class BuilderManager:
230234 # Insert null fields.
231235 for field in list (return_map):
232236 if return_map[field] is None :
233- return_map[field] = NullBuilder(self .count )
237+ return_map[field] = NullBuilder()
234238
235239 # Pad fields as needed.
236240 for field, value in return_map.items():
237241 # If it isn't a list item, append nulls as needed.
238242 # For lists, the nulls are stored in the parent.
239243 if not field.endswith(' []' ):
240244 if value.length() < self .count:
241- value.append_nulls(self .count - value.length())
245+ status = value.append_null_raw()
246+ if not status.ok():
247+ raise ValueError (" Could not append nulls to" , field)
242248
243249 return return_map
244250
@@ -250,9 +256,11 @@ cdef class _ArrayBuilderBase:
250256 def append_values (self , values ):
251257 for value in values:
252258 if value is None or value is np.nan:
253- self .append_null()
259+ status = self .append_null()
254260 else :
255- self .append(value)
261+ status = self .append(value)
262+ if not status.ok():
263+ raise ValueError (" Failed to append value" )
256264
257265 def append (self , value ):
258266 """ Interface to append a python value to the builder.
@@ -284,12 +292,13 @@ cdef class _ArrayBuilderBase:
284292 def __len__ (self ):
285293 return self .length()
286294
287- cpdef void append_null(self ):
288- self .get_builder().get().AppendNull()
295+ def append_null (self ):
296+ cdef CStatus status = self .append_null_raw()
297+ if not status.ok():
298+ raise ValueError (" Could not append null" )
289299
290- cpdef void append_nulls(self , uint64_t count):
291- for _ in range (count):
292- self .append_null()
300+ cdef CStatus append_null_raw(self ):
301+ return self .get_builder().get().AppendNull()
293302
294303 cpdef uint64_t length(self ):
295304 return self .get_builder().get().length()
@@ -529,13 +538,10 @@ cdef class Date32Builder(_ArrayBuilderBase):
529538cdef class NullBuilder(_ArrayBuilderBase):
530539 cdef shared_ptr[CArrayBuilder] builder
531540
532- def __cinit__ (self , uint64_t count , MemoryPool memory_pool = None ):
541+ def __cinit__ (self , MemoryPool memory_pool = None ):
533542 cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
534- cdef uint64_t i
535543 self .builder.reset(new CNullBuilder(pool))
536544 self .type_marker = ARROW_TYPE_NULL
537- for i in range (count):
538- self .append_null()
539545
540546 cdef CStatus append_raw(self , bson_iter_t * doc_iter, bson_type_t value_t):
541547 return self .builder.get().AppendNull()
@@ -601,9 +607,10 @@ cdef class BinaryBuilder(_ArrayBuilderBase):
601607 uint8_t _subtype
602608 shared_ptr[CBinaryBuilder] builder
603609
604- def __cinit__ (self , uint8_t subtype ):
610+ def __cinit__ (self , uint8_t subtype , MemoryPool memory_pool = None ):
605611 self ._subtype = subtype
606- self .builder.reset(new CBinaryBuilder())
612+ cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
613+ self .builder.reset(new CBinaryBuilder(pool))
607614 self .type_marker = BSON_TYPE_BINARY
608615
609616 @property
@@ -646,8 +653,9 @@ cdef class DocumentBuilder(_ArrayBuilderBase):
646653 cpdef uint64_t length(self ):
647654 return self .count
648655
649- cpdef void append_null (self ):
656+ cdef CStatus append_null_raw (self ):
650657 self .count += 1
658+ return CStatus_OK()
651659
652660 cpdef void add_field(self , cstring field_name):
653661 self .field_map[field_name] = 1
@@ -675,8 +683,8 @@ cdef class ListBuilder(_ArrayBuilderBase):
675683 cpdef void append_count(self ):
676684 self .count += 1
677685
678- cpdef void append_null (self ):
679- self .builder.get().Append(self .count)
686+ cdef CStatus append_null_raw (self ):
687+ return self .builder.get().Append(self .count)
680688
681689 cdef shared_ptr[CArrayBuilder] get_builder(self ):
682690 return < shared_ptr[CArrayBuilder]> self .builder
0 commit comments