@@ -22,7 +22,7 @@ import sys
2222# Python imports
2323import bson
2424import numpy as np
25- from pyarrow import timestamp, default_memory_pool
25+ from pyarrow import timestamp
2626
2727from pymongoarrow.errors import InvalidBSON
2828from pymongoarrow.types import ObjectIdType, Decimal128Type as Decimal128Type_, BinaryType, CodeType
@@ -62,24 +62,22 @@ cdef class BuilderManager:
6262 uint64_t count
6363 bint has_schema
6464 object tzinfo
65- public object pool
6665
6766 def __cinit__ (self , dict schema_map , bint has_schema , object tzinfo ):
6867 self .has_schema = has_schema
6968 self .tzinfo = tzinfo
7069 self .count = 0
7170 self .builder_map = {}
72- self .pool = default_memory_pool()
7371 # Unpack the schema map.
7472 for fname, (ftype, arrow_type) in schema_map.items():
7573 name = fname.encode(' utf-8' )
7674 # special-case initializing builders for parameterized types
7775 if ftype == BSON_TYPE_DATE_TIME:
7876 if tzinfo is not None and arrow_type.tz is None :
7977 arrow_type = timestamp(arrow_type.unit, tz = tzinfo) # noqa: PLW2901
80- self .builder_map[name] = DatetimeBuilder(dtype = arrow_type, memory_pool = self .pool )
78+ self .builder_map[name] = DatetimeBuilder(dtype = arrow_type)
8179 elif ftype == BSON_TYPE_BINARY:
82- self .builder_map[name] = BinaryBuilder(arrow_type.subtype, memory_pool = self .pool )
80+ self .builder_map[name] = BinaryBuilder(arrow_type.subtype)
8381 else :
8482 # We only use the doc_iter for binary arrays, which are handled already.
8583 self .get_builder(name, ftype, < bson_iter_t * > nullptr)
@@ -102,39 +100,39 @@ cdef class BuilderManager:
102100 if value_t == BSON_TYPE_DATE_TIME:
103101 if self .tzinfo is not None :
104102 arrow_type = timestamp(' ms' , tz = self .tzinfo)
105- builder = DatetimeBuilder(dtype = arrow_type, memory_pool = self .pool )
103+ builder = DatetimeBuilder(dtype = arrow_type)
106104 else :
107- builder = DatetimeBuilder(memory_pool = self .pool )
105+ builder = DatetimeBuilder()
108106 elif value_t == BSON_TYPE_DOCUMENT:
109107 builder = DocumentBuilder()
110108 elif value_t == BSON_TYPE_ARRAY:
111- builder = ListBuilder(memory_pool = self .pool )
109+ builder = ListBuilder()
112110 elif value_t == BSON_TYPE_BINARY:
113111 if doc_iter == NULL :
114112 raise ValueError (' Did not pass a doc_iter!' )
115113 bson_iter_binary (doc_iter, & subtype,
116114 & val_buf_len, & val_buf)
117- builder = BinaryBuilder(subtype, memory_pool = self .pool )
115+ builder = BinaryBuilder(subtype)
118116 elif value_t == ARROW_TYPE_DATE32:
119- builder = Date32Builder(memory_pool = self .pool )
117+ builder = Date32Builder()
120118 elif value_t == ARROW_TYPE_DATE64:
121- builder = Date64Builder(memory_pool = self .pool )
119+ builder = Date64Builder()
122120 elif value_t == BSON_TYPE_INT32:
123- builder = Int32Builder(memory_pool = self .pool )
121+ builder = Int32Builder()
124122 elif value_t == BSON_TYPE_INT64:
125- builder = Int64Builder(memory_pool = self .pool )
123+ builder = Int64Builder()
126124 elif value_t == BSON_TYPE_DOUBLE:
127- builder = DoubleBuilder(memory_pool = self .pool )
125+ builder = DoubleBuilder()
128126 elif value_t == BSON_TYPE_OID:
129- builder = ObjectIdBuilder(memory_pool = self .pool )
127+ builder = ObjectIdBuilder()
130128 elif value_t == BSON_TYPE_UTF8:
131- builder = StringBuilder(memory_pool = self .pool )
129+ builder = StringBuilder()
132130 elif value_t == BSON_TYPE_BOOL:
133- builder = BoolBuilder(memory_pool = self .pool )
131+ builder = BoolBuilder()
134132 elif value_t == BSON_TYPE_DECIMAL128:
135- builder = Decimal128Builder(memory_pool = self .pool )
133+ builder = Decimal128Builder()
136134 elif value_t == BSON_TYPE_CODE:
137- builder = CodeBuilder(memory_pool = self .pool )
135+ builder = CodeBuilder()
138136
139137 self .builder_map[key] = builder
140138 return builder
@@ -177,15 +175,12 @@ cdef class BuilderManager:
177175 # For lists, the nulls are stored in the parent.
178176 if parent_type != BSON_TYPE_ARRAY:
179177 if count > builder.length():
180- for _ in range (count - builder.length()):
181- status = builder.append_null_raw()
182- if not status.ok():
183- raise ValueError (" Could not append nulls to" , full_key)
178+ builder.append_nulls(count - builder.length())
184179
185180 # Append the next value.
186181 status = builder.append_raw(doc_iter, value_t)
187182 if not status.ok():
188- raise ValueError (" Could not append raw value to " , full_key, type (builder), self .count )
183+ raise ValueError (" Could not append raw value" )
189184
190185 # Recurse into documents.
191186 if value_t == BSON_TYPE_DOCUMENT:
@@ -201,6 +196,9 @@ cdef class BuilderManager:
201196 if parent_type == BSON_TYPE_ARRAY:
202197 (< ListBuilder> self .builder_map[base_key]).append_count()
203198
199+ # Update our count for top level documents.
200+ if parent_type == 0 :
201+ self .count += 1
204202
205203 cpdef void process_bson_stream(self , const uint8_t* bson_stream, size_t length):
206204 """ Process a bson byte stream."""
@@ -214,7 +212,6 @@ cdef class BuilderManager:
214212 break
215213 if not bson_iter_init(& doc_iter, doc):
216214 raise InvalidBSON(" Could not read BSON document" )
217- self .count += 1
218215 self .parse_document(& doc_iter, b" " , 0 )
219216 finally :
220217 bson_reader_destroy(stream_reader)
@@ -224,7 +221,6 @@ cdef class BuilderManager:
224221 cdef dict return_map = {}
225222 cdef bytes key
226223 cdef str field
227- cdef CStatus status
228224 cdef _ArrayBuilderBase value
229225
230226 # Move the builders to a new dict with string keys.
@@ -234,17 +230,15 @@ cdef class BuilderManager:
234230 # Insert null fields.
235231 for field in list (return_map):
236232 if return_map[field] is None :
237- return_map[field] = NullBuilder()
233+ return_map[field] = NullBuilder(self .count )
238234
239235 # Pad fields as needed.
240236 for field, value in return_map.items():
241237 # If it isn't a list item, append nulls as needed.
242238 # For lists, the nulls are stored in the parent.
243239 if not field.endswith(' []' ):
244240 if value.length() < self .count:
245- status = value.append_null_raw()
246- if not status.ok():
247- raise ValueError (" Could not append nulls to" , field)
241+ value.append_nulls(self .count - value.length())
248242
249243 return return_map
250244
@@ -256,11 +250,9 @@ cdef class _ArrayBuilderBase:
256250 def append_values (self , values ):
257251 for value in values:
258252 if value is None or value is np.nan:
259- status = self .append_null()
253+ self .append_null()
260254 else :
261- status = self .append(value)
262- if not status.ok():
263- raise ValueError (" Failed to append value" )
255+ self .append(value)
264256
265257 def append (self , value ):
266258 """ Interface to append a python value to the builder.
@@ -292,13 +284,12 @@ cdef class _ArrayBuilderBase:
292284 def __len__ (self ):
293285 return self .length()
294286
295- def append_null (self ):
296- cdef CStatus status = self .append_null_raw()
297- if not status.ok():
298- raise ValueError (" Could not append null" )
287+ cpdef void append_null(self ):
288+ self .get_builder().get().AppendNull()
299289
300- cdef CStatus append_null_raw(self ):
301- return self .get_builder().get().AppendNull()
290+ cpdef void append_nulls(self , uint64_t count):
291+ for _ in range (count):
292+ self .append_null()
302293
303294 cpdef uint64_t length(self ):
304295 return self .get_builder().get().length()
@@ -538,10 +529,13 @@ cdef class Date32Builder(_ArrayBuilderBase):
538529cdef class NullBuilder(_ArrayBuilderBase):
539530 cdef shared_ptr[CArrayBuilder] builder
540531
541- def __cinit__ (self , MemoryPool memory_pool = None ):
532+ def __cinit__ (self , uint64_t count , MemoryPool memory_pool = None ):
542533 cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
534+ cdef uint64_t i
543535 self .builder.reset(new CNullBuilder(pool))
544536 self .type_marker = ARROW_TYPE_NULL
537+ for i in range (count):
538+ self .append_null()
545539
546540 cdef CStatus append_raw(self , bson_iter_t * doc_iter, bson_type_t value_t):
547541 return self .builder.get().AppendNull()
@@ -607,10 +601,9 @@ cdef class BinaryBuilder(_ArrayBuilderBase):
607601 uint8_t _subtype
608602 shared_ptr[CBinaryBuilder] builder
609603
610- def __cinit__ (self , uint8_t subtype , MemoryPool memory_pool = None ):
604+ def __cinit__ (self , uint8_t subtype ):
611605 self ._subtype = subtype
612- cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
613- self .builder.reset(new CBinaryBuilder(pool))
606+ self .builder.reset(new CBinaryBuilder())
614607 self .type_marker = BSON_TYPE_BINARY
615608
616609 @property
@@ -653,9 +646,8 @@ cdef class DocumentBuilder(_ArrayBuilderBase):
653646 cpdef uint64_t length(self ):
654647 return self .count
655648
656- cdef CStatus append_null_raw (self ):
649+ cpdef void append_null (self ):
657650 self .count += 1
658- return CStatus_OK()
659651
660652 cpdef void add_field(self , cstring field_name):
661653 self .field_map[field_name] = 1
@@ -683,8 +675,8 @@ cdef class ListBuilder(_ArrayBuilderBase):
683675 cpdef void append_count(self ):
684676 self .count += 1
685677
686- cdef CStatus append_null_raw (self ):
687- return self .builder.get().Append(self .count)
678+ cpdef void append_null (self ):
679+ self .builder.get().Append(self .count)
688680
689681 cdef shared_ptr[CArrayBuilder] get_builder(self ):
690682 return < shared_ptr[CArrayBuilder]> self .builder
0 commit comments