Skip to content

Commit 2e56215

Browse files
authored
Merge pull request #670 from tacaswell/enh_add_structured_arrays
ENH: add structured data support
2 parents f39f584 + 896c3c2 commit 2e56215

File tree

1 file changed

+157
-121
lines changed

1 file changed

+157
-121
lines changed

databroker/mongo_normalized.py

Lines changed: 157 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,23 @@
2525
ArrayStructure,
2626
ArrayMacroStructure,
2727
Kind,
28-
MachineDataType,
28+
MachineDataType as BuiltinType,
2929
)
30+
31+
from tiled.structures.structure_array import (
32+
StructDtype,
33+
ArrayTabularMacroStructure,
34+
StructuredArrayTabularStructure,
35+
)
36+
3037
from tiled.structures.xarray import (
3138
DataArrayStructure,
3239
DataArrayMacroStructure,
3340
DatasetMacroStructure,
3441
VariableStructure,
3542
VariableMacroStructure,
3643
)
44+
from tiled.trees.in_memory import Tree
3745
from tiled.query_registration import QueryTranslationRegistry
3846
from tiled.queries import FullText
3947
from tiled.utils import (
@@ -67,6 +75,146 @@
6775
logger = logging.getLogger(__name__)
6876

6977

78+
def structure_from_descriptor(descriptor, sub_dict, max_seq_num, unicode_columns=None):
    """Build tiled structures describing one stream from its EventDescriptor.

    Parameters
    ----------
    descriptor : dict
        A Bluesky EventDescriptor document; ``descriptor["data_keys"]`` maps
        column names to per-field metadata and ``descriptor["object_keys"]``
        (optional) maps device names to their columns.
    sub_dict : str
        ``"data"`` to describe the measured values; anything else (expected
        ``"timestamps"``) describes the per-event timestamp columns, which are
        always 1-D float64.
    max_seq_num : int
        One past the last seq_num; every per-event axis has length
        ``max_seq_num - 1``.
    unicode_columns : dict, optional
        Mapping of key -> materialized unicode column. Currently accepted but
        not read in this function body — TODO(review): confirm whether the
        unicode itemsize fix-up from the previous implementation still needs
        to be applied here.

    Returns
    -------
    tuple
        ``(DatasetMacroStructure, Tree)`` — the xarray-style structure for
        plain (builtin-dtype) columns, and an in-memory Tree of
        ``StructuredArrayTabularStructure`` for structured-dtype columns.

    Raises
    ------
    RuntimeError
        If a structured dtype is nested more than one level deep, or a
        structured column is not 1-D per event.
    """

    def _try_descr(field_metadata):
        # Return a StructDtype if this field advertises a structured numpy
        # dtype via "dtype_descr"; return None for plain (unstructured) dtypes.
        if descr := field_metadata.get("dtype_descr"):
            # A single unnamed field is numpy's way of spelling a plain dtype.
            if len(descr) == 1 and descr[0][0] == "":
                return None
            dtype = StructDtype.from_numpy_dtype(numpy.dtype(descr))
            if dtype.max_depth() > 1:
                raise RuntimeError(
                    "We can not yet cope with multiple nested structured dtypes. "
                    f"{descr}"
                )
            return dtype
        else:
            return None

    # Build the time coordinate shared by every column in this stream.
    time_shape = (max_seq_num - 1,)
    time_chunks = normalize_chunks(
        ("auto",) * len(time_shape),
        shape=time_shape,
        limit=CHUNK_SIZE_LIMIT,
        dtype=FLOAT_DTYPE.to_numpy_dtype(),
    )
    time_data = ArrayStructure(
        macro=ArrayMacroStructure(
            shape=time_shape,
            chunks=time_chunks,
        ),
        micro=FLOAT_DTYPE,
    )
    time_variable = VariableStructure(
        macro=VariableMacroStructure(dims=["time"], data=time_data, attrs={}),
        micro=None,
    )
    time_data_array = DataArrayStructure(
        macro=DataArrayMacroStructure(variable=time_variable, coords={}, name="time"),
        micro=None,
    )
    if unicode_columns is None:
        unicode_columns = {}
    dim_counter = itertools.count()
    base_vars = {}    # columns with builtin (unstructured) dtypes
    struct_vars = {}  # columns with structured dtypes

    for key, field_metadata in descriptor["data_keys"].items():
        # If the EventDescriptor doesn't provide names for the
        # dimensions (it's optional) use the same default dimension
        # names that xarray would.
        try:
            dims = ["time"] + field_metadata["dims"]
        except KeyError:
            ndim = len(field_metadata["shape"])
            dims = ["time"] + [f"dim_{next(dim_counter)}" for _ in range(ndim)]
        attrs = {}
        # Record which object (i.e. device) this column is associated with,
        # which enables one to find the relevant configuration, if any.
        for object_name, keys_ in descriptor.get("object_keys", {}).items():
            for item in keys_:
                if item == key:
                    attrs["object"] = object_name
                    break
        units = field_metadata.get("units")
        if units:
            if isinstance(units, str):
                attrs["units_string"] = units
            # TODO We may soon add a more structured units type, which
            # would likely be a dict here.
        if sub_dict == "data":
            shape = tuple((max_seq_num - 1, *field_metadata["shape"]))
            # If we have a dtype_descr, then this is a structured array.
            if dtype := _try_descr(field_metadata):
                if len(shape) > 2:
                    raise RuntimeError(
                        "We do not yet support general structured arrays, only 1D ones."
                    )
            # If we have a detailed dtype string, trust that.
            elif dt_str := field_metadata.get("dtype_str"):
                dtype = BuiltinType.from_numpy_dtype(numpy.dtype(dt_str))
            # Otherwise guess from the coarse jsonschema-style "dtype"!
            else:
                dtype = JSON_DTYPE_TO_MACHINE_DATA_TYPE[field_metadata["dtype"]]
        else:
            # assert sub_dict == "timestamps"
            shape = tuple((max_seq_num - 1,))
            dtype = FLOAT_DTYPE

        if "chunks" in field_metadata:
            # If the Event Descriptor tells us a preferred chunking, use that.
            suggested_chunks = field_metadata["chunks"]
        elif 0 in shape:
            # special case to avoid warning from dask
            suggested_chunks = shape
        elif len(shape) == 4:
            # TEMP: Special-case 4D data in a way that optimizes single-frame
            # access of area detector data.
            # If we choose 1 that would make single-frame access fast
            # but many-frame access too slow.
            suggested_chunks = (
                min(MAX_AD_FRAMES_PER_CHUNK, shape[0]),
                min(MAX_AD_FRAMES_PER_CHUNK, shape[1]),
                "auto",
                "auto",
            )
        else:
            suggested_chunks = ("auto",) * len(shape)

        chunks = normalize_chunks(
            suggested_chunks,
            shape=shape,
            limit=CHUNK_SIZE_LIMIT,
            dtype=dtype.to_numpy_dtype(),
        )

        if isinstance(dtype, BuiltinType):
            # Plain dtype: expose as an xarray-style DataArray structure.
            data = ArrayStructure(
                macro=ArrayMacroStructure(shape=shape, chunks=chunks),
                micro=dtype,
            )
            variable = VariableStructure(
                macro=VariableMacroStructure(dims=dims, data=data, attrs=attrs),
                micro=None,
            )
            data_array = DataArrayStructure(
                macro=DataArrayMacroStructure(variable, coords={}, name=key), micro=None
            )
            base_vars[key] = data_array
        else:
            # Structured dtype: expose as a tabular structured-array structure.
            struct_vars[key] = StructuredArrayTabularStructure(
                macro=ArrayTabularMacroStructure(chunks=chunks, shape=shape),
                micro=dtype,
            )

    return (
        DatasetMacroStructure(
            data_vars=base_vars, coords={"time": time_data_array}, attrs={}
        ),
        Tree(struct_vars),
    )
216+
217+
70218
class BlueskyRun(TreeInMemory, BlueskyRunMixin):
71219
specs = ["BlueskyRun"]
72220

@@ -334,8 +482,6 @@ def macrostructure(self):
334482
# `name` MUST be alike, so we can choose one arbitrarily.
335483
# IMPORTANT: Access via self.metadata so that the transforms are applied.
336484
descriptor, *_ = self.metadata["descriptors"]
337-
data_vars = {}
338-
dim_counter = itertools.count()
339485
unicode_columns = {}
340486
if self._sub_dict == "data":
341487
# Collect the keys (column names) that are of unicode data type.
@@ -349,121 +495,11 @@ def macrostructure(self):
349495
# if our guess is too large.
350496
if unicode_keys:
351497
unicode_columns.update(self._get_columns(unicode_keys, slices=None))
352-
# Build the time coordinate.
353-
time_shape = (self._cutoff_seq_num - 1,)
354-
time_chunks = normalize_chunks(
355-
("auto",) * len(time_shape),
356-
shape=time_shape,
357-
limit=CHUNK_SIZE_LIMIT,
358-
dtype=FLOAT_DTYPE.to_numpy_dtype(),
359-
)
360-
time_data = ArrayStructure(
361-
macro=ArrayMacroStructure(
362-
shape=time_shape,
363-
chunks=time_chunks,
364-
),
365-
micro=FLOAT_DTYPE,
366-
)
367-
time_variable = VariableStructure(
368-
macro=VariableMacroStructure(dims=["time"], data=time_data, attrs={}),
369-
micro=None,
370-
)
371-
time_data_array = DataArrayStructure(
372-
macro=DataArrayMacroStructure(
373-
variable=time_variable, coords={}, name="time"
374-
),
375-
micro=None,
376-
)
377-
for key, field_metadata in descriptor["data_keys"].items():
378-
# if the EventDescriptor doesn't provide names for the
379-
# dimensions (it's optional) use the same default dimension
380-
# names that xarray would.
381-
try:
382-
dims = ["time"] + field_metadata["dims"]
383-
except KeyError:
384-
ndim = len(field_metadata["shape"])
385-
dims = ["time"] + [f"dim_{next(dim_counter)}" for _ in range(ndim)]
386-
attrs = {}
387-
# Record which object (i.e. device) this column is associated with,
388-
# which enables one to find the relevant configuration, if any.
389-
for object_name, keys_ in descriptor.get("object_keys", {}).items():
390-
for item in keys_:
391-
if item == key:
392-
attrs["object"] = object_name
393-
break
394-
units = field_metadata.get("units")
395-
if units:
396-
if isinstance(units, str):
397-
attrs["units_string"] = units
398-
# TODO We may soon add a more structured units type, which
399-
# would likely be a dict here.
400-
if self._sub_dict == "data":
401-
shape = tuple((self._cutoff_seq_num - 1, *field_metadata["shape"]))
402-
# A detailed dtype "dtype_str" was added to the schema in event-model#215.
403-
# Prefer that, if it is set.
404-
# Fall back to "dtype" otherwise, and guess the detailed dtype.
405-
if "dtype_str" in field_metadata:
406-
dtype = MachineDataType.from_numpy_dtype(
407-
numpy.dtype(field_metadata["dtype_str"])
408-
)
409-
else:
410-
dtype = JSON_DTYPE_TO_MACHINE_DATA_TYPE[field_metadata["dtype"]]
411-
if dtype.kind == Kind.unicode:
412-
array = unicode_columns[key]
413-
# I do not fully understand why we need this factor of 4.
414-
# Something about what itemsize means to the dtype system
415-
# versus its actual bytesize.
416-
# This might be making an assumption that we have ASCII
417-
# characters only, which is not a safe assumption. We should
418-
# revisit this.
419-
dtype.itemsize = array.itemsize // 4
420-
else:
421-
# assert sub_dict == "timestamps"
422-
shape = tuple((self._cutoff_seq_num - 1,))
423-
dtype = FLOAT_DTYPE
424-
if "chunks" in field_metadata:
425-
# If the Event Descriptor tells us a preferred chunking, use that.
426-
suggested_chunks = field_metadata["chunks"]
427-
elif 0 in shape:
428-
# special case to avoid warning from dask
429-
suggested_chunks = shape
430-
elif len(shape) == 4:
431-
# TEMP: Special-case 4D data in a way that optimizes single-frame
432-
# access of area detector data.
433-
# If we choose 1 that would make single-frame access fast
434-
# but many-frame access too slow.
435-
suggested_chunks = (
436-
min(MAX_AD_FRAMES_PER_CHUNK, shape[0]),
437-
min(MAX_AD_FRAMES_PER_CHUNK, shape[1]),
438-
"auto",
439-
"auto",
440-
)
441-
else:
442-
suggested_chunks = ("auto",) * len(shape)
443-
chunks = normalize_chunks(
444-
suggested_chunks,
445-
shape=shape,
446-
limit=CHUNK_SIZE_LIMIT,
447-
dtype=dtype.to_numpy_dtype(),
448-
)
449-
data = ArrayStructure(
450-
macro=ArrayMacroStructure(shape=shape, chunks=chunks),
451-
micro=dtype,
452-
)
453-
variable = VariableStructure(
454-
macro=VariableMacroStructure(dims=dims, data=data, attrs=attrs),
455-
micro=None,
456-
)
457-
data_array = DataArrayStructure(
458-
macro=DataArrayMacroStructure(
459-
variable, coords={"time": time_data_array}, name=key
460-
),
461-
micro=None,
462-
)
463-
data_vars[key] = data_array
464-
return DatasetMacroStructure(
465-
data_vars=data_vars, coords={"time": time_data_array}, attrs={}
498+
499+
old_ret, structed_data = structure_from_descriptor(
500+
descriptor, self._sub_dict, self._cutoff_seq_num - 1, unicode_columns
466501
)
502+
return old_ret
467503

468504
def microstructure(self):
469505
return None
@@ -1825,10 +1861,10 @@ def parse_transforms(transforms):
18251861

18261862
# These are fallback guesses when all we have is a general jsonschema "dtype"
18271863
# like "array" no specific "dtype_str" like "<u2".
1828-
BOOLEAN_DTYPE = MachineDataType.from_numpy_dtype(numpy.dtype("bool"))
1829-
FLOAT_DTYPE = MachineDataType.from_numpy_dtype(numpy.dtype("float64"))
1830-
INT_DTYPE = MachineDataType.from_numpy_dtype(numpy.dtype("int64"))
1831-
STRING_DTYPE = MachineDataType.from_numpy_dtype(numpy.dtype("<U"))
1864+
BOOLEAN_DTYPE = BuiltinType.from_numpy_dtype(numpy.dtype("bool"))
1865+
FLOAT_DTYPE = BuiltinType.from_numpy_dtype(numpy.dtype("float64"))
1866+
INT_DTYPE = BuiltinType.from_numpy_dtype(numpy.dtype("int64"))
1867+
STRING_DTYPE = BuiltinType.from_numpy_dtype(numpy.dtype("<U"))
18321868
JSON_DTYPE_TO_MACHINE_DATA_TYPE = {
18331869
"boolean": BOOLEAN_DTYPE,
18341870
"number": FLOAT_DTYPE,

0 commit comments

Comments
 (0)