@@ -60,40 +60,83 @@ def __init__(self, iterable=None, buffer_size=4):
60
60
self ._is_view = True
61
61
return
62
62
63
- # Add elements of the iterable.
63
+ try :
64
+ # If possible try pre-allocating memory.
65
+ if len (iterable ) > 0 :
66
+ first_element = np .asarray (iterable [0 ])
67
+ n_elements = np .sum ([len (iterable [i ])
68
+ for i in range (len (iterable ))])
69
+ new_shape = (n_elements ,) + first_element .shape [1 :]
70
+ self ._data = np .empty (new_shape , dtype = first_element .dtype )
71
+ except TypeError :
72
+ pass
73
+
74
+ # Initialize the `ArraySequence` object from iterable's item.
75
+ coroutine = self ._extend_using_coroutine ()
76
+ coroutine .send (None ) # Run until the first yield.
77
+
78
+ for e in iterable :
79
+ coroutine .send (e )
80
+
81
+ coroutine .close () # Terminate coroutine.
82
+
83
+ def _extend_using_coroutine (self , buffer_size = 4 ):
84
+ """ Creates a coroutine allowing to append elements.
85
+
86
+ Parameters
87
+ ----------
88
+ buffer_size : float, optional
89
+ Size (in Mb) for memory pre-allocation.
90
+
91
+ Returns
92
+ -------
93
+ coroutine
94
+ Coroutine object which expects the values to be appended to this
95
+ array sequence.
96
+
97
+ Notes
98
+ -----
99
+ This method is essential for
100
+ :func:`create_arraysequences_from_generator` as it allows for an
101
+ efficient way of creating multiple array sequences in a hyperthreaded
102
+ fashion and still benefit from the memory buffering. Without this
103
+ method the alternative would be to use :meth:`append` which does
104
+ not have such buffering mechanism and thus is at least one order of
105
+ magnitude slower.
106
+ """
64
107
offsets = []
65
108
lengths = []
66
- # Initialize the `ArraySequence` object from iterable's item.
67
- offset = 0
68
- for i , e in enumerate (iterable ):
69
- e = np .asarray (e )
70
- if i == 0 :
71
- try :
72
- n_elements = np .sum ([len (iterable [i ])
73
- for i in range (len (iterable ))])
74
- new_shape = (n_elements ,) + e .shape [1 :]
75
- except TypeError :
76
- # Can't get the number of elements in iterable. So,
77
- # we use a memory buffer while building the ArraySequence.
109
+
110
+ offset = 0 if len (self ) == 0 else self ._offsets [- 1 ] + self ._lengths [- 1 ]
111
+ try :
112
+ first_element = True
113
+ while True :
114
+ e = (yield )
115
+ e = np .asarray (e )
116
+ if first_element :
117
+ first_element = False
78
118
n_rows_buffer = int (buffer_size * 1024 ** 2 // e .nbytes )
79
119
new_shape = (n_rows_buffer ,) + e .shape [1 :]
120
+ if len (self ) == 0 :
121
+ self ._data = np .empty (new_shape , dtype = e .dtype )
80
122
81
- self ._data = np .empty (new_shape , dtype = e .dtype )
123
+ end = offset + len (e )
124
+ if end > len (self ._data ):
125
+ # Resize needed, adding `len(e)` items plus some buffer.
126
+ nb_points = len (self ._data )
127
+ nb_points += len (e ) + n_rows_buffer
128
+ self ._data .resize ((nb_points ,) + self .common_shape )
82
129
83
- end = offset + len (e )
84
- if end > len (self ._data ):
85
- # Resize needed, adding `len(e)` items plus some buffer.
86
- nb_points = len (self ._data )
87
- nb_points += len (e ) + n_rows_buffer
88
- self ._data .resize ((nb_points ,) + self .common_shape )
130
+ offsets .append (offset )
131
+ lengths .append (len (e ))
132
+ self ._data [offset :offset + len (e )] = e
133
+ offset += len (e )
89
134
90
- offsets .append (offset )
91
- lengths .append (len (e ))
92
- self ._data [offset :offset + len (e )] = e
93
- offset += len (e )
135
+ except GeneratorExit :
136
+ pass
94
137
95
- self ._offsets = np .asarray ( offsets )
96
- self ._lengths = np .asarray ( lengths )
138
+ self ._offsets = np .concatenate ([ self . _offsets , offsets ], axis = 0 )
139
+ self ._lengths = np .concatenate ([ self . _lengths , lengths ], axis = 0 )
97
140
98
141
# Clear unused memory.
99
142
self ._data .resize ((offset ,) + self .common_shape )
@@ -266,13 +309,6 @@ def __getitem__(self, idx):
266
309
seq ._is_view = True
267
310
return seq
268
311
269
- # for name, slice_ in data_per_point_slice.items():
270
- # seq = ArraySequence()
271
- # seq._data = scalars._data[:, slice_]
272
- # seq._offsets = scalars._offsets
273
- # seq._lengths = scalars._lengths
274
- # tractogram.data_per_point[name] = seq
275
-
276
312
raise TypeError ("Index must be either an int, a slice, a list of int"
277
313
" or a ndarray of bool! Not " + str (type (idx )))
278
314
@@ -320,10 +356,27 @@ def load(cls, filename):
320
356
321
357
def create_arraysequences_from_generator(gen, n):
    """ Creates :class:`ArraySequence` objects from a generator yielding tuples.

    Parameters
    ----------
    gen : generator
        Generator yielding a size `n` tuple containing the values to put in the
        array sequences. Each value may be any array-like object.
    n : int
        Number of :class:`ArraySequence` objects to create.

    Returns
    -------
    seqs : list of :class:`ArraySequence`
        The `n` array sequences. The i-th sequence receives, in order, the
        non-empty i-th values of the tuples yielded by `gen`.

    Notes
    -----
    Values occupying zero bytes are skipped, so the returned sequences may
    end up with different numbers of elements.
    """
    seqs = [ArraySequence() for _ in range(n)]
    coroutines = [seq._extend_using_coroutine() for seq in seqs]

    # Prime every coroutine so it is paused at its first `yield` and ready
    # to receive values.
    for coroutine in coroutines:
        coroutine.send(None)

    for data in gen:
        for i, coroutine in enumerate(coroutines):
            # Accept any array-like value (e.g. a plain list), not only
            # objects that already expose `nbytes`.
            item = np.asarray(data[i])
            # Empty values are not forwarded: `_extend_using_coroutine`
            # divides by the byte size of the first value it receives.
            if item.nbytes > 0:
                coroutine.send(item)

    # Closing a coroutine makes it store the accumulated offsets/lengths on
    # its sequence and trim the unused part of the data buffer.
    for coroutine in coroutines:
        coroutine.close()

    return seqs
0 commit comments