55import functools
66import itertools
77import json
8+ import logging
89import os
910import sys
1011import threading
11- import warnings
1212
1313from bson .objectid import ObjectId , InvalidId
1414import cachetools
6464CHUNK_SIZE_LIMIT = os .getenv ("DATABROKER_CHUNK_SIZE_LIMIT" , "100MB" )
6565MAX_AD_FRAMES_PER_CHUNK = int (os .getenv ("DATABROKER_MAX_AD_FRAMES_PER_CHUNK" , "10" ))
6666
67+ logger = logging .getLogger (__name__ )
68+
6769
6870class BlueskyRun (TreeInMemory , BlueskyRunMixin ):
6971 specs = ["BlueskyRun" ]
@@ -397,7 +399,15 @@ def macrostructure(self):
397399 # would likely be a dict here.
398400 if self ._sub_dict == "data" :
399401 shape = tuple ((self ._cutoff_seq_num - 1 , * field_metadata ["shape" ]))
400- dtype = JSON_DTYPE_TO_MACHINE_DATA_TYPE [field_metadata ["dtype" ]]
402+ # A detailed dtype "dtype_str" was added to the schema in event-model#215.
403+ # Prefer that, if it is set.
404+ # Fall back to "dtype" otherwise, and guess the detailed dtype.
405+ if "dtype_str" in field_metadata :
406+ dtype = MachineDataType .from_numpy_dtype (
407+ numpy .dtype (field_metadata ["dtype_str" ])
408+ )
409+ else :
410+ dtype = JSON_DTYPE_TO_MACHINE_DATA_TYPE [field_metadata ["dtype" ]]
401411 if dtype .kind == Kind .unicode :
402412 array = unicode_columns [key ]
403413 # I do not fully understand why we need this factor of 4.
@@ -476,7 +486,18 @@ def read(self, variables=None):
476486 variable = structure .data_vars [key ].macro .variable
477487 dtype = variable .macro .data .micro .to_numpy_dtype ()
478488 raw_array = columns [key ]
479- array = raw_array .astype (dtype )
489+ if raw_array .dtype != dtype :
490+ logger .warning (
491+ f"{ key !r} actually has dtype { raw_array .dtype .str !r} "
492+ f"but was reported as having dtype { dtype .str !r} . "
493+ "It will be converted to the reported type, "
494+ "but this should be fixed by setting 'dtype_str' "
495+ "in the data_key of the EventDescriptor. "
496+ f"RunStart UID: { self ._run .metadata ['start' ]['uid' ]!r} "
497+ )
498+ array = raw_array .astype (dtype )
499+ else :
500+ array = raw_array
480501 data_array = xarray .DataArray (
481502 array ,
482503 attrs = variable .macro .attrs ,
@@ -516,7 +537,18 @@ def read_block(self, variable, block, coord=None, slice=None):
516537 ]
517538 slices = [s [index ] for s , index in zip (slices_for_chunks , block )]
518539 raw_array = self ._get_columns ([variable ], slices = slices )[variable ]
519- array = raw_array .astype (dtype )
540+ if raw_array .dtype != dtype :
541+ logger .warning (
542+ f"{ variable !r} actually has dtype { raw_array .dtype .str !r} "
543+ f"but was reported as having dtype { dtype .str !r} . "
544+ "It will be converted to the reported type, "
545+ "but this should be fixed by setting 'dtype_str' "
546+ "in the data_key of the EventDescriptor. "
547+ f"RunStart UID: { self ._run .metadata ['start' ]['uid' ]!r} "
548+ )
549+ array = raw_array .astype (dtype )
550+ else :
551+ array = raw_array
520552 if slice is not None :
521553 array = array [slice ]
522554 return array
@@ -1691,7 +1723,7 @@ def discover_handlers(entrypoint_group_name="databroker.handlers", skip_failures
16911723 matches = list (matches )
16921724 if len (matches ) != 1 :
16931725 winner = group [name ]
1694- warnings . warn (
1726+ logger . warning (
16951727 f"There are { len (matches )} entrypoints for the "
16961728 f"databroker handler spec { name !r} . "
16971729 f"They are { matches } . The match { winner } has won the race."
@@ -1702,7 +1734,7 @@ def discover_handlers(entrypoint_group_name="databroker.handlers", skip_failures
17021734 handler_class = entrypoint .load ()
17031735 except Exception as exc :
17041736 if skip_failures :
1705- warnings . warn (
1737+ logger . warning (
17061738 f"Skipping { entrypoint !r} which failed to load. "
17071739 f"Exception: { exc !r} "
17081740 )
@@ -1791,6 +1823,8 @@ def parse_transforms(transforms):
17911823 return _parse_dict_of_objs_or_importable_strings (transforms )
17921824
17931825
1826+ # These are fallback guesses when all we have is a general jsonschema "dtype"
1827+ # like "array" no specific "dtype_str" like "<u2".
17941828BOOLEAN_DTYPE = MachineDataType .from_numpy_dtype (numpy .dtype ("bool" ))
17951829FLOAT_DTYPE = MachineDataType .from_numpy_dtype (numpy .dtype ("float64" ))
17961830INT_DTYPE = MachineDataType .from_numpy_dtype (numpy .dtype ("int64" ))
@@ -1800,7 +1834,7 @@ def parse_transforms(transforms):
18001834 "number" : FLOAT_DTYPE ,
18011835 "integer" : INT_DTYPE ,
18021836 "string" : STRING_DTYPE ,
1803- "array" : FLOAT_DTYPE , # HACK This is not a good assumption .
1837+ "array" : FLOAT_DTYPE , # If this is wrong, set 'dtype_str' in data_key to override .
18041838}
18051839
18061840
0 commit comments