|
| 1 | +# |
| 2 | +# CEBRA: Consistent EmBeddings of high-dimensional Recordings using Auxiliary variables |
| 3 | +# © Mackenzie W. Mathis & Steffen Schneider (v0.4.0+) |
| 4 | +# Source code: |
| 5 | +# https://github.com/AdaptiveMotorControlLab/CEBRA |
| 6 | +# |
| 7 | +# Please see LICENSE.md for the full license document: |
| 8 | +# https://github.com/AdaptiveMotorControlLab/CEBRA/blob/main/LICENSE.md |
| 9 | +# |
| 10 | +# Adapted from https://github.com/rpatrik96/nl-causal-representations/blob/master/care_nl_ica/dep_mat.py, |
| 11 | +# licensed under the following MIT License: |
| 12 | +# |
| 13 | +# MIT License |
| 14 | +# |
| 15 | +# Copyright (c) 2022 Patrik Reizinger |
| 16 | +# |
| 17 | +# Permission is hereby granted, free of charge, to any person obtaining a copy |
| 18 | +# of this software and associated documentation files (the "Software"), to deal |
| 19 | +# in the Software without restriction, including without limitation the rights |
| 20 | +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| 21 | +# copies of the Software, and to permit persons to whom the Software is |
| 22 | +# furnished to do so, subject to the following conditions: |
| 23 | +# |
| 24 | +# The above copyright notice and this permission notice shall be included in all |
| 25 | +# copies or substantial portions of the Software. |
| 26 | +# |
| 27 | +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| 28 | +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| 29 | +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| 30 | +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| 31 | +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 32 | +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
| 33 | +# SOFTWARE. |
| 34 | +# |
| 35 | + |
| 36 | +from typing import Union |
| 37 | + |
| 38 | +import numpy as np |
| 39 | +import torch |
| 40 | + |
| 41 | + |
def tensors_to_cpu_and_double(vars_: list[torch.Tensor]) -> list[torch.Tensor]:
    """Move every tensor in a list to the CPU and cast it to double precision.

    Args:
        vars_: List of PyTorch tensors to convert.

    Returns:
        List of the same tensors on CPU with dtype ``torch.float64``.
    """
    # ``Tensor.cpu()`` is a no-op for tensors already on the CPU, and
    # ``double()`` is a no-op for tensors already in float64, so this is
    # equivalent to moving only the CUDA tensors before casting.
    return [tensor.cpu().double() for tensor in vars_]
| 57 | + |
| 58 | + |
def tensors_to_cuda(vars_: list[torch.Tensor],
                    cuda_device: str) -> list[torch.Tensor]:
    """Move every non-CUDA tensor in a list to the given CUDA device.

    Tensors that already live on a CUDA device are passed through
    unchanged (they are NOT re-routed to ``cuda_device``).

    Args:
        vars_: List of PyTorch tensors to convert.
        cuda_device: Device string to move CPU tensors to.

    Returns:
        List of tensors, each on a CUDA device (or on ``cuda_device``).
    """
    return [
        tensor if tensor.is_cuda else tensor.to(cuda_device)
        for tensor in vars_
    ]
| 76 | + |
| 77 | + |
def compute_jacobian(
    model: torch.nn.Module,
    input_vars: list[torch.Tensor],
    mode: str = "autograd",
    cuda_device: str = "cuda",
    double_precision: bool = False,
    convert_to_numpy: bool = True,
    hybrid_solver: bool = False,
) -> Union[torch.Tensor, np.ndarray]:
    """Compute the Jacobian matrix for a given model and input.

    This function computes the Jacobian matrix using PyTorch's autograd
    functionality. It supports both CPU and CUDA computation, as well as
    single and double precision.

    Args:
        model: PyTorch model to compute Jacobian for
        input_vars: List of input tensors; each must require gradients
            for autograd to differentiate with respect to it
        mode: Computation mode, currently only "autograd" is supported
        cuda_device: Device to use for CUDA computation (ignored when
            ``double_precision`` is True, which forces CPU)
        double_precision: If True, use double precision on the CPU
        convert_to_numpy: If True, convert output to numpy array
        hybrid_solver: If True, concatenate multiple outputs along dimension 1

    Returns:
        Jacobian matrix of shape ``(batch, n_outputs, n_inputs)`` as either
        a PyTorch tensor or a numpy array.

    Raises:
        ValueError: If ``mode`` is not ``"autograd"``.
    """
    # Fail fast on an unsupported mode: the original code silently skipped
    # the gradient loop and crashed later with a NameError on ``jacobian``.
    if mode != "autograd":
        raise ValueError(
            f"Unsupported mode: {mode!r}; only 'autograd' is supported.")

    if double_precision:
        # Double precision implies CPU computation.
        model = model.to("cpu").double()
        input_vars = tensors_to_cpu_and_double(input_vars)
        if hybrid_solver:
            output = model(*input_vars)
            output_vars = torch.cat(output, dim=1).to("cpu").double()
        else:
            output_vars = model(*input_vars).to("cpu").double()
    else:
        model = model.to(cuda_device).float()
        input_vars = tensors_to_cuda(input_vars, cuda_device=cuda_device)

        if hybrid_solver:
            output = model(*input_vars)
            output_vars = torch.cat(output, dim=1)
        else:
            output_vars = model(*input_vars)

    # Differentiate one output dimension at a time; each pass yields one
    # row of the Jacobian for every sample in the batch.
    jacob = []
    for i in range(output_vars.shape[1]):
        column = output_vars[:, i:i + 1]
        grads = torch.autograd.grad(
            column,
            input_vars,
            retain_graph=True,
            create_graph=False,
            # ones_like matches device and dtype without an extra transfer.
            grad_outputs=torch.ones_like(column),
        )
        jacob.append(torch.cat(grads, dim=1))

    jacobian = torch.stack(jacob, dim=1).detach().cpu()

    if convert_to_numpy:
        jacobian = jacobian.numpy()

    return jacobian
0 commit comments