|
| 1 | +# (C) Copyright 2025 Anemoi contributors. |
| 2 | +# |
| 3 | +# This software is licensed under the terms of the Apache Licence Version 2.0 |
| 4 | +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. |
| 5 | +# |
| 6 | +# In applying this licence, ECMWF does not waive the privileges and immunities |
| 7 | +# granted to it by virtue of its status as an intergovernmental organisation |
| 8 | +# nor does it submit to any jurisdiction. |
| 9 | + |
| 10 | + |
| 11 | +from abc import ABC |
| 12 | +from abc import abstractmethod |
| 13 | +from typing import Optional |
| 14 | + |
| 15 | +import einops |
| 16 | +import torch |
| 17 | +from torch import nn |
| 18 | +from torch_geometric.data import HeteroData |
| 19 | + |
| 20 | +from anemoi.models.distributed.graph import gather_channels |
| 21 | +from anemoi.models.distributed.graph import shard_channels |
| 22 | +from anemoi.models.distributed.shapes import apply_shard_shapes |
| 23 | +from anemoi.models.layers.sparse_projector import build_sparse_projector |
| 24 | + |
| 25 | + |
| 26 | +class BaseResidualConnection(nn.Module, ABC): |
| 27 | + """Base class for residual connection modules.""" |
| 28 | + |
| 29 | + def __init__(self, graph: HeteroData | None = None) -> None: |
| 30 | + super().__init__() |
| 31 | + |
| 32 | + @abstractmethod |
| 33 | + def forward(self, x: torch.Tensor, grid_shard_shapes=None, model_comm_group=None) -> torch.Tensor: |
| 34 | + """Define the residual connection operation. |
| 35 | +
|
| 36 | + Should be overridden by subclasses. |
| 37 | + """ |
| 38 | + pass |
| 39 | + |
| 40 | + |
| 41 | +class SkipConnection(BaseResidualConnection): |
| 42 | + """Skip connection module |
| 43 | +
|
| 44 | + This layer returns the most recent timestep from the input sequence. |
| 45 | +
|
| 46 | + This module is used to bypass processing layers and directly pass the latest input forward. |
| 47 | + """ |
| 48 | + |
| 49 | + def __init__(self, step: int = -1, **_) -> None: |
| 50 | + super().__init__() |
| 51 | + self.step = step |
| 52 | + |
| 53 | + def forward(self, x: torch.Tensor, grid_shard_shapes=None, model_comm_group=None) -> torch.Tensor: |
| 54 | + """Return the last timestep of the input sequence.""" |
| 55 | + return x[:, self.step, ...] # x shape: (batch, time, ens, nodes, features) |
| 56 | + |
| 57 | + |
| 58 | +class TruncatedConnection(BaseResidualConnection): |
| 59 | + """Truncated skip connection |
| 60 | +
|
| 61 | + This connection applies a coarse-graining and reconstruction of input features using sparse |
| 62 | + projections to truncate high frequency features. |
| 63 | +
|
| 64 | + This module uses two projection operators: one to map features from the full-resolution |
| 65 | + grid to a truncated (coarse) grid, and another to project back to the original resolution. |
| 66 | +
|
| 67 | + Parameters |
| 68 | + ---------- |
| 69 | + graph : HeteroData, optional |
| 70 | + The graph containing the subgraphs for down and up projections. |
| 71 | + data_nodes : str, optional |
| 72 | + Name of the nodes representing the data nodes. |
| 73 | + truncation_nodes : str, optional |
| 74 | + Name of the nodes representing the truncated (coarse) nodes. |
| 75 | + edge_weight_attribute : str, optional |
| 76 | + Name of the edge attribute to use as weights for the projections. |
| 77 | + src_node_weight_attribute : str, optional |
| 78 | + Name of the source node attribute to use as weights for the projections. |
| 79 | + autocast : bool, default False |
| 80 | + Whether to use automatic mixed precision for the projections. |
| 81 | + truncation_up_file_path : str, optional |
| 82 | + File path (.npz) to load the up-projection matrix from. |
| 83 | + truncation_down_file_path : str, optional |
| 84 | + File path (.npz) to load the down-projection matrix from. |
| 85 | +
|
| 86 | + Example |
| 87 | + ------- |
| 88 | + >>> from torch_geometric.data import HeteroData |
| 89 | + >>> import torch |
| 90 | + >>> # Assume graph is a HeteroData object with the required edges and node types |
| 91 | + >>> graph = HeteroData() |
| 92 | + >>> # ...populate graph with nodes and edges for 'data' and 'int'... |
| 93 | + >>> # Example creating the projection matrices from the graph |
| 94 | + >>> conn = TruncatedConnection( |
| 95 | + ... graph=graph, |
| 96 | + ... data_nodes="data", |
| 97 | + ... truncation_nodes="int", |
| 98 | + ... edge_weight_attribute="gauss_weight", |
| 99 | + ... ) |
| 100 | + >>> x = torch.randn(2, 4, 1, 40192, 44) # (batch, time, ens, nodes, features) |
| 101 | + >>> out = conn(x) |
| 102 | + >>> print(out.shape) |
| 103 | + torch.Size([2, 4, 1, 40192, 44]) |
| 104 | +
|
| 105 | + >>> # Example specifying .npz files for projection matrices |
| 106 | + >>> conn = TruncatedConnection( |
| 107 | + ... truncation_down_file_path="n320_to_o96.npz", |
| 108 | + ... truncation_up_file_path="o96_to_n320.npz", |
| 109 | + ... ) |
| 110 | + >>> x = torch.randn(2, 4, 1, 40192, 44) |
| 111 | + >>> out = conn(x) |
| 112 | + >>> print(out.shape) |
| 113 | + torch.Size([2, 4, 1, 40192, 44]) |
| 114 | + """ |
| 115 | + |
| 116 | + def __init__( |
| 117 | + self, |
| 118 | + graph: Optional[HeteroData] = None, |
| 119 | + data_nodes: Optional[str] = None, |
| 120 | + truncation_nodes: Optional[str] = None, |
| 121 | + edge_weight_attribute: Optional[str] = None, |
| 122 | + src_node_weight_attribute: Optional[str] = None, |
| 123 | + truncation_up_file_path: Optional[str] = None, |
| 124 | + truncation_down_file_path: Optional[str] = None, |
| 125 | + autocast: bool = False, |
| 126 | + ) -> None: |
| 127 | + super().__init__() |
| 128 | + up_edges, down_edges = self._get_edges_name( |
| 129 | + graph, |
| 130 | + data_nodes, |
| 131 | + truncation_nodes, |
| 132 | + truncation_up_file_path, |
| 133 | + truncation_down_file_path, |
| 134 | + edge_weight_attribute, |
| 135 | + ) |
| 136 | + |
| 137 | + self.project_down = build_sparse_projector( |
| 138 | + graph=graph, |
| 139 | + edges_name=down_edges, |
| 140 | + edge_weight_attribute=edge_weight_attribute, |
| 141 | + src_node_weight_attribute=src_node_weight_attribute, |
| 142 | + file_path=truncation_down_file_path, |
| 143 | + autocast=autocast, |
| 144 | + ) |
| 145 | + |
| 146 | + self.project_up = build_sparse_projector( |
| 147 | + graph=graph, |
| 148 | + edges_name=up_edges, |
| 149 | + edge_weight_attribute=edge_weight_attribute, |
| 150 | + src_node_weight_attribute=src_node_weight_attribute, |
| 151 | + file_path=truncation_up_file_path, |
| 152 | + autocast=autocast, |
| 153 | + ) |
| 154 | + |
| 155 | + def _get_edges_name( |
| 156 | + self, |
| 157 | + graph, |
| 158 | + data_nodes, |
| 159 | + truncation_nodes, |
| 160 | + truncation_up_file_path, |
| 161 | + truncation_down_file_path, |
| 162 | + edge_weight_attribute, |
| 163 | + ): |
| 164 | + are_files_specified = truncation_up_file_path is not None and truncation_down_file_path is not None |
| 165 | + if not are_files_specified: |
| 166 | + assert graph is not None, "graph must be provided if file paths are not specified." |
| 167 | + assert data_nodes is not None, "data nodes name must be provided if file paths are not specified." |
| 168 | + assert ( |
| 169 | + truncation_nodes is not None |
| 170 | + ), "truncation nodes name must be provided if file paths are not specified." |
| 171 | + up_edges = (truncation_nodes, "to", data_nodes) |
| 172 | + down_edges = (data_nodes, "to", truncation_nodes) |
| 173 | + assert up_edges in graph.edge_types, f"Graph must contain edges {up_edges} for up-projection." |
| 174 | + assert down_edges in graph.edge_types, f"Graph must contain edges {down_edges} for down-projection." |
| 175 | + else: |
| 176 | + assert ( |
| 177 | + data_nodes is None or truncation_nodes is None or edge_weight_attribute is None |
| 178 | + ), "If file paths are specified, node and attribute names should not be provided." |
| 179 | + up_edges = down_edges = None # Not used when loading from files |
| 180 | + return up_edges, down_edges |
| 181 | + |
| 182 | + def forward(self, x: torch.Tensor, grid_shard_shapes=None, model_comm_group=None) -> torch.Tensor: |
| 183 | + """Apply truncated skip connection.""" |
| 184 | + batch_size = x.shape[0] |
| 185 | + x = x[:, -1, ...] # pick latest step |
| 186 | + shard_shapes = apply_shard_shapes(x, 0, grid_shard_shapes) if grid_shard_shapes is not None else None |
| 187 | + |
| 188 | + x = einops.rearrange(x, "batch ensemble grid features -> (batch ensemble) grid features") |
| 189 | + x = self._to_channel_shards(x, shard_shapes, model_comm_group) |
| 190 | + x = self.project_down(x) |
| 191 | + x = self.project_up(x) |
| 192 | + x = self._to_grid_shards(x, shard_shapes, model_comm_group) |
| 193 | + x = einops.rearrange(x, "(batch ensemble) grid features -> batch ensemble grid features", batch=batch_size) |
| 194 | + |
| 195 | + return x |
| 196 | + |
| 197 | + def _to_channel_shards(self, x, shard_shapes=None, model_comm_group=None): |
| 198 | + return self._reshard(x, shard_channels, shard_shapes, model_comm_group) |
| 199 | + |
| 200 | + def _to_grid_shards(self, x, shard_shapes=None, model_comm_group=None): |
| 201 | + return self._reshard(x, gather_channels, shard_shapes, model_comm_group) |
| 202 | + |
| 203 | + def _reshard(self, x, fn, shard_shapes=None, model_comm_group=None): |
| 204 | + if shard_shapes is not None: |
| 205 | + x = fn(x, shard_shapes, model_comm_group) |
| 206 | + return x |
0 commit comments