diff --git a/Framework/PythonInterface/plugins/algorithms/CreatePoleFigureTableWorkspace.py b/Framework/PythonInterface/plugins/algorithms/CreatePoleFigureTableWorkspace.py index 4ac4bc3cae5d..102db558bb5f 100644 --- a/Framework/PythonInterface/plugins/algorithms/CreatePoleFigureTableWorkspace.py +++ b/Framework/PythonInterface/plugins/algorithms/CreatePoleFigureTableWorkspace.py @@ -227,7 +227,7 @@ def PyExec(self): if not self.no_x0_needed: x0s = np.asarray(peak_param_ws.column("X0")) # if no hkl provided, peak will be set to mean x0 - peak = np.mean(x0s) + peak = np.mean(x0s[np.isfinite(x0s)]) # only use non nan x0s if not self.no_chi2_needed: chi2 = np.asarray(peak_param_ws.column("chi2")) else: diff --git a/Testing/SystemTests/tests/framework/TextureAnalysisScriptTest.py b/Testing/SystemTests/tests/framework/TextureAnalysisScriptTest.py index acaf4dc4a799..7f9a241ffdd1 100644 --- a/Testing/SystemTests/tests/framework/TextureAnalysisScriptTest.py +++ b/Testing/SystemTests/tests/framework/TextureAnalysisScriptTest.py @@ -9,7 +9,7 @@ import systemtesting from mantid import config from mantid.api import AnalysisDataService as ADS -from Engineering.texture.TextureUtils import run_abs_corr, fit_all_peaks, is_macOS +from Engineering.texture.TextureUtils import run_abs_corr, fit_all_peaks from mantid.simpleapi import LoadEmptyInstrument, CreateSampleShape, SetSampleMaterial, Load, ExtractSingleSpectrum, ConvertUnits from Engineering.common.xml_shapes import get_cube_xml import numpy as np @@ -57,7 +57,7 @@ def validate_expected_files(self): [self.assertTrue(os.path.exists(ef)) for ef in self.expected_files] -class RunAStandardAbsorptionCorrectionWithAttenuationTable(systemtesting.MantidSystemTest, AbsCorrMixin): +class RunAStandardAbsorptionCorrectionWithAttenuationTable(AbsCorrMixin, systemtesting.MantidSystemTest): def runTest(self): self.setup_absorption_correction_inputs() kwargs = self.default_kwargs @@ -82,7 +82,7 @@ def cleanup(self): _try_delete_dirs(CWDIR, ["AbsorptionCorrection", "AttenuationTables", "User"]) -class RunAStandardAbsorptionCorrection(systemtesting.MantidSystemTest, AbsCorrMixin): +class RunAStandardAbsorptionCorrection(AbsCorrMixin, systemtesting.MantidSystemTest): def runTest(self): self.setup_absorption_correction_inputs() kwargs = self.default_kwargs @@ -101,7 +101,7 @@ def cleanup(self): _try_delete_dirs(CWDIR, ["AbsorptionCorrection"]) -class RunAStandardAbsorptionCorrectionEulerGoniometer(systemtesting.MantidSystemTest, AbsCorrMixin): +class RunAStandardAbsorptionCorrectionEulerGoniometer(AbsCorrMixin, systemtesting.MantidSystemTest): def runTest(self): self.setup_absorption_correction_inputs() orientation_file = os.path.join(CWDIR, "rotation_as_euler.txt") @@ -124,7 +124,7 @@ def cleanup(self): _try_delete_dirs(CWDIR, ["AbsorptionCorrection"]) -class RunAStandardAbsorptionCorrectionProvideGoniometerMatrix(systemtesting.MantidSystemTest, AbsCorrMixin): +class RunAStandardAbsorptionCorrectionProvideGoniometerMatrix(AbsCorrMixin, systemtesting.MantidSystemTest): def runTest(self): self.setup_absorption_correction_inputs() orientation_file = os.path.join(CWDIR, "rotation_as_matrix.txt") @@ -145,7 +145,7 @@ def cleanup(self): _try_delete_dirs(CWDIR, ["AbsorptionCorrection"]) -class RunAStandardAbsorptionCorrectionWithCustomGaugeVolume(systemtesting.MantidSystemTest, AbsCorrMixin): +class RunAStandardAbsorptionCorrectionWithCustomGaugeVolume(AbsCorrMixin, systemtesting.MantidSystemTest): def runTest(self): self.setup_absorption_correction_inputs() gv_file = 
os.path.join(CWDIR, "custom_gauge_volume.xml") @@ -166,7 +166,7 @@ def cleanup(self): _try_delete_dirs(CWDIR, ["AbsorptionCorrection"]) -class RunAStandardAbsorptionCorrectionWithDivergenceCorrection(systemtesting.MantidSystemTest, AbsCorrMixin): +class RunAStandardAbsorptionCorrectionWithDivergenceCorrection(AbsCorrMixin, systemtesting.MantidSystemTest): def runTest(self): self.setup_absorption_correction_inputs() kwargs = self.default_kwargs @@ -196,57 +196,57 @@ def setup_fit_peaks_inputs(self): self.fit_dir = os.path.join(CWDIR, "FitParameters") self.peaks = (1.8, 1.44) self.reference_columns = ["wsindex", "I_est", "I", "I_err", "A", "A_err", "B", "B_err", "X0", "X0_err", "S", "S_err"] + self.cols_to_check_vals = ["wsindex", "I_est", "I", "X0", "S"] self.default_kwargs = { "wss": ["ENGINX_280625_focused_bank_1_dSpacing"], "peaks": self.peaks, - "peak_window": 0.03, + "peak_window": 0.05, "save_dir": self.fit_dir, } self.peak_1_vals = [ 0, - 54.72063098592834, - 219.83253111955707, + 60.0549787, + 58.67645081, + 0.85816193, + 8.69792, 0.0, - 8.11518344152499, - 0.0, - -0.7790091073987409, - 0.0, - 1.7477947721710467, - 0.0, - 0.00042411, + 0.29023, 0.0, + 1.80098428, + 9.53361135e-05, + 126.32162042, + 1.8714507, ] self.peak_2_vals = [ 0, - 45.836957142608, - 60.962977355483645, - 0.0, - 212.37513269155008, - 0.0, - 155.94311474389426, - 0.0, - 1.4379627981678513, + 53.11713333, + 58.45633143, + 1.11592828, + 1497790.0, 0.0, - 0.00696813, + 0.0165231, 0.0, + 1.43592701, + 0.00016835, + 163.86109962, + 3.08200681, ] - def validate_table(self, out_table, expected_dict): + def validate_table(self, out_table, expected_dict, rtol=5e-3): expected_cols = list(expected_dict.keys()) for c in out_table.getColumnNames(): print(c, ": ", np.nan_to_num(out_table.column(c)), ", validation value: ", expected_dict[c]) for c in out_table.getColumnNames(): self.assertIn(c, expected_cols) - if not is_macOS(): - # fitting results currently flaky on mac os - np.testing.assert_allclose(np.nan_to_num(out_table.column(c)), expected_dict[c], rtol=1e-3) + if c in self.cols_to_check_vals: + np.testing.assert_allclose(np.nan_to_num(out_table.column(c)), expected_dict[c], rtol=rtol) def validate_missing_peaks_vals(self, peak_1_vals, peak_2_vals): - param_table1 = ADS.retrieve("ENGINX_280625_2.3_GROUP_Fit_Parameters") + param_table1 = ADS.retrieve("ENGINX_280625_2.2_GROUP_Fit_Parameters") param_table2 = ADS.retrieve("ENGINX_280625_2.5_GROUP_Fit_Parameters") expected_files = [ - os.path.join(CWDIR, "FitParameters", "GROUP", "2.3", "ENGINX_280625_2.3_GROUP_Fit_Parameters.nxs"), + os.path.join(CWDIR, "FitParameters", "GROUP", "2.2", "ENGINX_280625_2.2_GROUP_Fit_Parameters.nxs"), os.path.join(CWDIR, "FitParameters", "GROUP", "2.5", "ENGINX_280625_2.5_GROUP_Fit_Parameters.nxs"), ] @@ -255,7 +255,7 @@ def validate_missing_peaks_vals(self, peak_1_vals, peak_2_vals): [self.assertTrue(os.path.exists(ef)) for ef in expected_files] -class TestFittingPeaksOfFocusedData(systemtesting.MantidSystemTest, PeakFitMixin): +class TestFittingPeaksOfFocusedDataNoGroup(PeakFitMixin, systemtesting.MantidSystemTest): def runTest(self): self.setup_fit_peaks_inputs() fit_all_peaks(**self.default_kwargs) @@ -269,7 +269,7 @@ def validate(self): ] self.validate_table(param_table1, dict(zip(self.reference_columns, self.peak_1_vals))) - self.validate_table(param_table2, dict(zip(self.reference_columns, self.peak_2_vals))) + self.validate_table(param_table2, dict(zip(self.reference_columns, self.peak_2_vals)), rtol=8e-3) 
[self.assertTrue(os.path.exists(ef)) for ef in expected_files] def cleanup(self): @@ -277,11 +277,11 @@ def cleanup(self): _try_delete_dirs(CWDIR, ["FitParameters"]) -class TestFittingPeaksOfMissingPeakDataWithFillZero(systemtesting.MantidSystemTest, PeakFitMixin): +class TestFittingPeaksOfMissingPeakDataWithFillZero(PeakFitMixin, systemtesting.MantidSystemTest): def runTest(self): self.setup_fit_peaks_inputs() kwargs = self.default_kwargs - kwargs["peaks"] = (2.3, 2.5) + kwargs["peaks"] = (2.2, 2.5) # expect no peaks here, set the i over sigma threshold large as well as sigma is ill-defined fit_all_peaks(**kwargs, i_over_sigma_thresh=10.0, nan_replacement="zeros") @@ -296,11 +296,11 @@ def cleanup(self): _try_delete_dirs(CWDIR, ["FitParameters"]) -class TestFittingPeaksOfMissingPeakDataWithSpecifiedValue(systemtesting.MantidSystemTest, PeakFitMixin): +class TestFittingPeaksOfMissingPeakDataWithSpecifiedValue(PeakFitMixin, systemtesting.MantidSystemTest): def runTest(self): self.setup_fit_peaks_inputs() kwargs = self.default_kwargs - kwargs["peaks"] = (2.3, 2.5) + kwargs["peaks"] = (2.2, 2.5) # expect no peaks here, set the i over sigma threshold large as well as sigma is ill-defined fit_all_peaks(**kwargs, i_over_sigma_thresh=10.0, nan_replacement="zeros", no_fit_value_dict={"I_est": 1.0, "I": 0.01}) @@ -316,7 +316,7 @@ def cleanup(self): _try_delete_dirs(CWDIR, ["FitParameters"]) -class TestFittingPeaksOfFocusedDataWithGroup(systemtesting.MantidSystemTest, PeakFitMixin): +class TestFittingPeaksOfFocusedDataWithGroup(PeakFitMixin, systemtesting.MantidSystemTest): def runTest(self): self.setup_fit_peaks_inputs() self.input_ws.getRun().addProperty("Grouping", "TEST", False) @@ -332,7 +332,7 @@ def validate(self): ] self.validate_table(param_table1, dict(zip(self.reference_columns, self.peak_1_vals))) - self.validate_table(param_table2, dict(zip(self.reference_columns, self.peak_2_vals))) + self.validate_table(param_table2, dict(zip(self.reference_columns, self.peak_2_vals)), rtol=8e-3) [self.assertTrue(os.path.exists(ef)) for ef in expected_files] def cleanup(self): diff --git a/docs/source/release/v6.15.0/Diffraction/Engineering/New_features/39931.rst b/docs/source/release/v6.15.0/Diffraction/Engineering/New_features/39931.rst new file mode 100644 index 000000000000..4b254a965a10 --- /dev/null +++ b/docs/source/release/v6.15.0/Diffraction/Engineering/New_features/39931.rst @@ -0,0 +1 @@ +- The automated peak fitting routine ``fit_all_peaks`` in ``Engineering.texture.TextureUtils`` has been updated to improve robustness, particularly in terms of fitting peak positions. 
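A note on the refactor in the next two files: the GUI plot model's TOF-to-d error propagation moves into a shared ``convert_TOFerror_to_derror`` helper in ``Engineering.EnggUtils``. The conversion follows from the diffractometer calibration TOF = DIFC*d + DIFA*d^2 + TZERO, so dTOF/dd = DIFC + 2*DIFA*d and a TOF uncertainty maps to sigma_d = sigma_TOF / (2*DIFA*d + DIFC), which is the expression the helper returns. The sketch below is a minimal, standalone illustration of that arithmetic; the function name and plain-float arguments are hypothetical and do not use the Mantid UnitParams API.

import numpy as np

def tof_error_to_d_error(tof_error, d, difc, difa=0.0):
    # TOF = difc * d + difa * d**2 + tzero, so dTOF/dd = difc + 2 * difa * d;
    # a small TOF uncertainty therefore propagates to d as tof_error / (2 * difa * d + difc)
    return tof_error / (2.0 * difa * d + difc)

# quick check with difa = 0: the relative errors in TOF and d should agree
difc, tof, tof_err = 18000.0, 40000.0, 5.0
d = tof / difc
d_err = tof_error_to_d_error(tof_err, d, difc)
assert np.isclose(d_err / d, tof_err / tof)

The final assertion mirrors the check in the unit test added to scripts/test/Engineering/test_EnggUtils.py further down.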
diff --git a/qt/python/mantidqtinterfaces/mantidqtinterfaces/Engineering/gui/engineering_diffraction/tabs/fitting/plotting/plot_model.py b/qt/python/mantidqtinterfaces/mantidqtinterfaces/Engineering/gui/engineering_diffraction/tabs/fitting/plotting/plot_model.py index ca28255c7c8b..b3a4fb4f620b 100644 --- a/qt/python/mantidqtinterfaces/mantidqtinterfaces/Engineering/gui/engineering_diffraction/tabs/fitting/plotting/plot_model.py +++ b/qt/python/mantidqtinterfaces/mantidqtinterfaces/Engineering/gui/engineering_diffraction/tabs/fitting/plotting/plot_model.py @@ -11,7 +11,7 @@ from mantid import FunctionFactory from mantid.api import TextAxis -from mantid.kernel import UnitConversion, DeltaEModeType, UnitParams +from mantid.kernel import UnitConversion, DeltaEModeType from mantid.simpleapi import logger, CreateEmptyTableWorkspace, GroupWorkspaces, CreateWorkspace, FindPeaksConvolve, SaveNexus from mantid.api import AnalysisDataService as ADS from mantidqtinterfaces.Engineering.gui.engineering_diffraction.tabs.common.output_sample_logs import write_table_row @@ -19,6 +19,7 @@ from mantid.fitfunctions import FunctionWrapper from mantidqtinterfaces.Engineering.gui.engineering_diffraction.tabs.common import output_settings from mantidqtinterfaces.Engineering.gui.engineering_diffraction.tabs.common import wsname_in_instr_run_ceria_group_ispec_unit_format +from Engineering.EnggUtils import convert_TOFerror_to_derror from os import path, makedirs @@ -122,9 +123,7 @@ def _convert_TOF_to_d(self, tof, ws_name): def _convert_TOFerror_to_derror(self, tof_error, d, ws_name): diff_consts = self._get_diff_constants(ws_name) - difc = diff_consts[UnitParams.difc] - difa = diff_consts[UnitParams.difa] if UnitParams.difa in diff_consts else 0 - return tof_error / (2 * difa * d + difc) + return convert_TOFerror_to_derror(diff_consts, tof_error, d) def _get_diff_constants(self, ws_name): """ diff --git a/qt/python/mantidqtinterfaces/mantidqtinterfaces/Engineering/gui/engineering_diffraction/tabs/fitting/plotting/test/test_plot_model.py b/qt/python/mantidqtinterfaces/mantidqtinterfaces/Engineering/gui/engineering_diffraction/tabs/fitting/plotting/test/test_plot_model.py index 1be82c2eb0a3..bbd88b05b41a 100644 --- a/qt/python/mantidqtinterfaces/mantidqtinterfaces/Engineering/gui/engineering_diffraction/tabs/fitting/plotting/test/test_plot_model.py +++ b/qt/python/mantidqtinterfaces/mantidqtinterfaces/Engineering/gui/engineering_diffraction/tabs/fitting/plotting/test/test_plot_model.py @@ -669,19 +669,6 @@ def test_save_files_with_no_RB_creates_dirs_and_saves(self, mock_output_path, mo self.run_the_save_tests(expected_dirs, mock_makedirs, mock_save) - @patch(plot_model_path + ".FittingPlotModel._get_diff_constants") - def test_convert_centres_and_error_from_TOF_to_d(self, mock_get_diffs): - params = UnitParametersMap() - params[UnitParams.difc] = 18000 - mock_get_diffs.return_value = params - tof = 40000 - tof_error = 5 - d = self.model._convert_TOF_to_d(tof, "ws_name") - d_error = self.model._convert_TOFerror_to_derror(tof_error, d, "ws_name") - - self.assertAlmostEqual(tof / d, 18000, delta=1e-10) - self.assertAlmostEqual(d_error / d, tof_error / tof, delta=1e-10) - def _get_sample_findpeaksconvolve_group_ws(self): peak_centers = CreateEmptyTableWorkspace(OutputWorkspace="PeakCentre") peak_centers.addColumn("int", "SpecIndex") diff --git a/scripts/Engineering/EnggUtils.py b/scripts/Engineering/EnggUtils.py index 43bb45414556..48dded951a6a 100644 --- a/scripts/Engineering/EnggUtils.py +++ 
b/scripts/Engineering/EnggUtils.py @@ -1007,6 +1007,12 @@ def convert_to_TOF(parent, ws): return alg.getProperty("OutputWorkspace").value +def convert_TOFerror_to_derror(diff_consts, tof_error, d): + difc = diff_consts[UnitParams.difc] + difa = diff_consts[UnitParams.difa] if UnitParams.difa in diff_consts else 0 + return tof_error / (2 * difa * d + difc) + + def crop_data(parent, ws, indices): """ DEPRECATED: not used in UI, only in deprecated functions (EnggVanadiumCorrections and EnggFocus) diff --git a/scripts/Engineering/texture/TextureUtils.py b/scripts/Engineering/texture/TextureUtils.py index b6f30f471211..838b49204232 100644 --- a/scripts/Engineering/texture/TextureUtils.py +++ b/scripts/Engineering/texture/TextureUtils.py @@ -6,10 +6,18 @@ # SPDX - License - Identifier: GPL - 3.0 + import numpy as np from os import path, scandir -import sys from Engineering.texture.correction.correction_model import TextureCorrectionModel from Engineering.texture.polefigure.polefigure_model import TextureProjection from mantid.simpleapi import SaveNexus, logger, CreateEmptyTableWorkspace, Fit +from mantid.simpleapi import ( + ConvertUnits, + Rebunch, + Rebin, + SumSpectra, + AppendSpectra, + CloneWorkspace, + CropWorkspace, +) from pathlib import Path from Engineering.EnggUtils import GROUP from Engineering.EnginX import EnginX @@ -17,12 +25,15 @@ from typing import Optional, Sequence, Union, Tuple from mantid.dataobjects import Workspace2D from mantid.fitfunctions import FunctionWrapper, CompositeFunctionWrapper -from plugins.algorithms.IntegratePeaks1DProfile import PeakFunctionGenerator, calc_intens_and_sigma_arrays +from plugins.algorithms.IntegratePeaks1DProfile import calc_intens_and_sigma_arrays from Engineering.texture.xtal_helper import get_xtal_structure +from Engineering.EnggUtils import convert_TOFerror_to_derror # import texture helper functions so they can be accessed by users through the TextureUtils namespace from Engineering.texture.texture_helper import plot_pole_figure +from mantid.kernel import DeltaEModeType, UnitConversion +from plugins.algorithms.peakdata_utils import PeakData # -------- Utility -------------------------------- @@ -246,142 +257,244 @@ def validate_abs_corr_inputs( # -------- Fitting Script Logic-------------------------------- -class TexturePeakFunctionGenerator(PeakFunctionGenerator): - def __init__(self, peak_params_to_fix: Sequence[str], min_width: float = 1e-3): - super().__init__(peak_params_to_fix) - self.min_width = min_width - - def get_initial_fit_function_and_kwargs_from_specs( - self, - ws: Workspace2D, - peak: float, - x_window: tuple[float, float], - parameters_to_tie: Sequence[str], - peak_func_name: str, - bg_func_name: str, - ) -> Tuple[str, dict, Sequence[float]]: - # modification of get_initial_fit_function_and_kwargs to just fit a peak within the x_window - si = ws.spectrumInfo() - ispecs = list(range(si.size())) - x_start, x_end = x_window - function = MultiDomainFunction() - fit_kwargs = {} - # estimate background - istart = ws.yIndexOfX(x_start) - iend = ws.yIndexOfX(x_end) - # init bg func (global) - bg_func = FunctionFactory.createFunction(bg_func_name) - # init peak func +def crop_and_rebin(ws, out_ws, lower, upper, rebin_params): + CropWorkspace(ws, lower, upper, OutputWorkspace="__tmp_peak_window") + Rebin("__tmp_peak_window", rebin_params, OutputWorkspace=out_ws) + + +def _get_max_bin(ws): + xdat = ws.extractX() + return np.diff(xdat, axis=1).max() + + +def crop_wss_and_combine(wss, peak, lower, upper, output): + 
cropped_rebinned_wss = [f"rebin_ws_{peak}_0"] + peak_window_ws = CropWorkspace(wss[0], lower, upper, OutputWorkspace="__peak_window_crop") + rebin_params = (lower, _get_max_bin(peak_window_ws), upper) + Rebin("__peak_window_crop", rebin_params, OutputWorkspace=f"rebin_ws_{peak}_0") + CloneWorkspace(InputWorkspace=f"rebin_ws_{peak}_0", OutputWorkspace=f"rebin_ws_{peak}") + for iws, ws in enumerate(wss[1:]): + intermediate_ws = f"rebin_ws_{peak}_{iws + 1}" + cropped_rebinned_wss.append(intermediate_ws) + crop_and_rebin(ws, intermediate_ws, lower, upper, rebin_params) + AppendSpectra(f"rebin_ws_{peak}", intermediate_ws, OutputWorkspace=f"rebin_ws_{peak}") + return SumSpectra(f"rebin_ws_{peak}", OutputWorkspace=output), cropped_rebinned_wss + + +def fit_initial_summed_spectra(wss, peaks, peak_window, fit_kwargs): + x0_lims = [] + all_peak_crop_wss = [] + for i, peak in enumerate(peaks): + # set the ws bounds based on the supplied peak window + low_bound, hi_bound = peak - peak_window, peak + peak_window + window_ws, peak_crop_wss = crop_wss_and_combine(wss, peak, low_bound, hi_bound, f"peak_window_{i}") + + # the outer list is peak index and the inner list is each ws (str) in wss cropped and rebinned for that peak + all_peak_crop_wss.append(peak_crop_wss) + + # set up a function to fit + bg_func = FunctionFactory.createFunction("LinearBackground") + peak_func = FunctionFactory.Instance().createPeakFunction("BackToBackExponential") + + # estimate starting params + intens, sigma, *_ = _estimate_intensity_background_and_centre(window_ws, 0, 0, len(window_ws.readX(0)) - 1, peak) + peak_func.setCentre(peak) + peak_func.setIntensity(intens) + cen_par_name = peak_func.getCentreParameterName() + peak_func.addConstraints(f"{low_bound} < {cen_par_name} < {hi_bound}") + peak_func.addConstraints("I > 0") + comp_func = CompositeFunctionWrapper(FunctionWrapper(peak_func), FunctionWrapper(bg_func), NumDeriv=True) + fit_kwargs["InputWorkspace"] = window_ws.name() + fit_kwargs["StartX"] = low_bound + fit_kwargs["EndX"] = hi_bound + + fit_object = Fit( + Function=comp_func, + Output=f"composite_fit_{peak}", + MaxIterations=50, # if it hasn't fit in 50 it is likely because the texture has the peak missing + **fit_kwargs, + ) + b2b_func = fit_object.Function.function.getFunction(0) + x0 = b2b_func.getParameterValue("X0") + x0_lims.append((x0 * (1 - 3e-3), x0 * (1 + 3e-3))) + return x0_lims, all_peak_crop_wss + + +def get_initial_fit_function_and_kwargs_from_specs( + ws: Workspace2D, + ws_tof: Workspace2D, + peak: float, + x_window: tuple[float, float], + x0_window: tuple[float, float], + parameters_to_tie: Optional[Sequence[str]], + peak_func_name: str, + bg_func_name: str, + tie_bkg: bool, +) -> Tuple[str, dict, Sequence[float]]: + # modification of get_initial_fit_function_and_kwargs to just fit a peak within the x_window + + # get the number of spectra + si = ws.spectrumInfo() + ispecs = list(range(si.size())) + + # set up the fit window and data structures + x_start, x_end = x_window + fit_kwargs = {} + approx_bkgs = [] + intensity_estimates = [] + + # set up the overall fit wrapper function and base versions of the individual peak and background + function = MultiDomainFunction() + bg_func = FunctionFactory.createFunction(bg_func_name) + base_peak_func = FunctionFactory.Instance().createPeakFunction(peak_func_name) + base_peak_func.setIntensity(1.0) + + # save parameter names for future ties/constraints + intens_par_name = next( + base_peak_func.getParamName(ipar) for ipar in range(base_peak_func.nParams()) 
if base_peak_func.isExplicitlySet(ipar) + ) + cen_par_name = base_peak_func.getCentreParameterName() + width_par_name = base_peak_func.getWidthParameterName() + + # for each of the spectra + for ispec in ispecs: + # convert d spacing to tof - better step size for A, B and S refinement + diff_consts = si.diffractometerConstants(ispec) + tof_peak = UnitConversion.run("dSpacing", "TOF", peak, 0, DeltaEModeType.Elastic, diff_consts) + tof_start = UnitConversion.run("dSpacing", "TOF", x_start, 0, DeltaEModeType.Elastic, diff_consts) + tof_end = UnitConversion.run("dSpacing", "TOF", x_end, 0, DeltaEModeType.Elastic, diff_consts) + + # get the window indices for the spectra + istart = ws_tof.yIndexOfX(tof_start, ispec) + iend = ws_tof.yIndexOfX(tof_end, ispec) + + # get param estimates + intens, sigma, bg, centre = _estimate_intensity_background_and_centre(ws_tof, ispec, istart, iend, tof_peak) + intensity_estimates.append(intens) + approx_bkgs.append(bg) + + # create an individual peak, using estimated values as initial guess peak_func = FunctionFactory.Instance().createPeakFunction(peak_func_name) - # save parameter names for future ties/constraints - peak_func.setIntensity(1.0) - self.intens_par_name = next(peak_func.getParamName(ipar) for ipar in range(peak_func.nParams()) if peak_func.isExplicitlySet(ipar)) - self.cen_par_name = peak_func.getCentreParameterName() - self.width_par_name = peak_func.getWidthParameterName() - avg_bg = 0 - intensity_estimates = [] - for ispec in ispecs: - # get param estimates - intens, sigma, bg = self._estimate_intensity_and_background(ws, ispec, istart, iend) - intensity_estimates.append(intens) - avg_bg += bg - # add peak, using provided value as guess for centre - peak_func.setCentre(peak) - peak_func.setIntensity(intens) - # add constraints - peak_func.addConstraints(f"{self.cen_par_name} > {x_start}") - peak_func.addConstraints(f"{self.cen_par_name} < {x_end}") - peak_func.addConstraints(f"{self.intens_par_name} > 0") - comp_func = CompositeFunctionWrapper(FunctionWrapper(peak_func), FunctionWrapper(bg_func), NumDeriv=True) - function.add(comp_func.function) - function.setDomainIndex(ispec, ispec) - key_suffix = f"_{ispec}" if ispec > 0 else "" - fit_kwargs["InputWorkspace" + key_suffix] = ws.name() - fit_kwargs["StartX" + key_suffix] = x_start - fit_kwargs["EndX" + key_suffix] = x_end - fit_kwargs["WorkspaceIndex" + key_suffix] = int(ispec) - # set background (background global tied to first domain function) - function[0][1]["A0"] = avg_bg / len(ispecs) - function[0][0].setMatrixWorkspace(ws, 0, x_start, x_end) - available_params = [peak_func.getParamName(i) for i in range(peak_func.nParams())] - if parameters_to_tie is not None: - invalid = [p for p in parameters_to_tie if p not in available_params] - if invalid: - raise ValueError(f"Invalid parameter(s) to tie: {invalid}. 
Available: {available_params}") - - # set constraint on FWHM (to avoid peak fitting to noise or background) - self._add_fwhm_constraints(function, peak_func, fit_range=x_end - x_start, nbins=iend - istart) - return self._add_parameter_ties(function, parameters_to_tie), fit_kwargs, intensity_estimates - - def _add_parameter_ties(self, function: MultiDomainFunction, pars_to_tie: Sequence[str]) -> str: - # fix peak params requested - [function[0][0].fixParameter(par) for par in self.peak_params_to_fix] - additional_pars_to_fix = set(self.peak_params_to_fix) - set(pars_to_tie) - ties = [] - for idom in range(1, function.nDomains()): - # tie global params to first - for par in pars_to_tie: - ties.append(f"f{idom}.f0.{par}=f0.f0.{par}") # global peak pars - for ipar_bg in range(function[idom][1].nParams()): - par = function[idom][1].getParamName(ipar_bg) - ties.append(f"f{idom}.f1.{par}=f0.f1.{par}") - for par in additional_pars_to_fix: - # pars to be fixed but not global/already tied - function[idom][0].fixParameter(par) - # add ties as string (orders of magnitude quicker than self.function.tie) - return f"{str(function)};ties=({','.join(ties)})" - - -def _get_run_and_prefix_from_ws_log(ws: Workspace2D, wsname: str) -> Tuple[str, str]: - try: - run = str(ws.getRun().getLogData("run_number").value) - prefix = wsname.split(run)[0] - except: - run = "unknown" - prefix = "" - return run, prefix - - -def _get_grouping_from_ws_log(ws: Workspace2D) -> str: - try: - grouping = str(ws.getRun().getLogData("Grouping").value) - except RuntimeError: - grouping = "GROUP" - return grouping - - -def get_default_values(params, no_fit_dict): - defaults = dict(zip(params, [np.nan for _ in params])) - if isinstance(no_fit_dict, dict): - for k, v in no_fit_dict.items(): - defaults[k] = v - return defaults - - -def replace_nans(vals, method: str): - new_vals = np.zeros_like(vals.T) # want to iterate over table columns - # if method is zero, replace all nans with zero - if method == "zeros": - return np.nan_to_num(vals) - # otherwise, if there is at least 1 value per row that isn't nan, replace the nans with the min/max/mean of these - elif method == "mean": - func = np.mean - elif method == "max": - func = np.max - else: - func = np.min - for i, col in enumerate(vals.T): - non_nan = col[~np.isnan(col)] - if len(non_nan) > 0: - new_vals[i] = np.nan_to_num(col, nan=func(non_nan)) + peak_func.setCentre(centre) + peak_func.setIntensity(intens) + + # calculate constraint values + # convert x0 bounds to TOF + x0_lower = UnitConversion.run("dSpacing", "TOF", x0_window[0], 0, DeltaEModeType.Elastic, diff_consts) + x0_upper = UnitConversion.run("dSpacing", "TOF", x0_window[1], 0, DeltaEModeType.Elastic, diff_consts) + # constrain the values of S to be at least half bin width and no more than half the window size + scale_factor = 2 * np.sqrt(2 * np.log(2)) + width_min = 0.5 * ((tof_end - tof_start) / (iend - istart)) * scale_factor + width_max = max(width_min + 1e-10, ((tof_end - tof_start) / 2) * scale_factor) + + # add these constraints + peak_func.addConstraints(f"{x0_lower} < {cen_par_name} < {x0_upper}") + peak_func.addConstraints(f"{intens / 5} < {intens_par_name} < {intens * 5}") + peak_func.addConstraints(f"{width_min}<{width_par_name}<{width_max}") + + if not tie_bkg: + bg_func.setParameter("A0", bg) + + # package up the spectra fit functions (peak + background) into a composite function + comp_func = CompositeFunctionWrapper(FunctionWrapper(peak_func), FunctionWrapper(bg_func), NumDeriv=True) + 
function.add(comp_func.function) + function.setDomainIndex(ispec, ispec) + function.setMatrixWorkspace(ws_tof, ispec, tof_start, tof_end) + + # set the fit kwargs for this spectra + key_suffix = f"_{ispec}" if ispec > 0 else "" + fit_kwargs["InputWorkspace" + key_suffix] = ws_tof.name() + fit_kwargs["StartX" + key_suffix] = tof_start + fit_kwargs["EndX" + key_suffix] = tof_end + fit_kwargs["WorkspaceIndex" + key_suffix] = int(ispec) + + # add parameter ties + ties = [] + + # first tie background + if tie_bkg: + function, ties = _tie_bkg(function, approx_bkgs, ties) + + # then any other nominated parameters + available_params = [base_peak_func.getParamName(i) for i in range(base_peak_func.nParams())] + if parameters_to_tie is not None: + invalid = [p for p in parameters_to_tie if p not in available_params] + if invalid: + raise ValueError(f"Invalid parameter(s) to tie: {invalid}. Available: {available_params}") else: - # if the row is all nan, then we leave it as is - new_vals[i] = col - return new_vals.T - - -def is_macOS(): - return sys.platform == "darwin" + for idom in range(1, function.nDomains()): + # tie global params to first + for par in parameters_to_tie: + ties.append(f"f{idom}.f0.{par}=f0.f0.{par}") # global peak pars + + # if ties are to be added, do so as a string as it is faster + func = f"{str(function)};ties=({','.join(ties)})" if ties else function + return func, fit_kwargs, intensity_estimates + + +def rerun_fit_with_new_ws( + mdf: MultiDomainFunction, + fit_kwargs: dict, + md_fit_kwargs: dict, + new_ws: Workspace2D, + x0_frac_move: float, + iters: int, + parameters_to_fix: Optional[Sequence[str]] = None, + parameters_to_tie: Optional[Sequence[str]] = None, + tie_background: bool = False, + is_final: bool = False, +): + # update the input workspace in the fitting kwargs + for k in md_fit_kwargs.keys(): + if "InputWorkspace" in k: + md_fit_kwargs[k] = new_ws.name() + + ties = [] + new_func = MultiDomainFunction() + for idom in range(mdf.nFunctions()): + comp = mdf[idom] + peak = comp[0] + bg = comp[1] + # create fresh peak as ties are causing problems + new_peak = FunctionFactory.Instance().createPeakFunction(peak.name()) + [new_peak.setParameter(param, peak.getParameterValue(param)) for param in ("I", "X0", "A", "B", "S")] + # update constraints around new values + intens = max(peak.getParameterValue("I"), 1) + x0 = peak.getParameterValue("X0") + if not is_final: + # don't constrain the intensity on the final fit + new_peak.addConstraints(f"{max(intens / 2, 1e-6)} 0: + if tie_background: + for ipar_bg in range(bg.nParams()): + par = bg.getParamName(ipar_bg) + ties.append(f"f{idom}.f1.{par}=f0.f1.{par}") + if parameters_to_tie: + for par in parameters_to_tie: + ties.append(f"f{idom}.f0.{par}=f0.f0.{par}") + # fix parameters if required + if parameters_to_fix: + for param in parameters_to_fix: + new_peak.fixParameter(param) + + comp_func = CompositeFunctionWrapper(FunctionWrapper(new_peak), FunctionWrapper(bg), NumDeriv=True) + new_func.add(comp_func.function) + new_func.setDomainIndex(idom, idom) + key_suffix = f"_{idom}" if idom > 0 else "" + new_func.setMatrixWorkspace(new_ws, idom, md_fit_kwargs["StartX" + key_suffix], md_fit_kwargs["EndX" + key_suffix]) + + # if ties are defined add them + func = f"{str(new_func)};ties=({','.join(ties)})" if len(ties) > 0 else new_func + + return Fit( + Function=func, + Output=f"fit_{new_ws.name()}", + MaxIterations=iters, + **fit_kwargs, + **md_fit_kwargs, + ), md_fit_kwargs def fit_all_peaks( @@ -389,11 +502,15 @@ def fit_all_peaks( 
peaks: Sequence[float], peak_window: float, save_dir: str, - parameters_to_tie: Optional[Sequence[str]] = ("A", "B"), override_dir: bool = False, i_over_sigma_thresh: float = 2.0, nan_replacement: Optional[str] = "zeros", no_fit_value_dict: Optional[dict] = None, + smooth_vals: Sequence[int] = (3, 2), + tied_bkgs: Sequence[bool] = (False, True), + final_fit_raw: bool = True, + parameters_to_tie: Optional[Sequence[str]] = ("A", "B"), + subsequent_fit_param_fix: Optional[Sequence[str]] = None, ) -> None: """ @@ -403,80 +520,120 @@ peaks: Sequence of peak positions in d-spacing peak_window: size of the window to create around the desired peak for purpose of fitting save_dir: directory to save the results in - parameters_to_tie: Sequence of parameters which should be tied across spectra override_dir: flag which, if True, will save files directly into save_dir rather than creating a folder structure i_over_sigma_thresh: I/sig less than this value will be deemed as no peak and parameter values will be nan or specified value nan_replacement: method options are ("zero", "min", "max", "mean") will try to replace the nan values in columns zero - will replace all nans with 0.0 min/max/mean - will replace all nans in a column with the min/max/mean non-nan value (otherwise will remain nan) no_fit_value_dict: allows the user to specify the unfit default value of parameters as a dict of key:value pairs + smooth_vals: for each successive fit, the number of bins which should be combined together (via Rebunch) to improve the SNR + tied_bkgs: a bool flag for each of the smoothed fits indicating whether the background should be tied across spectra + final_fit_raw: flag for whether the final fit should be done with no smoothing + parameters_to_tie: parameters which should be tied across spectra (Default is A and B) + subsequent_fit_param_fix: parameters which should be fixed after the initial fit (Default is None) """ - if is_macOS(): - logger.warning("Fitting can be unreliable on MacOS, this is being worked on") + # currently the only fit functions intended to be used - less flexibility here allows for less user input + peak_func_name = "BackToBackExponential" + bg_func_name = "LinearBackground" + # define some parameters for the fit fit_kwargs = { - "Minimizer": "Levenberg-Marquardt", "StepSizeMethod": "Sqrt epsilon", - "IgnoreInvalidData": True, + "IgnoreInvalidData": False, "CreateOutput": True, "OutputCompositeMembers": True, + "Minimizer": "Levenberg-Marquardt", + "CostFunction": "Unweighted least squares", } - for wsname in wss: + # we are initially going to fit a summed spectrum to get a good starting point for the peak centre + # we will then limit how much this can change in the individual fits + + x0_lims, all_cropped_rebinned_wss = fit_initial_summed_spectra(wss, peaks, peak_window, fit_kwargs.copy()) + + for iws, wsname in enumerate(wss): + # notify the user how far through the fitting they are (useful if any fits fail) + logger.notice(f"Fitting Workspace: {wsname} ({iws + 1}/{len(wss)})") + + # obtain the ws and metadata about ws ws = ADS.retrieve(wsname) run, prefix = _get_run_and_prefix_from_ws_log(ws, wsname) grouping = _get_grouping_from_ws_log(ws) - for peak in peaks: + # loop over the peaks + for ipeak, peak in enumerate(peaks): + # perform fitting in TOF as the parameter magnitudes are better for fitting (A, B, and S) + ws_tof = ConvertUnits(InputWorkspace=all_cropped_rebinned_wss[ipeak][iws], OutputWorkspace="ws_tof", Target="TOF") + + # approach will be to use iterative fits and these iterations can be run on
optionally 'rebunched' data to improve SNR + fit_wss = [] + bkg_is_tied = [] + if len(smooth_vals) > 0: + for i, smooth_val in enumerate(smooth_vals): + fit_wss.append(Rebunch(InputWorkspace=ws_tof, OutputWorkspace=f"smooth_ws_{smooth_val}", NBunch=smooth_val)) + bkg_is_tied.append(tied_bkgs[i]) + # if no smoothing values are given, the initial fit should just be on ws_tof + # if final_fit_raw flagged, ws_tof should be added to the end of the fit_wss stack + if final_fit_raw or len(smooth_vals) == 0: + fit_wss.append(ws_tof) + bkg_is_tied.append(True) + + # low level information logger.information(f"Workspace: {wsname}, Peak: {peak}") - # change peak window to fraction + + # set up an index of which of the fit iterations we are on + fit_num = 0 + # set up final ws and file paths out_ws = f"{prefix}{run}_{peak}_{grouping}_Fit_Parameters" out_file = out_ws + ".nxs" out_path = path.join(save_dir, out_file) if override_dir else path.join(save_dir, grouping, str(peak), out_file) - - xmin, xmax = peak - peak_window, peak + peak_window - out_tab = CreateEmptyTableWorkspace(OutputWorkspace=out_ws) - peak_func_name = "BackToBackExponential" - bg_func_name = "LinearBackground" + # get window bounds + xmin, xmax = peak - peak_window, peak + peak_window - func_generator = TexturePeakFunctionGenerator([]) # don't fix any parameters - parameters_to_tie = () if not parameters_to_tie else parameters_to_tie - initial_function, md_fit_kwargs, intensity_estimates = func_generator.get_initial_fit_function_and_kwargs_from_specs( - ws, peak, (xmin, xmax), parameters_to_tie, peak_func_name, bg_func_name + # perform initial fit set up and fit + fit_ws = fit_wss[fit_num] + initial_function, md_fit_kwargs, intensity_estimates = get_initial_fit_function_and_kwargs_from_specs( + ws, fit_ws, peak, (xmin, xmax), x0_lims[ipeak], parameters_to_tie, peak_func_name, bg_func_name, bkg_is_tied[fit_num] ) - fit_object = Fit( Function=initial_function, - CostFunction="Least squares", - Output="first_fit", + Output=f"fit_{fit_ws}", MaxIterations=50, # if it hasn't fit in 50 it is likely because the texture has the peak missing **fit_kwargs, **md_fit_kwargs, ) - fit_result = {"Function": fit_object.Function.function, "OutputWorkspace": fit_object.OutputWorkspace.name()} - + # perform subsequent fits + while len(fit_wss) - 1 > fit_num: + fit_num += 1 + mdf = fit_object.Function.function + fit_ws = fit_wss[fit_num] + fit_object, md_fit_kwargs = rerun_fit_with_new_ws( + mdf, + fit_kwargs, + md_fit_kwargs, + fit_ws, + 0.01, # allow x0 to only vary by 1% from previous fit + 50, + subsequent_fit_param_fix, + parameters_to_tie, + bkg_is_tied[fit_num], + fit_num == len(fit_wss) - 1, + ) + + # establish which detectors have sufficient I over sigma + mdf = fit_object.Function.function + fit_result = {"Function": mdf, "OutputWorkspace": fit_object.OutputWorkspace.name()} # update peak mask based on I/sig from fit *_, i_over_sigma, _ = calc_intens_and_sigma_arrays(fit_result, "Summation") fit_mask = i_over_sigma > i_over_sigma_thresh - # fit only peak pixels and let peak centers vary independently of DIFC ratio - final_function = func_generator.get_final_fit_function(fit_result["Function"], fit_mask, 0.02) - Fit( - Function=final_function, - CostFunction="Least squares", - Output="fit", - MaxIterations=50, # if it hasn't fit in 50 it is likely because the texture has the peak missing - **fit_kwargs, - **md_fit_kwargs, - ) - - spec_fit = ADS.retrieve("fit_Parameters") + # setup output table columns + spec_fit = 
ADS.retrieve(f"fit_{fit_ws}_Parameters") si = ws.spectrumInfo() - out_tab.addColumn("int", "wsindex") out_tab.addColumn("double", "I_est") all_params = spec_fit.column("Name")[:-1] # last row is cost function @@ -491,16 +648,27 @@ def fit_all_peaks( out_tab.addColumn("double", param) out_tab.addColumn("double", f"{param}_err") + # get user defined default vals for unsuccessful fit parameters default_vals = get_default_values(u_params, no_fit_value_dict) + # populate the rows of the table table_vals = np.zeros((si.size(), 2 * len(u_params) + 1)) # intensity_est + param_1_val, param_1_err, +... for ispec in range(si.size()): + # logic for spectra which HAVE been fit successfully if fit_mask[ispec]: row = [intensity_estimates[ispec]] for p in u_params: param_name = f"f{ispec}.f0.{p}" pind = all_params.index(param_name) - row += [param_vals[pind], param_errs[pind]] + if p != "X0": + row += [param_vals[pind], param_errs[pind]] + else: + # for x0, convert back to d spacing + diff_consts = si.diffractometerConstants(ispec) + d_peak = UnitConversion.run("TOF", "dSpacing", param_vals[pind], 0, DeltaEModeType.Elastic, diff_consts) + d_err = convert_TOFerror_to_derror(diff_consts, param_errs[pind], d_peak) + row += [d_peak, d_err] + # logic for spectra which HAVE NOT been fit successfully else: row = [default_vals.get("I_est", np.nan)] for p in u_params: @@ -510,9 +678,81 @@ def fit_all_peaks( table_vals = replace_nans(table_vals, nan_replacement) for i, row in enumerate(table_vals): out_tab.addRow([i] + list(row)) + + # save the final table SaveNexus(InputWorkspace=out_ws, Filename=out_path) +# ~fitting utility functions~ + + +def _tie_bkg(function, approx_bkgs, ties): + function[0][1]["A0"] = np.mean(approx_bkgs) + for idom in range(1, function.nDomains()): + for ipar_bg in range(function[idom][1].nParams()): + par = function[idom][1].getParamName(ipar_bg) + ties.append(f"f{idom}.f1.{par}=f0.f1.{par}") + return function, ties + + +def _estimate_intensity_background_and_centre( + ws: Workspace2D, ispec: int, istart: int, iend: int, peak: float +) -> Tuple[float, float, float, float]: + xdat = ws.readX(ispec)[istart:iend] + bin_width = np.diff(xdat) + bin_width = np.hstack((bin_width, bin_width[-1])) # easier than checking iend and istart not out of bounds + y = ws.readY(ispec)[istart:iend] + if not np.any(y > 0): + return 0.0, 0.0, 0.0, peak + e = ws.readE(ispec)[istart:iend] + ibg, _ = PeakData.find_bg_pts_seed_skew(y) + bg = np.mean(y[ibg]) + intensity = np.trapezoid((y - bg), xdat) + sigma = np.sqrt(np.sum((e * bin_width) ** 2)) + centre_arg = np.argmax(y) + centre = xdat[centre_arg] + return intensity, sigma, bg, centre + + +def _get_run_and_prefix_from_ws_log(ws: Workspace2D, wsname: str) -> Tuple[str, str]: + try: + run = str(ws.getRun().getLogData("run_number").value) + prefix = wsname.split(run)[0] + except: + run = "unknown" + prefix = "" + return run, prefix + + +def _get_grouping_from_ws_log(ws: Workspace2D) -> str: + try: + grouping = str(ws.getRun().getLogData("Grouping").value) + except RuntimeError: + grouping = "GROUP" + return grouping + + +def get_default_values(params, no_fit_dict): + defaults = dict(zip(params, [np.nan for _ in params])) + if isinstance(no_fit_dict, dict): + for k, v in no_fit_dict.items(): + defaults[k] = v + return defaults + + +def replace_nans(vals: np.ndarray, method: Optional[str] = None) -> np.ndarray: + if not method: + return vals + if method == "zeros": + return np.nan_to_num(vals, nan=0) + func = {"mean": np.nanmean, "max": np.nanmax, "min": 
np.nanmin}[method] + out = vals.copy() + col_stat = func(out, axis=0) + nan_mask = np.isnan(out) + out[nan_mask] = col_stat[np.where(nan_mask)[1]] + return out + + # -------- Pole Figure Script Logic-------------------------------- diff --git a/scripts/test/Engineering/test_EnggUtils.py b/scripts/test/Engineering/test_EnggUtils.py index 9a7ab2446ab5..61de0d761d1d 100644 --- a/scripts/test/Engineering/test_EnggUtils.py +++ b/scripts/test/Engineering/test_EnggUtils.py @@ -13,7 +13,9 @@ process_vanadium, GROUP, focus_run, + convert_TOFerror_to_derror, ) +from mantid.kernel import UnitConversion, DeltaEModeType, UnitParams, UnitParametersMap enggutils_path = "Engineering.EnggUtils" @@ -406,6 +408,17 @@ def test_focus_run_texture_group_with_rb_only_rb_dir( expected_dirs = [path.join("/mock", "User", "RB123", "Focus", "Texture20")] mock_save.assert_called_with(expected_dirs, ws, calib, "123456", "RB123") + def test_convert_centres_and_error_from_TOF_to_d(self): + params = UnitParametersMap() + params[UnitParams.difc] = 18000 + tof = 40000 + tof_error = 5 + d = UnitConversion.run("TOF", "dSpacing", tof, 0, DeltaEModeType.Elastic, params) + d_error = convert_TOFerror_to_derror(params, tof_error, d) + + self.assertAlmostEqual(tof / d, 18000, delta=1e-10) + self.assertAlmostEqual(d_error / d, tof_error / tof, delta=1e-10) + if __name__ == "__main__": unittest.main() diff --git a/scripts/test/Engineering/texture/test_TextureUtils.py b/scripts/test/Engineering/texture/test_TextureUtils.py index 8ca9235f36c9..e204fe5c02ee 100644 --- a/scripts/test/Engineering/texture/test_TextureUtils.py +++ b/scripts/test/Engineering/texture/test_TextureUtils.py @@ -7,7 +7,7 @@ # import unittest from unittest.mock import patch, MagicMock, call - +from os import path import numpy as np import tempfile from Engineering.texture.TextureUtils import ( @@ -17,19 +17,22 @@ run_abs_corr, validate_abs_corr_inputs, fit_all_peaks, - make_iterable, + get_initial_fit_function_and_kwargs_from_specs, create_pf_loop, _get_run_and_prefix_from_ws_log, _get_grouping_from_ws_log, - TexturePeakFunctionGenerator, + fit_initial_summed_spectra, + rerun_fit_with_new_ws, + _tie_bkg, + crop_and_rebin, + crop_wss_and_combine, ) -from numpy import ones import os texture_utils_path = "Engineering.texture.TextureUtils" -class TextureUtilsTest(unittest.TestCase): +class TextureUtilsFileHelperTests(unittest.TestCase): def test_find_all_files_returns_only_files(self): with tempfile.TemporaryDirectory() as tmpdir: file_path = os.path.join(tmpdir, "file1.txt") @@ -48,6 +51,8 @@ def test_mk_creates_missing_directory(self): mk(test_path) self.assertTrue(os.path.exists(test_path)) + +class TextureUtilsFocusTests(unittest.TestCase): @patch(f"{texture_utils_path}.mk") @patch(f"{texture_utils_path}.TextureInstrument") def test_run_focus_script_instantiates_model_and_calls_main(self, mock_instr, mock_mk): @@ -58,6 +63,8 @@ def test_run_focus_script_instantiates_model_and_calls_main(self, mock_instr, mo mock_model.main.assert_called_once() mock_mk.assert_called_once_with("focus") + +class TextureUtilsAbsCorrTests(unittest.TestCase): @patch(f"{texture_utils_path}.logger") @patch(f"{texture_utils_path}.validate_abs_corr_inputs") @patch(f"{texture_utils_path}.TextureCorrectionModel") @@ -151,6 +158,8 @@ def test_run_abs_corr_logs_error_on_invalid_input(self, mock_model, mock_logger) mock_logger.error.assert_called_once() self.assertIn("must flag orient_file_is_euler", mock_logger.error.call_args[0][0]) + +class TextureUtilsFittingUtilsTests(unittest.TestCase): def 
test_get_run_and_prefix_with_valid_log(self): run_number = "123456" prefix = "TEST" @@ -183,247 +192,715 @@ def test_get_grouping_with_no_log_gives_fallback(self): out_group = _get_grouping_from_ws_log(mock_ws) self.assertEqual(out_group, "GROUP") - def setup_expected_fit_results(self, wsname, prefix, run_number, group, peak, num_spec, save_dir): - expected_func = "some_fit_func" - expected_first_fit_kwargs = { - "Function": expected_func, - "CostFunction": "Least squares", - "Output": "first_fit", - "MaxIterations": 50, - "InputWorkspace": wsname, - "Minimizer": "Levenberg-Marquardt", - "StepSizeMethod": "Sqrt epsilon", - "IgnoreInvalidData": True, - "CreateOutput": True, - "OutputCompositeMembers": True, - } - expected_fit_kwargs = { - "Function": expected_func, - "CostFunction": "Least squares", - "Output": "fit", - "MaxIterations": 50, - "InputWorkspace": wsname, - "Minimizer": "Levenberg-Marquardt", - "StepSizeMethod": "Sqrt epsilon", - "IgnoreInvalidData": True, - "CreateOutput": True, - "OutputCompositeMembers": True, + def test_tie_bkg(self): + # inputs + function = MagicMock() + approx_bkgs = [1.0, 3.0] + ties = [] + + bg1 = MagicMock() + bg1.nParams.return_value = 2 + bg1.getParamName.side_effect = ("A0", "A1") + + bg0 = {} + + comp0, comp1 = MagicMock(), MagicMock() + comp0.__getitem__.return_value = bg0 + comp1.__getitem__.return_value = bg1 + + function.__getitem__.side_effect = lambda i: [comp0, comp1][i] + function.nDomains.return_value = 2 + + # exec + out_func, out_ties = _tie_bkg(function, approx_bkgs, ties) + + self.assertIs(out_func, function) + self.assertIs(out_ties, ties) + self.assertEqual(out_ties, ["f1.f1.A0=f0.f1.A0", "f1.f1.A1=f0.f1.A1"]) + self.assertEqual(bg0["A0"], 2.0) + + @patch(f"{texture_utils_path}.Rebin") + @patch(f"{texture_utils_path}.CropWorkspace") + def test_crop_and_rebin(self, mock_crop, mock_rebin): + # inputs + ws = "ws1" + out = "out_ws" + lower = 1 + upper = 2 + rebin_params = (1, 0.1, 2) + + crop_and_rebin(ws, out, lower, upper, rebin_params) + + mock_crop.assert_called_once_with(ws, lower, upper, OutputWorkspace="__tmp_peak_window") + mock_rebin.assert_called_once_with("__tmp_peak_window", rebin_params, OutputWorkspace=out) + + @patch(f"{texture_utils_path}.SumSpectra") + @patch(f"{texture_utils_path}.AppendSpectra") + @patch(f"{texture_utils_path}.CloneWorkspace") + @patch(f"{texture_utils_path}.Rebin") + @patch(f"{texture_utils_path}._get_max_bin") + @patch(f"{texture_utils_path}.CropWorkspace") + @patch(f"{texture_utils_path}.crop_and_rebin") + def test_crop_wss_and_combine(self, mock_crop_and_rebin, mock_crop, mock_max_bin, mock_rebin, mock_clone, mock_append, mock_sum): + # inputs + wss = ["ws1", "ws2"] + out = "out_ws" + lower = 1 + upper = 2 + peak = 1.5 + + mock_max_bin.return_value = 0.1 + + expected_rebin_params = (1, 0.1, 2) + + mock_peak_window_ws = MagicMock() + mock_peak_window_ws.extractX.return_value = MagicMock() + + _, output_list = crop_wss_and_combine(wss, peak, lower, upper, out) + + # mock returns + mock_crop.assert_called_once_with(wss[0], lower, upper, OutputWorkspace="__peak_window_crop") + mock_rebin.assert_called_once_with("__peak_window_crop", expected_rebin_params, OutputWorkspace=f"rebin_ws_{peak}_0") + mock_clone.assert_called_once_with(InputWorkspace=f"rebin_ws_{peak}_0", OutputWorkspace=f"rebin_ws_{peak}") + + mock_crop_and_rebin.assert_called_once_with(wss[1], f"rebin_ws_{peak}_1", lower, upper, expected_rebin_params) + + mock_append.assert_called_once_with(f"rebin_ws_{peak}", f"rebin_ws_{peak}_1", 
OutputWorkspace=f"rebin_ws_{peak}") + + mock_sum.assert_called_once_with(f"rebin_ws_{peak}", OutputWorkspace=out) + + self.assertEqual(output_list, [f"rebin_ws_{peak}_0", f"rebin_ws_{peak}_1"]) + + +class TextureUtilsFittingStepsTests(unittest.TestCase): + @patch(f"{texture_utils_path}.Fit") + @patch(f"{texture_utils_path}.CompositeFunctionWrapper") + @patch(f"{texture_utils_path}._estimate_intensity_background_and_centre") + @patch(f"{texture_utils_path}.FunctionFactory") + @patch(f"{texture_utils_path}.crop_wss_and_combine") + def test_fit_initial_summed_spectra( + self, + mock_crop_and_combine, + mock_func_factory, + mock_estimate_intens, + mock_comp, + mock_fit, + ): + # inputs + wss = ["ws1", "ws2"] + peak1, peak2 = 1.0, 2.0 + peaks = [peak1, peak2] + peak_window = 0.05 + fit_kwargs = {} + + # some mock intermediates + x_vals = [1, 1.5, 2] + intensities, sigmas = (2.0, 4.0), (1.0, 1.0) + + # some mock returns + mock_fit.return_value = MagicMock() + + peak1_window_ws, peak2_window_ws = MagicMock(), MagicMock() + peak1_window_ws.readX.return_value = x_vals + peak2_window_ws.readX.return_value = x_vals + peak1_window_ws.name.return_value = "peak_window_0" + peak2_window_ws.name.return_value = "peak_window_1" + + peak_func1, peak_func2 = MagicMock(), MagicMock() + + comp_func1, comp_func2 = MagicMock(), MagicMock() + + mock_crop_and_combine.side_effect = ((peak1_window_ws, ["ws1_1.0", "ws2_1.0"]), (peak2_window_ws, ["ws1_2.0", "ws2_2.0"])) + mock_func_factory.createFunction.return_value = MagicMock() + mock_instance = MagicMock() + mock_func_factory.Instance.return_value = mock_instance + mock_instance.createPeakFunction.side_effect = (peak_func1, peak_func2) + + mock_estimate_intens.side_effect = list(zip(intensities, sigmas)) + + mock_comp.side_effect = (comp_func1, comp_func2) + + # expected returns + peak1_kwargs = {"InputWorkspace": "peak_window_0", "StartX": 0.95, "EndX": 1.05} + peak2_kwargs = {"InputWorkspace": "peak_window_1", "StartX": 1.95, "EndX": 2.05} + + # exec + _, all_wss = fit_initial_summed_spectra(wss, peaks, peak_window, fit_kwargs) + + # assert + + mock_estimate_intens.assert_has_calls( + [ + call(peak1_window_ws, 0, 0, 2, peak1), # 2 is len(x_val) -1 + call(peak2_window_ws, 0, 0, 2, peak2), + ] + ) + + peak_func1.setCentre.assert_called_once_with(peak1) + peak_func2.setCentre.assert_called_once_with(peak2) + + mock_fit.assert_has_calls( + [ + call( + Function=comp_func1, + Output=f"composite_fit_{peak1}", + MaxIterations=50, + **peak1_kwargs, + ), + call( + Function=comp_func2, + Output=f"composite_fit_{peak2}", + MaxIterations=50, + **peak2_kwargs, + ), + ], + any_order=True, + ) + self.assertEqual(all_wss, [["ws1_1.0", "ws2_1.0"], ["ws1_2.0", "ws2_2.0"]]) + + @patch(f"{texture_utils_path}._tie_bkg") + @patch(f"{texture_utils_path}.CompositeFunctionWrapper") + @patch(f"{texture_utils_path}._estimate_intensity_background_and_centre") + @patch(f"{texture_utils_path}.UnitConversion") + @patch(f"{texture_utils_path}.DeltaEModeType") + @patch(f"{texture_utils_path}.FunctionFactory") + @patch(f"{texture_utils_path}.MultiDomainFunction") + def test_get_initial_fit_function_and_kwargs_from_specs( + self, mock_gen_mdf, mock_func_factory, mock_delta_e, mock_unit_conv, mock_estimate_intens, mock_comp, mock_tie_bkg + ): + # inputs + + mock_ws = MagicMock() + mock_ws_tof = MagicMock() + peak = 1.0 + x_window = (0.95, 1.05) + x0_window = (0.99, 1.01) + parameters_to_tie = ("A", "B") + peak_func_name = "BackToBackExponential" + bg_func_name = "LinearBackground" + bkg_is_tied = 
True + + # mock intermediates + + # mock spectrumInfo + mock_si = MagicMock() + mock_si.size.return_value = 2 # two spectra + diff_consts = MagicMock() + mock_si.diffractometerConstants.return_value = diff_consts + mock_ws.spectrumInfo.return_value = mock_si + + # mock functions and function factory + base_peak_func, peak_func1, peak_func2 = MagicMock(), MagicMock(), MagicMock() + + comp_func1, comp_func2 = MagicMock(), MagicMock() + comp_func1.function = "cf1" + comp_func2.function = "cf2" + + mock_func_factory.createFunction.return_value = MagicMock() + mock_instance = MagicMock() + mock_func_factory.Instance.return_value = mock_instance + mock_instance.createPeakFunction.side_effect = (base_peak_func, peak_func1, peak_func2) + + # mock unit conversion + + tof_peaks, tof_starts, tof_ends = (10, 11), (8, 9), (15, 16) # not physically accurate TOF but easier to keep track of + x0_lowers, x0_uppers = (0.99, 0.99), (1.01, 1.01) + mock_delta_e.Elastic = "elastic" + unit_conv_res1, unit_conv_res2 = zip(tof_peaks, tof_starts, tof_ends, x0_lowers, x0_uppers) + mock_unit_conv.run.side_effect = unit_conv_res1 + unit_conv_res2 + + # mock ws_tof data access + + istarts, iends = (2, 3), (100, 101) # mock indices for the indices corresponding to an x value + mock_ws_tof.yIndexOfX.side_effect = [istarts[0], iends[0], istarts[1], iends[1]] + mock_ws_tof.name.return_value = "ws_tof" + + # mock estimate + + intensities, sigmas, bgs, x0s = (2.0, 4.0), (1.0, 1.0), (0.0, 0.0), (10, 11) + + mock_estimate_intens.side_effect = list(zip(intensities, sigmas, bgs, x0s)) + + # mock mdf + + mock_mdf = MagicMock() + mock_gen_mdf.return_value = mock_mdf + mock_mdf.nDomains.return_value = 2 + + mock_comp.side_effect = (comp_func1, comp_func2) + + # mock bkg tie + + mock_tie_bkg.return_value = (mock_mdf, []) + + # mock func processing + + base_peak_func.nParams.return_value = 5 + base_peak_func.getParamName.side_effect = ("A", "B", "I", "X0", "S", "A", "B", "I", "X0", "S") + + mock_mdf.__str__.return_value = "func" + + # exec + + out_func, out_kwargs, _ = get_initial_fit_function_and_kwargs_from_specs( + mock_ws, mock_ws_tof, peak, x_window, x0_window, parameters_to_tie, peak_func_name, bg_func_name, bkg_is_tied + ) + + # assert + + base_peak_func.setIntensity.assert_called_once_with(1.0) + base_peak_func.getCentreParameterName.assert_called_once() + base_peak_func.getWidthParameterName.assert_called_once() + + vals_to_convert = (1.0, 0.95, 1.05, 0.99, 1.01, 1.0, 0.95, 1.05, 0.99, 1.01) + mock_unit_conv.run.assert_has_calls([call("dSpacing", "TOF", val, 0, mock_delta_e.Elastic, diff_consts) for val in vals_to_convert]) + + mock_mdf.add.assert_has_calls([call(comp_func1.function), call(comp_func2.function)]) + mock_mdf.setDomainIndex.assert_has_calls([call(0, 0), call(1, 1)]) + mock_mdf.setMatrixWorkspace.assert_has_calls([call(mock_ws_tof, i, tof_starts[i], tof_ends[i]) for i in range(2)]) + + self.assertEqual(out_func, "func;ties=(f1.f0.A=f0.f0.A,f1.f0.B=f0.f0.B)") + + expected_spec_kwargs = { + "InputWorkspace": "ws_tof", + "StartX": tof_starts[0], + "EndX": tof_ends[0], + "WorkspaceIndex": 0, + "InputWorkspace_1": "ws_tof", + "StartX_1": tof_starts[1], + "EndX_1": tof_ends[1], + "WorkspaceIndex_1": 1, } - expected_tab_name = f"{prefix}{run_number}_{peak}_{group}_Fit_Parameters" - expected_func_kwargs = {"InputWorkspace": wsname} - expected_create_tab_kwargs = {"OutputWorkspace": expected_tab_name} - expected_save_kwargs = {"InputWorkspace": expected_tab_name, "Filename": os.path.join(save_dir, 
f"{expected_tab_name}.nxs")} - expected_intensity_sub_bkg = ones(num_spec) - # expect a row for each parameter of each function for each spectra - here we just have the one func f0 with param P - # also expect a final cost function - expected_name_col = [f"f{i}.f0.P" for i in range(num_spec)] + ["Cost Function"] - expected_val_col = np.ones(len(expected_name_col)) - expected_err_col = np.zeros_like(expected_val_col) - return { - "wsname": wsname, - "prefix": prefix, - "run_number": run_number, - "group": group, - "peak": peak, - "num_spec": peak, - "save_dir": save_dir, - "expected_func": expected_func, - "expected_first_fit_kwargs": expected_first_fit_kwargs, - "expected_fit_kwargs": expected_fit_kwargs, - "expected_func_kwargs": expected_func_kwargs, - "expected_create_tab_kwargs": expected_create_tab_kwargs, - "expected_save_kwargs": expected_save_kwargs, - "expected_name_col": expected_name_col, - "expected_val_col": expected_val_col, - "expected_err_col": expected_err_col, - "expected_intensity_sub_bkg": expected_intensity_sub_bkg, + self.assertEqual(expected_spec_kwargs, out_kwargs) + + @patch(f"{texture_utils_path}.Fit") + @patch(f"{texture_utils_path}.CompositeFunctionWrapper") + @patch(f"{texture_utils_path}.FunctionWrapper") + @patch(f"{texture_utils_path}.FunctionFactory") + @patch(f"{texture_utils_path}.MultiDomainFunction") + def test_rerun_fit_with_new_ws( + self, + mock_gen_mdf, + mock_func_factory, + mock_func_wrapper, + mock_comp_wrapper, + mock_fit, + ): + # inputs + mdf = MagicMock() + fit_kwargs = {"SomeKwarg": 123} + md_fit_kwargs = { + "InputWorkspace": "old_ws", + "StartX": 10.0, + "EndX": 20.0, + "WorkspaceIndex": 0, + "InputWorkspace_1": "old_ws", + "StartX_1": 30.0, + "EndX_1": 40.0, + "WorkspaceIndex_1": 1, } + new_ws = MagicMock() + new_ws.name.return_value = "new_ws" + + x0_frac_move = 0.1 + iters = 50 + parameters_to_fix = ("A", "B") + tie_background = True + + # mock existing mdf domains: two composite functions (peak + bg) + mdf.nFunctions.return_value = 2 + + peak0, peak1 = MagicMock(), MagicMock() + bg0, bg1 = MagicMock(), MagicMock() + + peak0.name.return_value = "BackToBackExponential" + peak1.name.return_value = "BackToBackExponential" + + # peak parameter vals + peak0.getParameterValue.side_effect = lambda p: {"I": 0.5, "X0": 10.0, "A": 1.0, "B": 2.0, "S": 3.0}[p] + peak1.getParameterValue.side_effect = lambda p: {"I": 4.0, "X0": 20.0, "A": 4.0, "B": 5.0, "S": 6.0}[p] + + # background params + bg1.nParams.return_value = 2 + bg1.getParamName.side_effect = ("A0", "A1") + + comp0, comp1 = MagicMock(), MagicMock() + comp0.__getitem__.side_effect = lambda i: [peak0, bg0][i] + comp1.__getitem__.side_effect = lambda i: [peak1, bg1][i] + mdf.__getitem__.side_effect = (comp0, comp1) + + # mock mdf + new_func = MagicMock() + mock_gen_mdf.return_value = new_func + new_func.__str__.return_value = "newfunc" + + # mock func factory + new_peak0, new_peak1 = MagicMock(), MagicMock() + mock_instance = MagicMock() + mock_func_factory.Instance.return_value = mock_instance + mock_instance.createPeakFunction.side_effect = (new_peak0, new_peak1) + + # mock composite functions + comp_wrap0, comp_wrap1 = MagicMock(), MagicMock() + comp_wrap0.function = "cf0" + comp_wrap1.function = "cf1" + mock_comp_wrapper.side_effect = (comp_wrap0, comp_wrap1) + + # mock fit + fit_return = MagicMock() + mock_fit.return_value = fit_return + + # exec + out_fit, out_md_kwargs = rerun_fit_with_new_ws( + mdf=mdf, + fit_kwargs=fit_kwargs, + md_fit_kwargs=md_fit_kwargs, + new_ws=new_ws, + 
x0_frac_move=x0_frac_move, + iters=iters, + parameters_to_fix=parameters_to_fix, + tie_background=tie_background, + ) - def setup_fit_mocks_from_expected_results(self, repeated_expected_results, mock_peak_func_gen_cls, mock_ads, mock_intens_sigma): - # mock ws information retrieval - mock_ws = MagicMock() - mock_ws.getNumberHistograms.side_effect = [er["num_spec"] for er in repeated_expected_results] + # assert + self.assertEqual(out_md_kwargs["InputWorkspace"], "new_ws") + self.assertEqual(out_md_kwargs["InputWorkspace_1"], "new_ws") - # mock Peak Function Generator - mock_gen = MagicMock() - mock_gen.get_initial_fit_function_and_kwargs_from_specs.side_effect = [ - (er["expected_func"], er["expected_func_kwargs"], er["expected_intensity_sub_bkg"]) for er in repeated_expected_results + mock_instance.createPeakFunction.assert_has_calls([call("BackToBackExponential"), call("BackToBackExponential")]) + + expected_set_calls_peak0 = [ + call("I", 0.5), + call("X0", 10.0), + call("A", 1.0), + call("B", 2.0), + call("S", 3.0), + ] + expected_set_calls_peak1 = [ + call("I", 4.0), + call("X0", 20.0), + call("A", 4.0), + call("B", 5.0), + call("S", 6.0), ] - mock_gen.get_final_fit_function.side_effect = [er["expected_func"] for er in repeated_expected_results] - mock_peak_func_gen_cls.return_value = mock_gen - - # mock Output Parameter Table - mock_param_ws = MagicMock() - mock_get_name = MagicMock() - mock_get_val = MagicMock() - mock_get_err = MagicMock() - mock_get_name.side_effect = [er["expected_name_col"] for er in repeated_expected_results] - mock_get_val.side_effect = [er["expected_val_col"] for er in repeated_expected_results] - mock_get_err.side_effect = [er["expected_err_col"] for er in repeated_expected_results] - mock_param_ws.column.side_effect = lambda name: ( - mock_get_name() if name == "Name" else mock_get_val() if name == "Value" else mock_get_err() + new_peak0.setParameter.assert_has_calls(expected_set_calls_peak0, any_order=False) + new_peak1.setParameter.assert_has_calls(expected_set_calls_peak1, any_order=False) + + # constraints updated around new values + # note intens is max(I, 1) + # I=0.5 and x0=10 then bounds should be 0.5