| 
21 | 21 | 
 
  | 
22 | 22 |     from mdio.core.storage_location import StorageLocation  | 
23 | 23 | 
 
  | 
 | 24 | +from xarray import Variable  | 
 | 25 | +from zarr.core.config import config as zarr_config  | 
24 | 26 | 
 
  | 
25 | 27 | from mdio.constants import UINT32_MAX  | 
26 | 28 | from mdio.schemas.v1.dataset_serializer import _get_fill_value  | 
@@ -101,6 +103,12 @@ def trace_worker(  # noqa: PLR0913  | 
101 | 103 |     Returns:  | 
102 | 104 |         SummaryStatistics object containing statistics about the written traces.  | 
103 | 105 |     """  | 
 | 106 | +    # Setting the zarr config to 1 thread to ensure we honor the `MDIO__IMPORT__MAX_WORKERS`  | 
 | 107 | +    # environment variable.  | 
 | 108 | +    # Since the release of Zarr 3, the engine defaults to using many threads.  | 
 | 109 | +    # This can cause resource contention and unpredictable memory consumption.  | 
 | 110 | +    zarr_config.set({"threading.max_workers": 1})  | 
 | 111 | + | 
104 | 112 |     if not dataset.trace_mask.any():  | 
105 | 113 |         return None  | 
106 | 114 | 
 
  | 
@@ -128,13 +136,29 @@ def trace_worker(  # noqa: PLR0913  | 
128 | 136 |         # TODO(BrianMichell): Implement this better so that we can enable fill values without changing the code. #noqa: TD003  | 
129 | 137 |         tmp_headers = np.zeros_like(dataset[header_key])  | 
130 | 138 |         tmp_headers[not_null] = traces.header  | 
131 |  | -        ds_to_write[header_key][:] = tmp_headers  | 
 | 139 | +        # Create a new Variable object to avoid copying the temporary array  | 
 | 140 | +        # The ideal solution is to use `ds_to_write[header_key][:] = tmp_headers`  | 
 | 141 | +        # but Xarray appears to be copying memory instead of doing direct assignment.  | 
 | 142 | +        # TODO(BrianMichell): #614 Look into this further.  | 
 | 143 | +        ds_to_write[header_key] = Variable(  | 
 | 144 | +            ds_to_write[header_key].dims,  | 
 | 145 | +            tmp_headers,  | 
 | 146 | +            attrs=ds_to_write[header_key].attrs,  | 
 | 147 | +            encoding=ds_to_write[header_key].encoding,  # Not strictly necessary, but safer than not doing it.  | 
 | 148 | +        )  | 
132 | 149 | 
 
  | 
133 | 150 |     data_variable = ds_to_write[data_variable_name]  | 
134 | 151 |     fill_value = _get_fill_value(ScalarType(data_variable.dtype.name))  | 
135 | 152 |     tmp_samples = np.full_like(data_variable, fill_value=fill_value)  | 
136 | 153 |     tmp_samples[not_null] = traces.sample  | 
137 |  | -    ds_to_write[data_variable_name][:] = tmp_samples  | 
 | 154 | +    # Create a new Variable object to avoid copying the temporary array  | 
 | 155 | +    # TODO(BrianMichell): #614 Look into this further.  | 
 | 156 | +    ds_to_write[data_variable_name] = Variable(  | 
 | 157 | +        ds_to_write[data_variable_name].dims,  | 
 | 158 | +        tmp_samples,  | 
 | 159 | +        attrs=ds_to_write[data_variable_name].attrs,  | 
 | 160 | +        encoding=ds_to_write[data_variable_name].encoding,  # Not strictly necessary, but safer than not doing it.  | 
 | 161 | +    )  | 
138 | 162 | 
 
  | 
139 | 163 |     ds_to_write.to_zarr(output_location.uri, region=region, mode="r+", write_empty_chunks=False, zarr_format=2)  | 
140 | 164 | 
 
  | 
 | 
0 commit comments