diff --git a/mypy.ini b/mypy.ini index d87115bc20..d401a8871f 100644 --- a/mypy.ini +++ b/mypy.ini @@ -3,6 +3,9 @@ ignore_missing_imports = True exclude = doc/sphinx/source/ +[mypy-MDAnalysis.analysis.rms] +ignore_errors = False + [mypy-MDAnalysis.analysis.*] ignore_errors = True diff --git a/package/AUTHORS b/package/AUTHORS index e23f2afcf2..4960d6ff81 100644 --- a/package/AUTHORS +++ b/package/AUTHORS @@ -265,6 +265,7 @@ Chronological list of authors - Raúl Lois-Cuns - Pranay Pelapkar - Shreejan Dolai + - Chetan Agarwal External code ------------- diff --git a/package/MDAnalysis/analysis/rms.py b/package/MDAnalysis/analysis/rms.py index 55a1322a75..b9b545b693 100644 --- a/package/MDAnalysis/analysis/rms.py +++ b/package/MDAnalysis/analysis/rms.py @@ -166,16 +166,38 @@ import logging import warnings -from ..lib import qcprot as qcp +from ..lib import qcprot as qcp # type: ignore[attr-defined] from ..analysis.base import AnalysisBase, ResultsGroup from ..exceptions import SelectionError from ..lib.util import asiterable, iterable, get_weights +from typing import ( + Optional, + Union, + List, + Dict, + Any, + Sequence, + Tuple, + TYPE_CHECKING, +) +from numpy.typing import NDArray + +if TYPE_CHECKING: # pragma: no cover + from ..core.groups import AtomGroup + from ..core.universe import Universe + logger = logging.getLogger("MDAnalysis.analysis.rmsd") -def rmsd(a, b, weights=None, center=False, superposition=False): +def rmsd( + a: NDArray, + b: NDArray, + weights: Optional[NDArray] = None, + center: bool = False, + superposition: bool = False, +) -> float: r"""Returns RMSD between two coordinate sets `a` and `b`. `a` and `b` are arrays of the coordinates of N atoms of shape @@ -282,7 +304,9 @@ def rmsd(a, b, weights=None, center=False, superposition=False): return np.sqrt(np.sum((a - b) ** 2) / N) -def process_selection(select): +def process_selection( + select: Union[str, Tuple[str, str], Dict[str, str]] +) -> Dict[str, Any]: """Return a canonical selection dictionary. Parameters @@ -365,6 +389,14 @@ class RMSD(AnalysisBase): _analysis_algorithm_is_parallelizable = True + atomgroup: Union["AtomGroup", "Universe"] + reference: Union["AtomGroup", "Universe"] + groupselections: List[Dict[str, Any]] + weights: Optional[Union[str, NDArray, List[Any]]] + weights_groupselections: Union[bool, List[Any]] + tol_mass: float + ref_frame: int + @classmethod def get_supported_backends(cls): return ( @@ -375,15 +407,17 @@ def get_supported_backends(cls): def __init__( self, - atomgroup, - reference=None, - select="all", - groupselections=None, - weights=None, - weights_groupselections=False, - tol_mass=0.1, - ref_frame=0, - **kwargs, + atomgroup: Union["AtomGroup", "Universe"], + reference: Optional[Union["AtomGroup", "Universe"]] = None, + select: Union[str, Dict[str, str], Tuple[str, str]] = "all", + groupselections: Optional[ + Sequence[Union[str, Dict[str, str], Tuple[str, str]]] + ] = None, + weights: Optional[Union[str, NDArray, List[Any]]] = None, + weights_groupselections: Union[bool, List[Any]] = False, + tol_mass: float = 0.1, + ref_frame: int = 0, + **kwargs: Any, ): r"""Parameters ---------- @@ -631,7 +665,7 @@ def __init__( if iterable(self.weights): element_lens = [] - for element in self.weights: + for element in self.weights: # type: ignore[union-attr] if iterable(element): element_lens.append(len(element)) else: @@ -646,13 +680,13 @@ def __init__( get_weights(self.mobile_atoms, self.weights) if self.weights_groupselections: - if len(self.weights_groupselections) != len(self.groupselections): + if len(self.weights_groupselections) != len(self.groupselections): # type: ignore[arg-type] raise ValueError( "Length of weights_groupselections is not equal to " "length of groupselections " ) for weights, atoms, selection in zip( - self.weights_groupselections, + self.weights_groupselections, # type: ignore[arg-type] self._groupselections_atoms, self.groupselections, ): @@ -665,7 +699,7 @@ def __init__( + " happens in selection %s" % selection["mobile"] ) - def _prepare(self): + def _prepare(self) -> None: self._n_atoms = self.mobile_atoms.n_atoms if not self.weights_groupselections: if not iterable( @@ -679,15 +713,17 @@ def _prepare(self): self.groupselections ) + weights_gs = self.weights_groupselections + assert isinstance(weights_gs, list) # pragma: no cover for igroup, (weights, atoms) in enumerate( - zip(self.weights_groupselections, self._groupselections_atoms) + zip(weights_gs, self._groupselections_atoms) ): if str(weights) == "mass": - self.weights_groupselections[igroup] = atoms["mobile"].masses + weights_gs[igroup] = atoms["mobile"].masses if weights is not None: - self.weights_groupselections[igroup] = np.asarray( - self.weights_groupselections[igroup], dtype=np.float64 - ) / np.mean(self.weights_groupselections[igroup]) + weights_gs[igroup] = np.asarray( + weights_gs[igroup], dtype=np.float64 + ) / np.mean(weights_gs[igroup]) # add the array of weights to weights_select self.weights_select = get_weights(self.mobile_atoms, self.weights) self.weights_ref = get_weights(self.ref_atoms, self.weights) @@ -722,7 +758,7 @@ def _prepare(self): # Move back to the original frame self.reference.universe.trajectory[current_frame] - self._ref_coordinates64 = self._ref_coordinates.astype(np.float64) + self._ref_coordinates64 = self._ref_coordinates.astype(np.float64) # type: ignore[assignment] if self._groupselections_atoms: # Only carry out a rotation if we want to calculate secondary @@ -733,7 +769,7 @@ def _prepare(self): self._rot = np.zeros(9, dtype=np.float64) # allocate space self._R = self._rot.reshape(3, 3) else: - self._rot = None + self._rot = None # type: ignore[assignment] self.results.rmsd = np.zeros( (self.n_frames, 3 + len(self._groupselections_atoms)) @@ -746,7 +782,7 @@ def _prepare(self): def _get_aggregator(self): return ResultsGroup(lookup={"rmsd": ResultsGroup.ndarray_vstack}) - def _single_frame(self): + def _single_frame(self) -> None: mobile_com = self.mobile_atoms.center(self.weights_select).astype( np.float64 ) @@ -787,6 +823,9 @@ def _single_frame(self): # 2) calculate secondary RMSDs (without any further # superposition) + assert isinstance( + self.weights_groupselections, list + ) # pragma: no cover for igroup, (refpos, atoms) in enumerate( zip( self._groupselections_ref_coords64, @@ -846,7 +885,7 @@ class RMSF(AnalysisBase): def get_supported_backends(cls): return ("serial",) - def __init__(self, atomgroup, **kwargs): + def __init__(self, atomgroup: "AtomGroup", **kwargs: Any): r"""Parameters ---------- atomgroup : AtomGroup @@ -969,18 +1008,18 @@ def __init__(self, atomgroup, **kwargs): super(RMSF, self).__init__(atomgroup.universe.trajectory, **kwargs) self.atomgroup = atomgroup - def _prepare(self): + def _prepare(self) -> None: self.sumsquares = np.zeros((self.atomgroup.n_atoms, 3)) self.mean = self.sumsquares.copy() - def _single_frame(self): + def _single_frame(self) -> None: k = self._frame_index self.sumsquares += (k / (k + 1.0)) * ( self.atomgroup.positions - self.mean ) ** 2 self.mean = (k * self.mean + self.atomgroup.positions) / (k + 1) - def _conclude(self): + def _conclude(self) -> None: k = self._frame_index self.results.rmsf = np.sqrt(self.sumsquares.sum(axis=1) / (k + 1)) diff --git a/testsuite/MDAnalysisTests/analysis/test_rms.py b/testsuite/MDAnalysisTests/analysis/test_rms.py index f39baa68f1..3ccfd2de7d 100644 --- a/testsuite/MDAnalysisTests/analysis/test_rms.py +++ b/testsuite/MDAnalysisTests/analysis/test_rms.py @@ -391,6 +391,26 @@ def test_rmsd_list_of_weights_wrong_length(self, universe): weights_groupselections=[None], ) + def test_rmsd_groupselections_with_mass_weights( + self, universe, client_RMSD + ): + # Test to ensure line 694 coverage with weights="mass" in groupselections + # This tests the code path where weights="mass" is propagated to + # groupselections when weights_groupselections=False + RMSD = MDAnalysis.analysis.rms.RMSD( + universe, + select="backbone", + groupselections=["name CA", "backbone and resid 1:10"], + weights="mass", + weights_groupselections=False, + ).run(step=49, **client_RMSD) + + # Just verify it runs without error and produces valid results + assert RMSD.results.rmsd.shape[0] == 2 # 2 frames (0 and 49) + assert ( + RMSD.results.rmsd.shape[1] == 5 + ) # frame, time, rmsd, group1, group2 + def test_rmsd_group_selections( self, universe, correct_values_group, client_RMSD ):