Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ ignore_missing_imports = True
exclude = doc/sphinx/source/


[mypy-MDAnalysis.analysis.rms]
ignore_errors = False

[mypy-MDAnalysis.analysis.*]
ignore_errors = True

Expand Down
1 change: 1 addition & 0 deletions package/AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ Chronological list of authors
- Raúl Lois-Cuns
- Pranay Pelapkar
- Shreejan Dolai
- Chetan Agarwal

External code
-------------
Expand Down
95 changes: 67 additions & 28 deletions package/MDAnalysis/analysis/rms.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,38 @@
import logging
import warnings

from ..lib import qcprot as qcp
from ..lib import qcprot as qcp # type: ignore[attr-defined]
from ..analysis.base import AnalysisBase, ResultsGroup
from ..exceptions import SelectionError
from ..lib.util import asiterable, iterable, get_weights

from typing import (
Optional,
Union,
List,
Dict,
Any,
Sequence,
Tuple,
TYPE_CHECKING,
)
from numpy.typing import NDArray

if TYPE_CHECKING: # pragma: no cover
from ..core.groups import AtomGroup
from ..core.universe import Universe


logger = logging.getLogger("MDAnalysis.analysis.rmsd")


def rmsd(a, b, weights=None, center=False, superposition=False):
def rmsd(
a: NDArray,
b: NDArray,
weights: Optional[NDArray] = None,
center: bool = False,
superposition: bool = False,
) -> float:
r"""Returns RMSD between two coordinate sets `a` and `b`.

`a` and `b` are arrays of the coordinates of N atoms of shape
Expand Down Expand Up @@ -282,7 +304,9 @@ def rmsd(a, b, weights=None, center=False, superposition=False):
return np.sqrt(np.sum((a - b) ** 2) / N)


def process_selection(select):
def process_selection(
select: Union[str, Tuple[str, str], Dict[str, str]]
) -> Dict[str, Any]:
"""Return a canonical selection dictionary.

Parameters
Expand Down Expand Up @@ -365,6 +389,14 @@ class RMSD(AnalysisBase):

_analysis_algorithm_is_parallelizable = True

atomgroup: Union["AtomGroup", "Universe"]
reference: Union["AtomGroup", "Universe"]
groupselections: List[Dict[str, Any]]
weights: Optional[Union[str, NDArray, List[Any]]]
weights_groupselections: Union[bool, List[Any]]
tol_mass: float
ref_frame: int

@classmethod
def get_supported_backends(cls):
return (
Expand All @@ -375,15 +407,17 @@ def get_supported_backends(cls):

def __init__(
self,
atomgroup,
reference=None,
select="all",
groupselections=None,
weights=None,
weights_groupselections=False,
tol_mass=0.1,
ref_frame=0,
**kwargs,
atomgroup: Union["AtomGroup", "Universe"],
reference: Optional[Union["AtomGroup", "Universe"]] = None,
select: Union[str, Dict[str, str], Tuple[str, str]] = "all",
groupselections: Optional[
Sequence[Union[str, Dict[str, str], Tuple[str, str]]]
] = None,
weights: Optional[Union[str, NDArray, List[Any]]] = None,
weights_groupselections: Union[bool, List[Any]] = False,
tol_mass: float = 0.1,
ref_frame: int = 0,
**kwargs: Any,
):
r"""Parameters
----------
Expand Down Expand Up @@ -631,7 +665,7 @@ def __init__(

if iterable(self.weights):
element_lens = []
for element in self.weights:
for element in self.weights: # type: ignore[union-attr]
if iterable(element):
element_lens.append(len(element))
else:
Expand All @@ -646,13 +680,13 @@ def __init__(
get_weights(self.mobile_atoms, self.weights)

if self.weights_groupselections:
if len(self.weights_groupselections) != len(self.groupselections):
if len(self.weights_groupselections) != len(self.groupselections): # type: ignore[arg-type]
raise ValueError(
"Length of weights_groupselections is not equal to "
"length of groupselections "
)
for weights, atoms, selection in zip(
self.weights_groupselections,
self.weights_groupselections, # type: ignore[arg-type]
self._groupselections_atoms,
self.groupselections,
):
Expand All @@ -665,7 +699,7 @@ def __init__(
+ " happens in selection %s" % selection["mobile"]
)

def _prepare(self):
def _prepare(self) -> None:
self._n_atoms = self.mobile_atoms.n_atoms
if not self.weights_groupselections:
if not iterable(
Expand All @@ -679,15 +713,17 @@ def _prepare(self):
self.groupselections
)

weights_gs = self.weights_groupselections
assert isinstance(weights_gs, list) # pragma: no cover
for igroup, (weights, atoms) in enumerate(
zip(self.weights_groupselections, self._groupselections_atoms)
zip(weights_gs, self._groupselections_atoms)
):
if str(weights) == "mass":
self.weights_groupselections[igroup] = atoms["mobile"].masses
weights_gs[igroup] = atoms["mobile"].masses
if weights is not None:
self.weights_groupselections[igroup] = np.asarray(
self.weights_groupselections[igroup], dtype=np.float64
) / np.mean(self.weights_groupselections[igroup])
weights_gs[igroup] = np.asarray(
weights_gs[igroup], dtype=np.float64
) / np.mean(weights_gs[igroup])
# add the array of weights to weights_select
self.weights_select = get_weights(self.mobile_atoms, self.weights)
self.weights_ref = get_weights(self.ref_atoms, self.weights)
Expand Down Expand Up @@ -722,7 +758,7 @@ def _prepare(self):
# Move back to the original frame
self.reference.universe.trajectory[current_frame]

self._ref_coordinates64 = self._ref_coordinates.astype(np.float64)
self._ref_coordinates64 = self._ref_coordinates.astype(np.float64) # type: ignore[assignment]

if self._groupselections_atoms:
# Only carry out a rotation if we want to calculate secondary
Expand All @@ -733,7 +769,7 @@ def _prepare(self):
self._rot = np.zeros(9, dtype=np.float64) # allocate space
self._R = self._rot.reshape(3, 3)
else:
self._rot = None
self._rot = None # type: ignore[assignment]

self.results.rmsd = np.zeros(
(self.n_frames, 3 + len(self._groupselections_atoms))
Expand All @@ -746,7 +782,7 @@ def _prepare(self):
def _get_aggregator(self):
return ResultsGroup(lookup={"rmsd": ResultsGroup.ndarray_vstack})

def _single_frame(self):
def _single_frame(self) -> None:
mobile_com = self.mobile_atoms.center(self.weights_select).astype(
np.float64
)
Expand Down Expand Up @@ -787,6 +823,9 @@ def _single_frame(self):

# 2) calculate secondary RMSDs (without any further
# superposition)
assert isinstance(
self.weights_groupselections, list
) # pragma: no cover
for igroup, (refpos, atoms) in enumerate(
zip(
self._groupselections_ref_coords64,
Expand Down Expand Up @@ -846,7 +885,7 @@ class RMSF(AnalysisBase):
def get_supported_backends(cls):
return ("serial",)

def __init__(self, atomgroup, **kwargs):
def __init__(self, atomgroup: "AtomGroup", **kwargs: Any):
r"""Parameters
----------
atomgroup : AtomGroup
Expand Down Expand Up @@ -969,18 +1008,18 @@ def __init__(self, atomgroup, **kwargs):
super(RMSF, self).__init__(atomgroup.universe.trajectory, **kwargs)
self.atomgroup = atomgroup

def _prepare(self):
def _prepare(self) -> None:
self.sumsquares = np.zeros((self.atomgroup.n_atoms, 3))
self.mean = self.sumsquares.copy()

def _single_frame(self):
def _single_frame(self) -> None:
k = self._frame_index
self.sumsquares += (k / (k + 1.0)) * (
self.atomgroup.positions - self.mean
) ** 2
self.mean = (k * self.mean + self.atomgroup.positions) / (k + 1)

def _conclude(self):
def _conclude(self) -> None:
k = self._frame_index
self.results.rmsf = np.sqrt(self.sumsquares.sum(axis=1) / (k + 1))

Expand Down
20 changes: 20 additions & 0 deletions testsuite/MDAnalysisTests/analysis/test_rms.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,26 @@ def test_rmsd_list_of_weights_wrong_length(self, universe):
weights_groupselections=[None],
)

def test_rmsd_groupselections_with_mass_weights(
self, universe, client_RMSD
):
# Test to ensure line 694 coverage with weights="mass" in groupselections
# This tests the code path where weights="mass" is propagated to
# groupselections when weights_groupselections=False
RMSD = MDAnalysis.analysis.rms.RMSD(
universe,
select="backbone",
groupselections=["name CA", "backbone and resid 1:10"],
weights="mass",
weights_groupselections=False,
).run(step=49, **client_RMSD)

# Just verify it runs without error and produces valid results
assert RMSD.results.rmsd.shape[0] == 2 # 2 frames (0 and 49)
assert (
RMSD.results.rmsd.shape[1] == 5
) # frame, time, rmsd, group1, group2

def test_rmsd_group_selections(
self, universe, correct_values_group, client_RMSD
):
Expand Down
Loading