@@ -64,6 +64,7 @@ cdef const bson_t* bson_reader_read_safe(bson_reader_t* stream_reader) except? N
6464# Placeholder numbers for the date types.
6565cdef uint8_t ARROW_TYPE_DATE32 = 100
6666cdef uint8_t ARROW_TYPE_DATE64 = 101
67+ cdef uint8_t ARROW_TYPE_NULL = 102
6768
6869_builder_type_map = {
6970 BSON_TYPE_INT32: Int32Builder,
@@ -80,6 +81,7 @@ _builder_type_map = {
8081 BSON_TYPE_CODE: CodeBuilder,
8182 ARROW_TYPE_DATE32: Date32Builder,
8283 ARROW_TYPE_DATE64: Date64Builder,
84+ ARROW_TYPE_NULL: NullBuilder
8385}
8486
8587_field_type_map = {
@@ -177,6 +179,7 @@ cdef void process_raw_bson_stream(const uint8_t * docstream, size_t length, obje
177179 cdef Py_ssize_t count = 0
178180 cdef uint8_t byte_order_status = 0
179181 cdef map [cstring, void * ] builder_map
182+ cdef map [cstring, void * ] missing_builders
180183 cdef map [cstring, void * ].iterator it
181184 cdef bson_subtype_t subtype
182185 cdef int32_t val32
@@ -197,6 +200,7 @@ cdef void process_raw_bson_stream(const uint8_t * docstream, size_t length, obje
197200 cdef DocumentBuilder doc_builder
198201 cdef Date32Builder date32_builder
199202 cdef Date64Builder date64_builder
203+ cdef NullBuilder null_builder
200204
201205 # Build up a map of the builders.
202206 for key, value in context.builder_map.items():
@@ -219,10 +223,6 @@ cdef void process_raw_bson_stream(const uint8_t * docstream, size_t length, obje
219223 builder = None
220224 if arr_value_builder is not None :
221225 builder = arr_value_builder
222- else :
223- it = builder_map.find(key)
224- if it != builder_map.end():
225- builder = < _ArrayBuilderBase> builder_map[key]
226226
227227 if builder is None :
228228 it = builder_map.find(key)
@@ -233,9 +233,16 @@ cdef void process_raw_bson_stream(const uint8_t * docstream, size_t length, obje
233233 # Get the appropriate builder for the current field.
234234 value_t = bson_iter_type(& doc_iter)
235235 builder_type = _builder_type_map.get(value_t)
236+
237+ # Keep the key in missing builders until we find it.
236238 if builder_type is None :
239+ missing_builders[key] = < void * > None
237240 continue
238241
242+ it = missing_builders.find(key)
243+ if it != builder_map.end():
244+ missing_builders.erase(key)
245+
239246 # Handle the parameterized builders.
240247 if builder_type == DatetimeBuilder and context.tzinfo is not None :
241248 arrow_type = timestamp(' ms' , tz = context.tzinfo)
@@ -410,6 +417,9 @@ cdef void process_raw_bson_stream(const uint8_t * docstream, size_t length, obje
410417 binary_builder.append_null()
411418 else :
412419 binary_builder.append_raw(< char * > val_buf, val_buf_len)
420+ elif ftype == ARROW_TYPE_NULL:
421+ null_builder = builder
422+ null_builder.append_null()
413423 else :
414424 raise PyMongoArrowError(' unknown ftype {}' .format(ftype))
415425
@@ -422,6 +432,17 @@ cdef void process_raw_bson_stream(const uint8_t * docstream, size_t length, obje
422432 if len (builder) != count:
423433 builder.append_null()
424434 preincrement(it)
435+
436+ # Any missing fields that are left must be null fields.
437+ it = missing_builders.begin()
438+ while it != missing_builders.end():
439+ builder = NullBuilder()
440+ context.builder_map[key] = builder
441+ null_builder = builder
442+ for _ in range (count):
443+ null_builder.append_null()
444+ preincrement(it)
445+
425446 finally :
426447 bson_reader_destroy(stream_reader)
427448
@@ -724,6 +745,37 @@ cdef class Date32Builder(_ArrayBuilderBase):
724745 return self .builder
725746
726747
748+ cdef class NullBuilder(_ArrayBuilderBase):
749+ cdef:
750+ shared_ptr[CNullBuilder] builder
751+
752+ def __cinit__ (self , MemoryPool memory_pool = None ):
753+ cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
754+ self .builder.reset(new CNullBuilder(pool))
755+ self .type_marker = ARROW_TYPE_NULL
756+
757+ cdef append_raw(self , void * value):
758+ self .builder.get().AppendNull()
759+
760+ cpdef append(self , value):
761+ self .builder.get().AppendNull()
762+
763+ cpdef append_null(self ):
764+ self .builder.get().AppendNull()
765+
766+ def __len__ (self ):
767+ return self .builder.get().length()
768+
769+ cpdef finish(self ):
770+ cdef shared_ptr[CArray] out
771+ with nogil:
772+ self .builder.get().Finish(& out)
773+ return pyarrow_wrap_array(out)
774+
775+ cdef shared_ptr[CNullBuilder] unwrap(self ):
776+ return self .builder
777+
778+
727779cdef class BoolBuilder(_ArrayBuilderBase):
728780 cdef:
729781 shared_ptr[CBooleanBuilder] builder
@@ -817,6 +869,8 @@ cdef object get_field_builder(object field, object tzinfo):
817869 field_builder = ListBuilder(field_type, tzinfo)
818870 elif _atypes.is_large_list(field_type):
819871 field_builder = ListBuilder(field_type, tzinfo)
872+ elif _atypes.is_null(field_type):
873+ field_builder = NullBuilder()
820874 elif getattr (field_type, ' _type_marker' ) == _BsonArrowTypes.objectid:
821875 field_builder = ObjectIdBuilder()
822876 elif getattr (field_type, ' _type_marker' ) == _BsonArrowTypes.decimal128:
0 commit comments