@@ -31,7 +31,7 @@ from pyarrow.lib import (
31
31
32
32
from pymongoarrow.errors import InvalidBSON, PyMongoArrowError
33
33
from pymongoarrow.context import PyMongoArrowContext
34
- from pymongoarrow.types import _BsonArrowTypes, _atypes, ObjectIdType, Decimal128StringType
34
+ from pymongoarrow.types import _BsonArrowTypes, _atypes, ObjectIdType, Decimal128StringType, BinaryType
35
35
36
36
# Cython imports
37
37
from cpython cimport PyBytes_Size, object
@@ -68,6 +68,7 @@ _builder_type_map = {
68
68
BSON_TYPE_DOCUMENT: DocumentBuilder,
69
69
BSON_TYPE_DECIMAL128: StringBuilder,
70
70
BSON_TYPE_ARRAY: ListBuilder,
71
+ BSON_TYPE_BINARY: BinaryBuilder
71
72
}
72
73
73
74
_field_type_map = {
@@ -82,6 +83,10 @@ _field_type_map = {
82
83
83
84
cdef extract_field_dtype(bson_iter_t * doc_iter, bson_iter_t * child_iter, bson_type_t value_t, context):
84
85
""" Get the appropriate data type for a specific field"""
86
+ cdef const uint8_t * val_buf = NULL
87
+ cdef uint32_t val_buf_len = 0
88
+ cdef bson_subtype_t subtype
89
+
85
90
if value_t in _field_type_map:
86
91
field_type = _field_type_map[value_t]
87
92
elif value_t == BSON_TYPE_ARRAY:
@@ -93,6 +98,9 @@ cdef extract_field_dtype(bson_iter_t * doc_iter, bson_iter_t * child_iter, bson_
93
98
field_type = extract_document_dtype(child_iter, context)
94
99
elif value_t == BSON_TYPE_DATE_TIME:
95
100
field_type = timestamp(' ms' , tz = context.tzinfo)
101
+ elif value_t == BSON_TYPE_BINARY:
102
+ bson_iter_binary (doc_iter, & subtype, & val_buf_len, & val_buf)
103
+ field_type = BinaryType(subtype)
96
104
else :
97
105
raise PyMongoArrowError(' unknown value type {}' .format(value_t))
98
106
return field_type
@@ -129,10 +137,8 @@ def process_bson_stream(bson_stream, context, arr_value_builder=None):
129
137
cdef char * decimal128_str = < char * > malloc(
130
138
BSON_DECIMAL128_STRING * sizeof(char ))
131
139
cdef uint32_t str_len
132
- cdef const uint8_t * doc_buf = NULL
133
- cdef uint32_t doc_buf_len = 0 ;
134
- cdef const uint8_t * arr_buf = NULL
135
- cdef uint32_t arr_buf_len = 0 ;
140
+ cdef const uint8_t * val_buf = NULL
141
+ cdef uint32_t val_buf_len = 0
136
142
cdef bson_decimal128_t dec128
137
143
cdef bson_type_t value_t
138
144
cdef const char * bson_str
@@ -142,6 +148,7 @@ def process_bson_stream(bson_stream, context, arr_value_builder=None):
142
148
cdef bson_iter_t child_iter
143
149
cdef const char * key
144
150
cdef Py_ssize_t count = 0
151
+ cdef bson_subtype_t subtype
145
152
146
153
builder_map = context.builder_map
147
154
@@ -155,6 +162,7 @@ def process_bson_stream(bson_stream, context, arr_value_builder=None):
155
162
t_bool = _BsonArrowTypes.bool
156
163
t_document = _BsonArrowTypes.document
157
164
t_array = _BsonArrowTypes.array
165
+ t_binary = _BsonArrowTypes.binary
158
166
159
167
160
168
# initialize count to current length of builders
@@ -197,6 +205,10 @@ def process_bson_stream(bson_stream, context, arr_value_builder=None):
197
205
list_dtype = extract_array_dtype(& child_iter, context)
198
206
list_dtype = list_(list_dtype)
199
207
builder = ListBuilder(list_dtype, context.tzinfo, value_builder = arr_value_builder)
208
+ elif builder_type == BinaryBuilder:
209
+ bson_iter_binary (& doc_iter, & subtype,
210
+ & val_buf_len, & val_buf)
211
+ builder = BinaryBuilder(subtype)
200
212
else :
201
213
builder = builder_type()
202
214
if arr_value_builder is None :
@@ -257,20 +269,28 @@ def process_bson_stream(bson_stream, context, arr_value_builder=None):
257
269
builder.append_null()
258
270
elif ftype == t_document:
259
271
if value_t == BSON_TYPE_DOCUMENT:
260
- bson_iter_document(& doc_iter, & doc_buf_len , & doc_buf )
261
- if doc_buf_len <= 0 :
272
+ bson_iter_document(& doc_iter, & val_buf_len , & val_buf )
273
+ if val_buf_len <= 0 :
262
274
raise ValueError (" Subdocument is invalid" )
263
- builder.append(< bytes> doc_buf[:doc_buf_len ])
275
+ builder.append(< bytes> val_buf[:val_buf_len ])
264
276
else :
265
277
builder.append_null()
266
278
elif ftype == t_array:
267
279
if value_t == BSON_TYPE_ARRAY:
268
- bson_iter_array(& doc_iter, & doc_buf_len , & doc_buf )
269
- if doc_buf_len <= 0 :
280
+ bson_iter_array(& doc_iter, & val_buf_len , & val_buf )
281
+ if val_buf_len <= 0 :
270
282
raise ValueError (" Subarray is invalid" )
271
- builder.append(< bytes> doc_buf[:doc_buf_len ])
283
+ builder.append(< bytes> val_buf[:val_buf_len ])
272
284
else :
273
285
builder.append_null()
286
+ elif ftype == t_binary:
287
+ if value_t == BSON_TYPE_BINARY:
288
+ bson_iter_binary (& doc_iter, & subtype,
289
+ & val_buf_len, & val_buf)
290
+ if subtype != builder.subtype:
291
+ builder.append_null()
292
+ else :
293
+ builder.append(< bytes> val_buf[:val_buf_len])
274
294
else :
275
295
raise PyMongoArrowError(' unknown ftype {}' .format(ftype))
276
296
count += 1
@@ -534,6 +554,8 @@ cdef object get_field_builder(field, tzinfo):
534
554
field_builder = ObjectIdBuilder()
535
555
elif getattr (field_type, ' _type_marker' ) == _BsonArrowTypes.decimal128_str:
536
556
field_builder = StringBuilder()
557
+ elif getattr (field_type, ' _type_marker' ) == _BsonArrowTypes.binary:
558
+ field_builder = BinaryBuilder(field_type.subtype)
537
559
else :
538
560
field_builder = StringBuilder()
539
561
return field_builder
@@ -596,6 +618,7 @@ cdef class DocumentBuilder(_ArrayBuilderBase):
596
618
cdef shared_ptr[CStructBuilder] unwrap(self ):
597
619
return self .builder
598
620
621
+
599
622
cdef class ListBuilder(_ArrayBuilderBase):
600
623
type_marker = _BsonArrowTypes.array
601
624
@@ -647,3 +670,36 @@ cdef class ListBuilder(_ArrayBuilderBase):
647
670
648
671
cdef shared_ptr[CListBuilder] unwrap(self ):
649
672
return self .builder
673
+
674
+
675
cdef class BinaryBuilder(_ArrayBuilderBase):
    """Array builder that accumulates BSON binary values of one subtype.

    Raw byte payloads are appended to an underlying ``CBinaryBuilder``.
    The subtype passed at construction is remembered and used by
    ``finish`` to cast the resulting array to ``BinaryType(subtype)``.
    """
    type_marker = _BsonArrowTypes.binary

    # Underlying Arrow builder that holds the appended payloads.
    cdef shared_ptr[CBinaryBuilder] builder
    # Binary subtype supplied at construction; exposed via ``subtype``.
    cdef uint8_t _subtype

    def __cinit__(self, uint8_t subtype):
        # Allocate a fresh Arrow builder and record the subtype it serves.
        self.builder.reset(new CBinaryBuilder())
        self._subtype = subtype

    def __len__(self):
        # Delegates to the underlying builder's length().
        return self.builder.get().length()

    @property
    def subtype(self):
        """The binary subtype this builder was created with."""
        return self._subtype

    cpdef append(self, value):
        """Append ``value`` to the builder.

        ``value`` is cast to ``bytes`` without a type check and is passed
        to the underlying builder together with ``len(value)``.
        """
        # NOTE(review): the result of Append is not inspected here -
        # confirm whether a failure status should be surfaced.
        self.builder.get().Append(<bytes>value, len(value))

    cpdef append_null(self):
        """Append a null entry to the builder."""
        self.builder.get().AppendNull()

    cpdef finish(self):
        """Finish the builder and return the array cast to ``BinaryType``.

        The underlying ``Finish`` call runs with the GIL released; the
        wrapped array is then cast to ``BinaryType(self._subtype)``.
        """
        cdef shared_ptr[CArray] c_array
        with nogil:
            self.builder.get().Finish(&c_array)
        wrapped = pyarrow_wrap_array(c_array)
        return wrapped.cast(BinaryType(self._subtype))

    # Hand back the shared pointer to the underlying Arrow builder.
    cdef shared_ptr[CBinaryBuilder] unwrap(self):
        return self.builder
0 commit comments