diff --git a/src/nomad_simulations/schema_packages/numerical_settings.py b/src/nomad_simulations/schema_packages/numerical_settings.py index 84dad867e..0e33f8413 100644 --- a/src/nomad_simulations/schema_packages/numerical_settings.py +++ b/src/nomad_simulations/schema_packages/numerical_settings.py @@ -331,6 +331,15 @@ class KMesh(Mesh): """, ) + symmetrized_points = Quantity( + type=np.float64, + shape=['*', 3], + description=""" + List of the mesh points after applying symmetry operations in units of the `reciprocal_lattice_vectors`. This quantity + is typically smaller than `all_points` and contains only the unique points in the Brillouin zone. + """, + ) + high_symmetry_points = Quantity( type=JSON, description=""" @@ -785,11 +794,6 @@ class KSpace(NumericalSettings): k_line_path = SubSection(sub_section=KLinePath.m_def) - def __init__(self, m_def: 'Section' = None, m_context: 'Context' = None, **kwargs): - super().__init__(m_def, m_context, **kwargs) - # Set the name of the section - self.name = self.m_def.name - def resolve_reciprocal_lattice_vectors( self, model_systems: list[ModelSystem], logger: 'BoundLogger' ) -> Optional[pint.Quantity]: diff --git a/src/nomad_simulations/schema_packages/outputs.py b/src/nomad_simulations/schema_packages/outputs.py index abc8570be..166b27359 100644 --- a/src/nomad_simulations/schema_packages/outputs.py +++ b/src/nomad_simulations/schema_packages/outputs.py @@ -1,7 +1,7 @@ from typing import TYPE_CHECKING, Optional +from nomad_simulations.schema_packages.utils.electronic import bandstructure_to_bandgap, bandstructure_to_dos import numpy as np -from nomad.datamodel.data import ArchiveSection from nomad.metainfo import Quantity, SubSection if TYPE_CHECKING: @@ -27,7 +27,6 @@ HoppingMatrix, HybridizationFunction, KineticEnergy, - Occupancy, Permittivity, PotentialEnergy, QuasiparticleWeight, @@ -78,23 +77,23 @@ class Outputs(Time): hopping_matrices = SubSection(sub_section=HoppingMatrix.m_def, repeats=True) - electronic_eigenvalues = SubSection( - sub_section=ElectronicEigenvalues.m_def, repeats=True + electronic_band_structures = SubSection( + sub_section=ElectronicBandStructure.m_def, # repeats=True ) - electronic_band_gaps = SubSection(sub_section=ElectronicBandGap.m_def, repeats=True) + electronic_eigenvalues = SubSection( + sub_section=ElectronicEigenvalues.m_def, repeats=True + ) # TODO: @EBB2675 should we remove these? electronic_dos = SubSection( - sub_section=ElectronicDensityOfStates.m_def, repeats=True + sub_section=ElectronicDensityOfStates.m_def, # repeats=True ) - fermi_surfaces = SubSection(sub_section=FermiSurface.m_def, repeats=True) - - electronic_band_structures = SubSection( - sub_section=ElectronicBandStructure.m_def, repeats=True + electronic_band_gaps = SubSection( + sub_section=ElectronicBandGap.m_def, # repeats=True ) - occupancies = SubSection(sub_section=Occupancy.m_def, repeats=True) + fermi_surfaces = SubSection(sub_section=FermiSurface.m_def, repeats=True) electronic_greens_functions = SubSection( sub_section=ElectronicGreensFunction.m_def, repeats=True @@ -197,6 +196,16 @@ def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: if self.model_method_ref is None: self.model_method_ref = self.set_model_method_ref() + # build up derived electronic structures + if not self.electronic_band_gaps: + self.electronic_band_gaps = bandstructure_to_bandgap(self.electronic_band_structures) + if self.electronic_band_gaps: + self.electronic_band_gaps.normalize(archive, logger) + if not self.electronic_dos: + self.electronic_dos = bandstructure_to_dos(self.electronic_band_structures) + if self.electronic_dos: + self.electronic_dos.normalize(archive, logger) + class SCFOutputs(Outputs): """ diff --git a/src/nomad_simulations/schema_packages/physical_property.py b/src/nomad_simulations/schema_packages/physical_property.py index 3d56820f8..31722d800 100644 --- a/src/nomad_simulations/schema_packages/physical_property.py +++ b/src/nomad_simulations/schema_packages/physical_property.py @@ -1,11 +1,11 @@ from functools import wraps -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING import numpy as np from nomad import utils -from nomad.datamodel.data import ArchiveSection +from nomad.datamodel.metainfo.plot import PlotSection from nomad.datamodel.metainfo.basesections.v2 import Entity -from nomad.metainfo import URL, MEnum, Quantity, Reference, SectionProxy, SubSection +from nomad.metainfo import URL, MEnum, Quantity, Reference, SectionProxy if TYPE_CHECKING: from nomad.datamodel.datamodel import EntryArchive @@ -57,7 +57,7 @@ def wrapper(self, *args, **kwargs): return decorator -class PhysicalProperty(ArchiveSection): +class PhysicalProperty(PlotSection): """ A base section used to define the physical properties obtained in a simulation, experiment, or in a post-processing analysis. The main quantity of the `PhysicalProperty` is `value`, whose instantiation has to be overwritten in the derived classes @@ -77,6 +77,7 @@ class PhysicalProperty(ArchiveSection): iri = Quantity( type=URL, + default='', description=""" Internationalized Resource Identifier (IRI) of the physical property defined in the FAIRmat taxonomy, https://fairmat-nfdi.github.io/fairmat-taxonomy/. @@ -223,12 +224,25 @@ def _is_derived(self) -> bool: Returns: (bool): The flag indicating whether the physical property is derived or not. """ - if self.physical_property_ref is not None: - return True - return False + return self.physical_property_ref is not None + + def plot(self, *args, **kwargs) -> list: + """ + Placeholder for a method to plot the physical property. This method should be overridden in derived classes + to provide specific plotting functionality. + + Returns: + (list): A list of figures (`PlotlyFigure`) representing the physical property. + """ + return [] - def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: + def normalize(self, *args, **kwargs) -> None: self.is_derived = self._is_derived() + super(PlotSection, self).normalize(*args, **kwargs) + if (plot_figures := self.plot(*args, **kwargs)): + self.figures.extend(plot_figures) + if self.m_def.name is not None: + self.name = self.m_def.name class PropertyContribution(PhysicalProperty): diff --git a/src/nomad_simulations/schema_packages/properties/__init__.py b/src/nomad_simulations/schema_packages/properties/__init__.py index 69470a4ff..af6f29507 100644 --- a/src/nomad_simulations/schema_packages/properties/__init__.py +++ b/src/nomad_simulations/schema_packages/properties/__init__.py @@ -19,7 +19,6 @@ from .permittivity import Permittivity from .spectral_profile import ( AbsorptionSpectrum, - DOSProfile, ElectronicDensityOfStates, SpectralProfile, XASSpectrum, diff --git a/src/nomad_simulations/schema_packages/properties/band_gap.py b/src/nomad_simulations/schema_packages/properties/band_gap.py index 5e97acf0c..511f4ae05 100644 --- a/src/nomad_simulations/schema_packages/properties/band_gap.py +++ b/src/nomad_simulations/schema_packages/properties/band_gap.py @@ -1,12 +1,7 @@ -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING import numpy as np -from nomad.metainfo import MEnum, Quantity - -if TYPE_CHECKING: - from nomad.datamodel.datamodel import EntryArchive - from nomad.metainfo import Context, Section - from structlog.stdlib import BoundLogger +from nomad.metainfo import Quantity from nomad_simulations.schema_packages.data_types import positive_float from nomad_simulations.schema_packages.physical_property import PhysicalProperty @@ -19,87 +14,47 @@ class ElectronicBandGap(PhysicalProperty): iri = 'http://fairmat-nfdi.eu/taxonomy/ElectronicBandGap' - type = Quantity( - type=MEnum('direct', 'indirect'), + value = Quantity( + type=positive_float(), + unit='joule', description=""" - Type categorization of the electronic band gap. This quantity is directly related with `momentum_transfer` as by - definition, the electronic band gap is `'direct'` for zero momentum transfer (or if `momentum_transfer` is `None`) and `'indirect'` - for finite momentum transfer. + The value of the electronic band gap. This value must be positive. + `None` indicates that no band gap could be determined, e.g., if the system is metallic. """, ) momentum_transfer = Quantity( type=np.float64, - shape=[2, 3], + unit='1/meter', description=""" - If the electronic band gap is `'indirect'`, the reciprocal momentum transfer for which the band gap is defined - in units of the `reciprocal_lattice_vectors`. The initial and final momentum 3D vectors are given in the first - and second element. Example, the momentum transfer in bulk Si2 happens between the Γ and the (approximately) - X points in the Brillouin zone; thus: - `momentum_transfer = [[0, 0, 0], [0.5, 0.5, 0]]`. - - Note: this quantity only refers to scalar `value`, not to arrays of `value`. + The length of the difference in reciprocal space between the initial and final momentum transfer + along which the electronic band gap is defined. + + Example, the momentum transfer in bulk Si2 happens between the Γ and (approximately) X points, thus: + `momentum_transfer = ||[0, 0, 0] - [0.5, 0.5, 0]|| ~= 0.612`. """, ) - spin_channel = Quantity( - type=np.int32, - description=""" - Spin channel of the corresponding electronic band gap. It can take values of 0 or 1. - """, - ) - - value = Quantity( - type=positive_float(), - unit='joule', - description=""" - The value of the electronic band gap. This value must be positive. - """, - ) - - def __init__( - self, m_def: 'Section' = None, m_context: 'Context' = None, **kwargs - ) -> None: - super().__init__(m_def, m_context, **kwargs) - self.name = self.m_def.name - - def resolve_type(self, logger: 'BoundLogger') -> Optional[str]: + # TODO: give it a place + def extract_band_gap(self) -> 'ElectronicBandGap | None': """ - Resolves the `type` of the electronic band gap based on the stored `momentum_transfer` values. - - Args: - logger (BoundLogger): The logger to log messages. + Extract the electronic band gap from the `highest_occupied_energy` and `lowest_unoccupied_energy` stored + in `m_cache` from `resolve_energies_origin()`. If the difference of `highest_occupied_energy` and + `lowest_unoccupied_energy` is negative, the band gap `value` is set to 0.0. Returns: - (Optional[str]): The resolved `type` of the electronic band gap. + (Optional[ElectronicBandGap]): The extracted electronic band gap section to be stored in `Outputs`. """ - mtr = self.momentum_transfer if self.momentum_transfer is not None else [] - - # Check if the `momentum_transfer` is [], and return the type and a warning in the log for `indirect` band gaps - if len(mtr) == 0: - if self.type == 'indirect': - logger.warning( - 'The `momentum_transfer` is not stored for an `indirect` band gap.' - ) - return self.type - - # Check if the `momentum_transfer` has at least two elements, and return None if it does not - if len(mtr) == 1: - logger.warning( - 'The `momentum_transfer` should have at least two elements so that the difference can be calculated and the type of electronic band gap can be resolved.' - ) - return None - - # Resolve `type` from the difference between the initial and final momentum transfer - momentum_difference = np.diff(mtr, axis=0) - if (np.isclose(momentum_difference, np.zeros(3))).all(): - return 'direct' - else: - return 'indirect' - - def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: - super().normalize(archive, logger) - - # Resolve the `type` of the electronic band gap from `momentum_transfer`, ONLY for scalar `value` - if self.value is not None: - self.type = self.resolve_type(logger) + band_gap = None + homo = self.m_cache.get('highest_occupied_energy') + lumo = self.m_cache.get('lowest_unoccupied_energy') + if homo and lumo: + band_gap = ElectronicBandGap() + band_gap.is_derived = True + band_gap.physical_property_ref = self + + if (homo - lumo).magnitude < 0: + band_gap.value = 0.0 + else: + band_gap.value = homo - lumo + return band_gap diff --git a/src/nomad_simulations/schema_packages/properties/band_structure.py b/src/nomad_simulations/schema_packages/properties/band_structure.py index 30c81955c..35f39c402 100644 --- a/src/nomad_simulations/schema_packages/properties/band_structure.py +++ b/src/nomad_simulations/schema_packages/properties/band_structure.py @@ -1,26 +1,21 @@ -from typing import TYPE_CHECKING, Optional, Union +from typing import TYPE_CHECKING import numpy as np import pint from nomad.config import config from nomad.metainfo import Quantity, SubSection -from nomad_simulations.schema_packages.variables import KLinePath +from nomad_simulations.schema_packages.numerical_settings import KMesh if TYPE_CHECKING: from nomad.datamodel.datamodel import EntryArchive - from nomad.metainfo import Context, Section from structlog.stdlib import BoundLogger from nomad_simulations.schema_packages.atoms_state import AtomsState, OrbitalsState +from nomad_simulations.schema_packages.data_types import unit_float from nomad_simulations.schema_packages.numerical_settings import KSpace -from nomad_simulations.schema_packages.physical_property import ( - PhysicalProperty, - validate_quantity_wrt_value, -) -from nomad_simulations.schema_packages.properties.band_gap import ElectronicBandGap -from nomad_simulations.schema_packages.properties.fermi_surface import FermiSurface -from nomad_simulations.schema_packages.utils import get_sibling_section +from nomad_simulations.schema_packages.physical_property import PhysicalProperty +from nomad_simulations.schema_packages.utils.utils import check_not_none, inner_copy configuration = config.get_plugin_entry_point( 'nomad_simulations.schema_packages:nomad_simulations_plugin' @@ -32,23 +27,12 @@ class BaseElectronicEigenvalues(PhysicalProperty): A base section used to define basic quantities for the `ElectronicEigenvalues` and `ElectronicBandStructure` properties. """ - iri = '' - n_bands = Quantity( type=np.int32, description=""" Number of bands / eigenvalues. """, - ) - - value = Quantity( - type=np.float64, - unit='joule', - shape=['*', '*'], - description=""" - Value of the electronic eigenvalues. - """, - ) + ) # TODO: remove class ElectronicEigenvalues(BaseElectronicEigenvalues): @@ -56,47 +40,47 @@ class ElectronicEigenvalues(BaseElectronicEigenvalues): iri = 'http://fairmat-nfdi.eu/taxonomy/ElectronicEigenvalues' - spin_channel = Quantity( - type=np.int32, + # TODO: add spin annotation from @EBB2675 + + value = Quantity( + type=np.float64, + unit='joule', + shape=['spin', 'level'], description=""" - Spin channel of the corresponding electronic eigenvalues. It can take values of 0 or 1. + Value of the electronic eigenvalues. + Dimensions: [spin channel, energy level]. """, ) occupation = Quantity( - type=np.float64, - shape=['*', 'n_bands'], + type=unit_float(dtype=np.float64), + shape=['spin', 'level'], description=""" - Occupation of the electronic eigenvalues. This is a number depending whether the `spin_channel` has been set or not. - If `spin_channel` is set, then this number is between 0 and 1, where 0 means that the state is unoccupied and 1 means - that the state is fully occupied; if `spin_channel` is not set, then this number is between 0 and 2. The shape of - this quantity is defined as `[K.n_points, K.dimensionality, n_bands]`, where `K` is a `variable` which can - be `KMesh` or `KLinePath`, depending whether the simulation mapped the whole Brillouin zone or just a specific - path. + Occupation of the electronic eigenvalues, ranging from 0 to 1. + Dimensions: [spin channel, energy level]. """, - ) + ) # restructure spin for plotting? highest_occupied = Quantity( type=np.float64, + shape=['spin'], unit='joule', description=""" - Highest occupied electronic eigenvalue. Together with `lowest_unoccupied`, it defines the - electronic band gap. + Highest occupied electronic eigenvalue for each spin channel. Together with `lowest_unoccupied`, it defines the + electronic band gap. Automatically resolved using binary search on sorted eigenvalues. """, ) lowest_unoccupied = Quantity( type=np.float64, + shape=['spin'], unit='joule', description=""" - Lowest unoccupied electronic eigenvalue. Together with `highest_occupied`, it defines the - electronic band gap. + Lowest unoccupied electronic eigenvalue for each spin channel. Together with `highest_occupied`, it defines the + electronic band gap. Automatically resolved using binary search on sorted eigenvalues. """, ) - # ? Should we add functionalities to handle min/max of the `value` in some specific cases, .e.g, bands around the Fermi level, - # ? core bands separated by gaps, and equivalently, higher-energy valence bands separated by gaps? - value_contributions = SubSection( sub_section=BaseElectronicEigenvalues.m_def, repeats=True, @@ -111,152 +95,152 @@ class ElectronicEigenvalues(BaseElectronicEigenvalues): """, ) - reciprocal_cell = Quantity( - type=KSpace.reciprocal_lattice_vectors, - description=""" - Reference to the reciprocal lattice vectors stored under `KSpace`. - """, - ) + @check_not_none('self.value', 'self.occupation') + def resolve_homo_lumo(self) -> None: + """ + Resolve HOMO and LUMO eigenvalues using binary search on sorted eigenvalues. + """ - def __init__( - self, m_def: 'Section' = None, m_context: 'Context' = None, **kwargs - ) -> None: - super().__init__(m_def, m_context, **kwargs) - self.name = self.m_def.name + def process_spin_channel(data: np.ndarray) -> list: + """Process a single spin channel to find HOMO/LUMO.""" + mid = int(len(data) / 2) # ? extra check + values, occupations = data[:mid], data[mid:] + lumo_region = np.where(occupations <= 1e-6) + + if lumo_region[0].size > 0: + lumo_idx = np.min(lumo_region) + if lumo_idx > 0: + return [values[lumo_idx - 1], values[lumo_idx]] # [HOMO, LUMO] + else: + return [np.nan, values[lumo_idx]] + else: + return [np.nan, np.nan] - @validate_quantity_wrt_value(name='occupation') - def order_eigenvalues(self) -> Union[bool, tuple[pint.Quantity, np.ndarray]]: - """ - Order the eigenvalues based on the `value` and `occupation`. The return `value` and - `occupation` are flattened. + # Stack value and occupation arrays along last axis for apply_along_axis + combined_data = np.stack([self.value.magnitude, self.occupation], axis=-1) + results = np.apply_along_axis(process_spin_channel, axis=0, arr=combined_data.T) - Returns: - (Union[bool, tuple[pint.Quantity, np.ndarray]]): The flattened and sorted `value` and `occupation`. If validation - fails, then it returns `False`. + self.highest_occupied = results[:, 0] * self.value.u + self.lowest_unoccupied = results[:, 1] * self.value.u + + def pad_out(self) -> None: """ - total_shape = np.prod(self.value.shape) + Pad out the value and occupation arrays along the spin channel dimension. + """ + spin_index = 0 # Spin is now first dimension + if ( + np.array(self.value).shape[spin_index] == 1 + ): # TODO: add model_method spin_polarized + self.value = inner_copy(self.value, 0) # TODO: dynamically set repetition + if ( + np.array(self.occupation).shape[spin_index] == 1 + ): # TODO: add model_method spin_polarized + self.occupation = inner_copy( + self.occupation, 0 + ) # TODO: dynamically set repetition - # Order the indices in the flattened list of `value` - flattened_value = self.value.reshape(total_shape) - flattened_occupation = self.occupation.reshape(total_shape) - sorted_indices = np.argsort(flattened_value, axis=0) + def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: + super().normalize(archive, logger) + self.pad_out() + self.resolve_homo_lumo() - sorted_value = ( - np.take_along_axis(flattened_value.magnitude, sorted_indices, axis=0) - * flattened_value.u - ) - sorted_occupation = np.take_along_axis( - flattened_occupation, sorted_indices, axis=0 - ) - self.m_cache['sorted_eigenvalues'] = True - return sorted_value, sorted_occupation - def resolve_homo_lumo_eigenvalues( - self, - ) -> tuple[Optional[pint.Quantity], Optional[pint.Quantity]]: - """ - Resolve the `highest_occupied` and `lowest_unoccupied` eigenvalues by performing a binary search on the - flattened and sorted `value` and `occupation`. If these quantities already exist, overwrite them or return - them if it is not possible to resolve from `value` and `occupation`. +class ElectronicBandStructure(BaseElectronicEigenvalues): + """ + Accessible energies by the charges (electrons and holes) in the reciprocal space. + """ - Returns: - (tuple[Optional[pint.Quantity], Optional[pint.Quantity]]): The `highest_occupied` and - `lowest_unoccupied` eigenvalues. - """ - # Sorting `value` and `occupation` - if not self.order_eigenvalues(): # validation fails - if self.highest_occupied is not None and self.lowest_unoccupied is not None: - return self.highest_occupied, self.lowest_unoccupied - return None, None - sorted_value, sorted_occupation = self.order_eigenvalues() - sorted_value_unit = sorted_value.u - sorted_value = sorted_value.magnitude - - # Binary search ot find the transition point between `occupation = 2` and `occupation = 0` - homo = self.highest_occupied - lumo = self.lowest_unoccupied - mid = ( - np.searchsorted( - sorted_occupation <= configuration.occupation_tolerance, True - ) - - 1 - ) - if mid >= 0 and mid < len(sorted_occupation) - 1: - if sorted_occupation[mid] > 0 and ( - sorted_occupation[mid + 1] >= -configuration.occupation_tolerance - and sorted_occupation[mid + 1] <= configuration.occupation_tolerance - ): - homo = sorted_value[mid] * sorted_value_unit - lumo = sorted_value[mid + 1] * sorted_value_unit + iri = 'http://fairmat-nfdi.eu/taxonomy/ElectronicBandStructure' - return homo, lumo + kpoint = SubSection(sub_section=KMesh.m_def) - def extract_band_gap(self) -> Optional[ElectronicBandGap]: - """ - Extract the electronic band gap from the `highest_occupied` and `lowest_unoccupied` eigenvalues. - If the difference of `highest_occupied` and `lowest_unoccupied` is negative, the band gap `value` is set to 0.0. + value = Quantity( + type=np.float64, + unit='joule', + shape=['spin', 'kpoint', 'level'], + description=""" + Value of the electronic eigenvalues in the reciprocal space. + Dimensions: [spin channel, k-point, energy level]. + """, + ) - Returns: - (Optional[ElectronicBandGap]): The extracted electronic band gap section to be stored in `Outputs`. + occupation = Quantity( + type=unit_float(dtype=np.float64), + shape=['spin', 'kpoint', 'level'], + description=""" + Occupation of the electronic eigenvalues, ranging from 0 to 1. + Dimensions: [spin channel, k-point, energy level]. + """, + ) + + highest_occupied = Quantity( + type=np.float64, + shape=['spin', 'kpoint'], + unit='joule', + description=""" + Highest occupied electronic eigenvalue for each k-point and spin channel. Together with `lowest_unoccupied`, it defines the + electronic band gap. Automatically resolved using binary search on sorted eigenvalues. + """, + ) + + lowest_unoccupied = Quantity( + type=np.float64, + shape=['spin', 'kpoint'], + unit='joule', + description=""" + Lowest unoccupied electronic eigenvalue for each k-point and spin channel. Together with `highest_occupied`, it defines the + electronic band gap. Automatically resolved using binary search on sorted eigenvalues. + """, + ) + + @check_not_none('self.value', 'self.occupation') + def resolve_homo_lumo(self) -> None: + """ + Resolve HOMO and LUMO eigenvalues using binary search on sorted eigenvalues for band structure. """ - band_gap = None - homo, lumo = self.resolve_homo_lumo_eigenvalues() - if homo and lumo: - band_gap = ElectronicBandGap(is_derived=True, physical_property_ref=self) - if (lumo - homo).magnitude < 0: - band_gap.value = 0.0 + def process_spin_kpoint(data: np.ndarray) -> list: + """Process a single k-point and spin channel to find HOMO/LUMO.""" + mid = int(len(data) / 2) # ? extra check + values, occupations = data[:mid], data[mid:] + lumo_region = np.where(occupations <= 1e-6) + + if lumo_region[0].size > 0: + lumo_idx = np.min(lumo_region) + if lumo_idx > 0: + return [values[lumo_idx - 1], values[lumo_idx]] # [HOMO, LUMO] + else: + return [np.nan, values[lumo_idx]] else: - band_gap.value = lumo - homo - return band_gap + return [np.nan, np.nan] - # TODO fix this method once `FermiSurface` property is implemented - def extract_fermi_surface(self, logger: 'BoundLogger') -> Optional[FermiSurface]: - """ - Extract the Fermi surface for metal systems and using the `FermiLevel.value`. + n_spins, n_kpoints, n_levels = self.value.shape - Args: - logger (BoundLogger): The logger to log messages. + # Stack along last axis to get [n_spins, n_kpoints, n_levels, 2] + combined_data = np.stack([self.value.magnitude, self.occupation], axis=2) + reshaped_data = combined_data.reshape(n_spins * n_kpoints, n_levels * 2) - Returns: - (Optional[FermiSurface]): The extracted Fermi surface section to be stored in `Outputs`. - """ - # Check if the system has a finite band gap - homo, lumo = self.resolve_homo_lumo_eigenvalues() - if (homo and lumo) and (lumo - homo).magnitude > 0: - return None + results = np.apply_along_axis(process_spin_kpoint, axis=1, arr=reshaped_data) + results = results.reshape(n_spins, n_kpoints, 2) - # Get the `fermi_level.value` - fermi_level = get_sibling_section( - section=self, sibling_section_name='fermi_level', logger=logger - ) - if fermi_level is None: - logger.warning( - 'Could not extract the `FermiSurface`, because `FermiLevel` is not stored.' - ) - return None - fermi_level_value = fermi_level.value.magnitude - - # Extract values close to the `fermi_level.value` - fermi_indices = np.logical_and( - self.value.magnitude - >= (fermi_level_value - configuration.fermi_surface_tolerance), - self.value.magnitude - <= (fermi_level_value + configuration.fermi_surface_tolerance), - ) - fermi_values = self.value[fermi_indices] - - # Store `FermiSurface` values - # ! This is wrong (!) the `value` should be the `KMesh.points`, not the `ElectronicEigenvalues.value` - fermi_surface = FermiSurface( - n_bands=self.n_bands, - is_derived=True, - physical_property_ref=self, - ) - fermi_surface.value = fermi_values - return fermi_surface + self.highest_occupied = results[:, :, 0] * self.value.u + self.lowest_unoccupied = results[:, :, 1] * self.value.u - def resolve_reciprocal_cell(self) -> Optional[pint.Quantity]: + def pad_out(self) -> None: + """ + Pad out the value and occupation arrays along the spin channel dimension. + """ + spin_index = 0 + if self.value.shape[spin_index] == 1: # TODO: add model_method spin_polarized + self.value = inner_copy(self.value, 0) # TODO: dynamically set repetition + if ( + self.occupation.shape[spin_index] == 1 + ): # TODO: add model_method spin_polarized + self.occupation = inner_copy( + self.occupation, 0 + ) # TODO: dynamically set repetition + + def resolve_reciprocal_cell(self) -> pint.Quantity | None: # ? remove """ Resolve the reciprocal cell from the `KSpace` numerical settings section. @@ -268,54 +252,42 @@ def resolve_reciprocal_cell(self) -> Optional[pint.Quantity]: ) if numerical_settings is None: return None - k_space = None + for setting in numerical_settings: if isinstance(setting, KSpace): - k_space = setting - break - if k_space is None: - return None - return k_space - - def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: - super().normalize(archive, logger) + return setting - # Resolve `highest_occupied` and `lowest_unoccupied` eigenvalues - self.highest_occupied, self.lowest_unoccupied = ( - self.resolve_homo_lumo_eigenvalues() + def resolve_kpoints_from_kspace(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: + """ + Search for KSpace in archive.data.model_methods[..].numerical_settings[..] + and set it to ElectronicBandStructure.kpoints using xpath. + """ + # Search for KSpace objects in all model_methods numerical_settings + kspace_objects = archive.m_xpath( + 'data.model_method[*].numerical_settings[*].k_mesh', dict=False ) + + if kspace_objects is None: + logger.warning( + 'No KSpace object found in numerical_settings, cannot resolve `ElectronicBandStructure.kpoint`.' + ) + return + elif len(kspace_objects) > 1: + logger.warning( + 'Multiple KSpace objects found in numerical_settings, cannot resolve `ElectronicBandStructure.kpoint`.' + ) + return - # `ElectronicBandGap` extraction - band_gap = self.extract_band_gap() - if band_gap is not None: - self.m_parent.electronic_band_gaps.append(band_gap) - - # TODO uncomment once `FermiSurface` property is implemented - # `FermiSurface` extraction - # fermi_surface = self.extract_fermi_surface(logger) - # if fermi_surface is not None: - # self.m_parent.fermi_surfaces.append(fermi_surface) - - # Resolve `reciprocal_cell` from the `KSpace` numerical settings section - self.reciprocal_cell = self.resolve_reciprocal_cell() - - -class ElectronicBandStructure(ElectronicEigenvalues): - """ - Accessible energies by the charges (electrons and holes) in the reciprocal space. - """ - - iri = 'http://fairmat-nfdi.eu/taxonomy/ElectronicBandStructure' - - k_path = SubSection(sub_section=KLinePath.m_def) + self.kpoint = kspace_objects[0][0][0] - def __init__( - self, m_def: 'Section' = None, m_context: 'Context' = None, **kwargs - ) -> None: - super().__init__(m_def, m_context, **kwargs) - self.name = self.m_def.name + def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: + super().normalize(archive, logger) + self.pad_out() + self.resolve_homo_lumo() + self.resolve_kpoints_from_kspace(archive, logger) +# defunct class Occupancy(PhysicalProperty): """ Electrons occupancy of an atom per orbital and spin. This is a number defined between 0 and 1 for @@ -356,11 +328,3 @@ class Occupancy(PhysicalProperty): fully occupied; if `spin_channel` is not set, then this number is between 0 and 2. """, ) - - def __init__( - self, m_def: 'Section' = None, m_context: 'Context' = None, **kwargs - ) -> None: - super().__init__(m_def, m_context, **kwargs) - self.name = self.m_def.name - - # TODO add extraction from `ElectronicEigenvalues.occupation` diff --git a/src/nomad_simulations/schema_packages/properties/spectral_profile.py b/src/nomad_simulations/schema_packages/properties/spectral_profile.py index 8e0510fdb..f1e5787c3 100644 --- a/src/nomad_simulations/schema_packages/properties/spectral_profile.py +++ b/src/nomad_simulations/schema_packages/properties/spectral_profile.py @@ -1,9 +1,12 @@ from typing import TYPE_CHECKING, Optional import numpy as np -import pint +import plotly.express as px +import plotly.graph_objects as go from nomad.config import config +from nomad.datamodel.metainfo.plot import PlotlyFigure from nomad.metainfo import MEnum, Quantity, SubSection +from nomad.units import ureg if TYPE_CHECKING: from nomad.datamodel.datamodel import EntryArchive @@ -13,8 +16,8 @@ from nomad_simulations.schema_packages.atoms_state import AtomsState, OrbitalsState from nomad_simulations.schema_packages.data_types import positive_float from nomad_simulations.schema_packages.physical_property import PhysicalProperty -from nomad_simulations.schema_packages.properties.band_gap import ElectronicBandGap from nomad_simulations.schema_packages.utils import get_sibling_section +from nomad_simulations.schema_packages.utils.utils import check_not_none, inner_copy from nomad_simulations.schema_packages.variables import Energy2 as Energy configuration = config.get_plugin_entry_point( @@ -27,48 +30,70 @@ class SpectralProfile(PhysicalProperty): A base section used to define the spectral profile. """ - value = Quantity( - type=positive_float(), + energies = Quantity( + type=np.float64, + unit='joule', shape=['*'], description=""" - The value of the intensities of a spectral profile. Must be positive. + Energy sampling. """, - ) # TODO check units and normalization_factor of DOS and Spectra and see whether they can be merged - - energies = SubSection(sub_section=Energy.m_def) - - frequencies = SubSection(sub_section=Energy.m_def) + ) - def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: - super().normalize(archive, logger) + frequencies = Quantity( + type=np.float64, + unit='hertz', + shape=['*'], + description=""" + Frequency sampling. + """, + ) -class DOSProfile(SpectralProfile): +class ElectronicDensityOfStates(SpectralProfile): """ A base section used to define the `value` of the `ElectronicDensityOfState` property. This is useful when containing contributions for `projected_dos` with the correct unit. """ + iri = 'http://fairmat-nfdi.eu/taxonomy/ElectronicDensityOfStates' + value = Quantity( - type=positive_float(), + type=positive_float(dtype=np.float64), unit='1/joule', - shape=['*'], + shape=['spin', '*'], description=""" - The value of the electronic DOS. Must be positive. + The value of the electronic DOS. """, ) - energies = SubSection( - sub_section=Energy.m_def, + highest_occupied = Quantity( + type=np.float64, + shape=['spin', 'kpoint'], + unit='joule', description=""" - Energy grid points of the projected electronic DOS. + Highest occupied electronic density per spin channel. Together with `lowest_unoccupied`, it defines the + electronic band gap. Automatically resolved using binary search on sorted eigenvalues. """, ) - def __init__( - self, m_def: 'Section' = None, m_context: 'Context' = None, **kwargs - ) -> None: - super().__init__(m_def, m_context, **kwargs) + lowest_unoccupied = Quantity( + type=np.float64, + shape=['spin', 'kpoint'], + unit='joule', + description=""" + Lowest unoccupied electronic density per spin channel. Together with `highest_occupied`, it defines the + electronic band gap. Automatically resolved using binary search on sorted eigenvalues. + """, + ) + + normalization_factor = Quantity( + type=np.float64, + description=""" + Normalization factor for electronic DOS, converting from an extensive to an intensive quantity. + The intensive DOS is defined as the integral from the lowest (most negative) energy to the highest occupied energy + for a neutral system (i.e., the sum of `AtomsState.charge` is zero). + """, + ) # This requires knowing the units of the parsed DOS def resolve_pdos_name(self, logger: 'BoundLogger') -> Optional[str]: """ @@ -120,199 +145,6 @@ def resolve_pdos_name(self, logger: 'BoundLogger') -> Optional[str]: ) return None - def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: - super().normalize(archive, logger) - - # We resolve - self.name = self.resolve_pdos_name(logger) - - -class ElectronicDensityOfStates(DOSProfile): - """ - Number of electronic states accessible for the charges per energy and per volume. - """ - - iri = 'http://fairmat-nfdi.eu/taxonomy/ElectronicDensityOfStates' - - spin_channel = Quantity( - type=np.int32, - description=""" - Spin channel of the corresponding electronic DOS. It can take values of 0 or 1. - """, - ) - - # TODO clarify the role of `energies_origin` once `ElectronicEigenvalues` is implemented - energies_origin = Quantity( - type=np.float64, - unit='joule', - description=""" - Energy level denoting the origin along the energy axis, used for comparison and visualization. It is - defined as the `ElectronicEigenvalues.highest_occupied_energy`. - """, - ) - - normalization_factor = Quantity( - type=np.float64, - description=""" - Normalization factor for electronic DOS to get a cell-independent intensive DOS. The cell-independent - intensive DOS is as the integral from the lowest (most negative) energy to the Fermi level for a neutrally - charged system (i.e., the sum of `AtomsState.charge` is zero). - """, - ) - - # ? Do we want to store the integrated value here os as part of an nomad-analysis tool? Check `dos_integrator.py` module in dos normalizer repository - # value_integrated = Quantity( - # type=np.float64, - # description=""" - # The cumulative intensities integrated from from the lowest (most negative) energy to the Fermi level. - # """, - # ) - - energies = SubSection( - sub_section=Energy.m_def, - description=""" - Energy grid points of the electronic DOS. - """, - ) # ? convert to `Quantity` - - projected_dos = SubSection( - sub_section=DOSProfile.m_def, - repeats=True, - description=""" - Projected DOS. It can be atom- (different elements in the unit cell) or orbital-projected. These can be calculated in a cascade as: - - If the total DOS is not present, we sum all atom-projected DOS to obtain it. - - If the atom-projected DOS is not present, we sum all orbital-projected DOS to obtain it. - Note: the cover given by summing up contributions is not perfect, and will depend on the projection functions used. - - In `projected_dos`, `name` and `entity_ref` must be set in order for normalization to work: - - The `entity_ref` is the `OrbitalsState` or `AtomsState` sections. - - The `name` of the projected DOS should be `'atom X'` or `'orbital Y X'`, with 'X' being the chemical symbol and 'Y' the orbital label. - These can be extracted from `entity_ref`. - """, - ) - - def __init__( - self, m_def: 'Section' = None, m_context: 'Context' = None, **kwargs - ) -> None: - super().__init__(m_def, m_context, **kwargs) - self.name = self.m_def.name - - def resolve_energies_origin( - self, - energies_points: pint.Quantity, - fermi_level: Optional[pint.Quantity], - logger: 'BoundLogger', - ) -> Optional[pint.Quantity]: - """ - Resolve the origin of reference for the energies from the sibling `ElectronicEigenvalues` section and its - `highest_occupied` level, or if this does not exist, from the `fermi_level` value as extracted from the sibling property, `FermiLevel`. - - Args: - fermi_level (Optional[pint.Quantity]): The resolved Fermi level. - energies_points (pint.Quantity): The grid points of the `Energy` variable. - logger (BoundLogger): The logger to log messages. - - Returns: - (Optional[pint.Quantity]): The resolved origin of reference for the energies. - """ - - # Extract the `ElectronicEigenvalues` section to get the `highest_occupied` and `lowest_unoccupied` energies - # TODO implement once `ElectronicEigenvalues` is in the schema - eigenvalues = get_sibling_section( - section=self, sibling_section_name='electronic_eigenvalues', logger=logger - ) # we consider `index_sibling` to be 0 - highest_occupied_energy = ( - eigenvalues.highest_occupied if eigenvalues is not None else None - ) - lowest_unoccupied_energy = ( - eigenvalues.lowest_unoccupied if eigenvalues is not None else None - ) - # and set defaults for `highest_occupied_energy` and `lowest_unoccupied_energy` in `m_cache` - if highest_occupied_energy is not None: - self.m_cache['highest_occupied_energy'] = highest_occupied_energy - if lowest_unoccupied_energy is not None: - self.m_cache['lowest_unoccupied_energy'] = lowest_unoccupied_energy - - # Check that the closest `energies` to the energy reference is not too far away. - # If it is very far away, normalization may be very inaccurate and we do not report it. - dos_values = self.value.magnitude - eref = highest_occupied_energy if fermi_level is None else fermi_level - fermi_idx = (np.abs(energies_points - eref)).argmin() - fermi_energy_closest = energies_points[fermi_idx] - distance = np.abs(fermi_energy_closest - eref) - single_peak_fermi = False - if distance.magnitude <= configuration.dos_energy_tolerance: - # See if there are zero values close below the energy reference. - idx = fermi_idx - idx_descend = fermi_idx - while True: - try: - value = dos_values[idx] - energy_distance = np.abs(eref - energies_points[idx]) - except IndexError: - break - if energy_distance.magnitude > configuration.dos_energy_tolerance: - break - if value <= configuration.dos_intensities_threshold: - idx_descend = idx - break - idx -= 1 - - # See if there are zero values close above the fermi energy. - idx = fermi_idx - idx_ascend = fermi_idx - while True: - try: - value = dos_values[idx] - energy_distance = np.abs(eref - energies_points[idx]) - except IndexError: - break - if energy_distance.magnitude > configuration.dos_energy_tolerance: - break - if value <= configuration.dos_intensities_threshold: - idx_ascend = idx - break - idx += 1 - - # If there is a single peak at fermi energy, no - # search needs to be performed. - if idx_ascend != fermi_idx and idx_descend != fermi_idx: - self.m_cache['highest_occupied_energy'] = fermi_energy_closest - self.m_cache['lowest_unoccupied_energy'] = fermi_energy_closest - single_peak_fermi = True - - if not single_peak_fermi: - # Look for highest occupied energy below the descend index - idx = idx_descend - while True: - try: - value = dos_values[idx] - except IndexError: - break - if value > configuration.dos_intensities_threshold: - idx = idx if idx == idx_descend else idx + 1 - self.m_cache['highest_occupied_energy'] = energies_points[idx] - break - idx -= 1 - # Look for lowest unoccupied energy above idx_ascend - idx = idx_ascend - while True: - try: - value = dos_values[idx] - except IndexError: - break - if value > configuration.dos_intensities_threshold: - idx = idx if idx == idx_ascend else idx - 1 - self.m_cache['highest_occupied_energy'] = energies_points[idx] - break - idx += 1 - - # Return the `highest_occupied_energy` as the `energies_origin`, or the `fermi_level` if it is not None - energies_origin = self.m_cache.get('highest_occupied_energy') - if energies_origin is None: - energies_origin = fermi_level - return energies_origin - def resolve_normalization_factor(self, logger: 'BoundLogger') -> Optional[float]: """ Resolve the `normalization_factor` for the electronic DOS to get a cell-independent intensive DOS. @@ -342,41 +174,11 @@ def resolve_normalization_factor(self, logger: 'BoundLogger') -> Optional[float] ) return None - atomic_numbers = [atom.atomic_number for atom in model_system.particle_states] - - # Compute normalization_factor. If spin_channel is set, assume spin-polarized system. - if self.spin_channel is not None: - normalization_factor = 1 / (2 * sum(atomic_numbers)) - else: - normalization_factor = 1 / sum(atomic_numbers) - return normalization_factor - - def extract_band_gap(self) -> Optional[ElectronicBandGap]: - """ - Extract the electronic band gap from the `highest_occupied_energy` and `lowest_unoccupied_energy` stored - in `m_cache` from `resolve_energies_origin()`. If the difference of `highest_occupied_energy` and - `lowest_unoccupied_energy` is negative, the band gap `value` is set to 0.0. - - Returns: - (Optional[ElectronicBandGap]): The extracted electronic band gap section to be stored in `Outputs`. - """ - band_gap = None - homo = self.m_cache.get('highest_occupied_energy') - lumo = self.m_cache.get('lowest_unoccupied_energy') - if homo and lumo: - band_gap = ElectronicBandGap() - band_gap.is_derived = True - band_gap.physical_property_ref = self - - if (homo - lumo).magnitude < 0: - band_gap.value = 0.0 - else: - band_gap.value = homo - lumo - return band_gap + return 1 / sum([atom.atomic_number for atom in model_system.particle_states]) def extract_projected_dos( self, type: str, logger: 'BoundLogger' - ) -> list[Optional[DOSProfile]]: + ) -> 'list[ElectronicDensityOfStates | None]': """ Extract the projected DOS from the `projected_dos` section and the specified `type`. @@ -384,7 +186,7 @@ def extract_projected_dos( type (str): The type of the projected DOS to extract. It can be `'atom'` or `'orbital'`. Returns: - (DOSProfile): The extracted projected DOS. + (ElectronicDensityOfStates): The extracted projected DOS. """ extracted_pdos = [] for pdos in self.projected_dos: @@ -402,97 +204,73 @@ def extract_projected_dos( extracted_pdos.append(pdos) return extracted_pdos - def generate_from_projected_dos( - self, logger: 'BoundLogger' - ) -> Optional[pint.Quantity]: + @check_not_none('self.value') + def pad_out(self) -> None: """ - Generate the total `value` of the electronic DOS from the `projected_dos` contributions. If the `projected_dos` - is not present, it returns `None`. - - Args: - logger (BoundLogger): The logger to log messages. + Pad out the value and energies arrays along the spin channel dimension. + """ + spin_index = 0 # Spin is first dimension + if np.array(self.value).shape[spin_index] == 1: + self.value = inner_copy(self.value, 0) + @check_not_none('self.value', 'self.energies') + def plot(self, *args, **kwargs) -> list['PlotlyFigure']: + """ + Plot the electronic density of states. + Returns: - (Optional[pint.Quantity]): The total `value` of the electronic DOS. + list['PlotlyFigure']: A list containing the DOS plot figure(s). """ - if self.projected_dos is None or len(self.projected_dos) == 0: - return None - - # We distinguish between orbital and atom `projected_dos` - orbital_projected = self.extract_projected_dos('orbital', logger) - atom_projected = self.extract_projected_dos('atom', logger) - - # if we only have orbital entries, build atom entries - orbital_projected = self.extract_projected_dos('orbital', logger) - atom_projected = self.extract_projected_dos('atom', logger) - - if not atom_projected: - # group orbitals by their AtomsState - atom_orbital_map: dict[AtomsState, list[DOSProfile]] = {} - for orb in orbital_projected: - parent = getattr(orb.entity_ref, 'm_parent', None) - if isinstance(parent, AtomsState): - atom_orbital_map.setdefault(parent, []).append(orb) - - for atom_state, orbs in atom_orbital_map.items(): - # sum their values - vals = [o.value.magnitude for o in orbs] - unit = orbs[0].value.u - pd = DOSProfile( - entity_ref=atom_state, - energies=self.energies, - ) - pd.name = f'atom {atom_state.chemical_symbol}' - pd.value = np.sum(vals, axis=0) * unit - atom_projected.append(pd) - - # now store the full projected list - self.projected_dos = orbital_projected + atom_projected - - # finally compute or reuse self.value - if self.value is None: - vals = [pd.value.magnitude for pd in atom_projected] - unit = atom_projected[0].value.u - self.value = np.sum(vals, axis=0) * unit + fig = go.Figure() + energies_ev = self.energies.to('eV').magnitude + dos_values = self.value.to('1/eV').magnitude + + # Spin up (positive) + fig.add_trace(go.Scatter( + x=energies_ev, + y=dos_values[0], + mode='lines', + name='Spin up', + line=dict(color='blue') + )) + + # Spin down (negative for traditional representation) + fig.add_trace(go.Scatter( + x=energies_ev, + y=-dos_values[1], + mode='lines', + name='Spin down', + line=dict(color='red') + )) + + fig.update_layout( + title=f"Electronic Density of States{' - ' + self.name if self.name else ''}", + xaxis_title='Energy (eV)', + yaxis_title='DOS (states/eV)', + showlegend=True, + hovermode='x' + ) - return self.value + # Add horizontal and vertical line at y=0 + fig.add_hline(y=0, line_dash="dash", line_color="gray", opacity=0.5) + + fig.add_vline(x=max(self.highest_occupied), line_dash="dash", line_color="gray", annotation_text="E_F") + + return [ + PlotlyFigure( + label=f'DOS{" - " + self.name if self.name else ""}', + figure=fig.to_plotly_json() + ) + ] def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: - super().normalize(archive, logger) - - # Initial check to see whether `energies` is defined - if self.energies is None: - return - - # Resolve `fermi_level` from a sibling section with respect to `ElectronicDensityOfStates` - fermi_level = get_sibling_section( - section=self, sibling_section_name='fermi_level', logger=logger - ) # * we consider `index_sibling` to be 0 - if fermi_level is not None: - fermi_level = fermi_level.value - # and the `energies_origin` from the sibling `ElectronicEigenvalues` section - self.energies_origin = self.resolve_energies_origin( - self.energies.points, fermi_level, logger - ) - if self.energies_origin is None: - logger.info('Could not resolve the `energies_origin` for the DOS') + # self.name = self.resolve_pdos_name(logger) + self.pad_out() - # Resolve `normalization_factor` if self.normalization_factor is None: self.normalization_factor = self.resolve_normalization_factor(logger) - # `ElectronicBandGap` extraction - band_gap = self.extract_band_gap() - if band_gap is not None: - self.m_parent.electronic_band_gap.append(band_gap) - - # Total `value` extraction from `projected_dos` - value_from_pdos = self.generate_from_projected_dos(logger) - if self.value is None and value_from_pdos is not None: - logger.info( - 'The `ElectronicDensityOfStates.value` is not stored. We will attempt to obtain it by summing up projected DOS contributions, if these are present.' - ) - self.value = value_from_pdos + super().normalize(archive, logger) class AbsorptionSpectrum(SpectralProfile): @@ -506,13 +284,6 @@ class AbsorptionSpectrum(SpectralProfile): """, ) - def __init__( - self, m_def: 'Section' = None, m_context: 'Context' = None, **kwargs - ) -> None: - super().__init__(m_def, m_context, **kwargs) - # Set the name of the section - self.name = self.m_def.name - class XASSpectrum(AbsorptionSpectrum): """ @@ -545,26 +316,26 @@ def __init__( def generate_from_contributions(self, logger: 'BoundLogger') -> None: """ Generate the `value` of the XAS spectrum by concatenating the XANES and EXAFS contributions. It also concatenates - the `Energy` grid points of the XANES and EXAFS parts. + the `Energy` grid of the XANES and EXAFS parts. Args: logger (BoundLogger): The logger to log messages. """ # TODO check if this method is general enough if self.xanes_spectrum is not None and self.exafs_spectrum is not None: - # Concatenate XANE and EXAFS `Energy` grid points + # Concatenate XANE and EXAFS `Energy` grid xanes_variables = self.xanes_spectrum.energies exafs_variables = self.exafs_spectrum.energies if len(xanes_variables) == 0 or len(exafs_variables) == 0: logger.warning( - 'Could not extract the `Energy` grid points from XANES or EXAFS.' + 'Could not extract the `Energy` grid from XANES or EXAFS.' ) return - xanes_energies = xanes_variables.points - exafs_energies = exafs_variables.points + xanes_energies = xanes_variables + exafs_energies = exafs_variables if xanes_energies.max() > exafs_energies.min(): logger.warning( - 'The XANES `Energy` grid points are not below the EXAFS `Energy` grid points.' + 'The XANES `Energy` grid is not below the EXAFS `Energy` grid.' ) return self.energies = Energy( diff --git a/src/nomad_simulations/schema_packages/utils/electronic.py b/src/nomad_simulations/schema_packages/utils/electronic.py new file mode 100644 index 000000000..1a5b2eec2 --- /dev/null +++ b/src/nomad_simulations/schema_packages/utils/electronic.py @@ -0,0 +1,165 @@ +""" +Electronic structure utility functions. +""" + +import numpy as np +from scipy.stats import gaussian_kde +from pymatgen.electronic_structure.core import Spin +from nomad_simulations.schema_packages.properties import ( + ElectronicBandStructure, + ElectronicDensityOfStates, + ElectronicBandGap, +) +from nomad_simulations.schema_packages.utils.utils import check_not_none +from nomad.units import ureg + + +@check_not_none('input.bandstructure.highest_occupied', 'input.bandstructure.lowest_unoccupied') +def bandstructure_to_bandgap( + bandstructure: 'ElectronicBandStructure', +) -> 'ElectronicBandGap | None': + """ + Convert an `ElectronicBandStructure` to an `ElectronicBandGap` + based on the highest occupied and lowest unoccupied energies within the k-space. + """ + band_gap = ElectronicBandGap(is_derived=True) + homo_k, lumo_k = None, None + + homo_idx = np.unravel_index(np.argmax(bandstructure.highest_occupied), bandstructure.highest_occupied.shape) + homo = bandstructure.highest_occupied[homo_idx] + if (hasattr(bandstructure, 'kpoint') and bandstructure.kpoint is not None and + hasattr(bandstructure.kpoint, 'all_points') and bandstructure.kpoint.all_points is not None): + homo_k = bandstructure.kpoint.all_points[homo_idx[-1]] + + lumo_idx = np.unravel_index(np.argmin(bandstructure.lowest_unoccupied), bandstructure.lowest_unoccupied.shape) + lumo = bandstructure.lowest_unoccupied[lumo_idx] + if (hasattr(bandstructure, 'kpoint') and bandstructure.kpoint is not None and + hasattr(bandstructure.kpoint, 'all_points') and bandstructure.kpoint.all_points is not None): + lumo_k = bandstructure.kpoint.all_points[lumo_idx[-1]] + + band_gap.value = lumo - homo + if homo_k is not None and lumo_k is not None: + band_gap.momentum_transfer = np.linalg.norm(lumo_k - homo_k) + + return band_gap + +@check_not_none('input.bandstructure.value', 'input.bandstructure.occupation') +def bandstructure_to_dos( + bandstructure: 'ElectronicBandStructure', + energy_bins: int = None, + sigma: float = 0.05, +) -> 'ElectronicDensityOfStates': + """ + Convert an `ElectronicBandStructure` to an `ElectronicDensityOfStates` using pymatgen's + Gaussian smearing for smooth DOS curves. + + Args: + bandstructure: The electronic band structure to convert. + energy_bins: Number of energy bins. If None, calculated dynamically. + sigma: Gaussian smearing width in eV for DOS broadening. + + Returns: + An `ElectronicDensityOfStates` object derived from the band structure. + """ + dos = ElectronicDensityOfStates(is_derived=True) + n_spins = bandstructure.value.shape[0] + all_energies = bandstructure.value.magnitude.flatten() + e_min, e_max = np.min(all_energies), np.max(all_energies) + + # Calculate dynamic energy bins if not provided + if energy_bins is None: + energy_range = e_max - e_min + n_bands = bandstructure.value.shape[1] if len(bandstructure.value.shape) > 1 else 1 + n_kpoints = bandstructure.value.shape[2] if len(bandstructure.value.shape) > 2 else 1 + + # Base bins on energy range with minimum resolution of sigma/4 + min_bins = int(energy_range / (sigma / 4)) + + # Scale with data density: more data points = more bins + data_factor = np.sqrt(n_bands * n_kpoints) + data_bins = int(data_factor * 50) # 50 bins per sqrt(data_points) + + # Use the larger of the two, but cap at reasonable limits + energy_bins = max(min_bins, data_bins) + energy_bins = min(max(energy_bins, 500), 5000) # Between 500-5000 bins + + energies = np.linspace(e_min, e_max, energy_bins) + + # Create smooth DOS using kernel density estimation + dos_dict = {} + for spin_idx in range(n_spins): + spin = Spin.up if spin_idx == 0 else Spin.down + energies_spin = bandstructure.value.magnitude[spin_idx].flatten() + occupations_spin = bandstructure.occupation[spin_idx].flatten() + + # Only use occupied states for KDE + occupied_mask = occupations_spin > 0.01 # Small threshold to avoid numerical issues + if np.sum(occupied_mask) > 1: # Need at least 2 points for KDE + occupied_energies = energies_spin[occupied_mask] + occupied_weights = occupations_spin[occupied_mask] + + # Use weighted KDE - repeat energies based on occupation + weighted_energies = [] + # Scale weights to have better dynamic range + max_weight = np.max(occupied_weights) + for e, w in zip(occupied_energies, occupied_weights): + # Nonlinear scaling to emphasize variations in occupation + normalized_w = w / max_weight + n_reps = max(1, int(normalized_w ** 0.7 * 300)) # Power scaling + higher base + weighted_energies.extend([e] * n_reps) + + if len(weighted_energies) > 1: + # Create KDE and evaluate on energy grid + kde = gaussian_kde(weighted_energies) + + # Smart bandwidth selection based on data characteristics + energy_range = e_max - e_min + data_density = len(weighted_energies) / energy_range + + # Calculate local energy spacing to assess clustering + sorted_energies = np.sort(occupied_energies) + energy_spacings = np.diff(sorted_energies) + median_spacing = np.median(energy_spacings) if len(energy_spacings) > 0 else energy_range / len(occupied_energies) + + # Advanced adaptive bandwidth for better substructure preservation + base_bandwidth = kde.factor + + # Data density factor: logarithmic scaling for better dynamic range + log_density = np.log10(data_density + 1) + density_factor = np.clip(0.3 + log_density / 5, 0.15, 1.8) + + # Spacing factor: more sensitive to local structure + relative_spacing = median_spacing / (energy_range / 200) # Finer resolution + spacing_factor = np.clip(relative_spacing ** 0.8, 0.2, 1.2) # Nonlinear response + + # Size factor: reduced influence for better detail preservation + size_factor = (len(weighted_energies) / 2000) ** 0.08 # Much gentler scaling + + # Combine factors with emphasis on preserving structure + adaptive_bandwidth = base_bandwidth * density_factor * spacing_factor * size_factor + kde.set_bandwidth(adaptive_bandwidth) + + dos_values = kde(energies) + + # Normalize to conserve total electron count + total_electrons = np.sum(occupations_spin) + dos_values = dos_values * total_electrons / np.trapz(dos_values, energies) + + dos_dict[spin] = dos_values + else: + dos_dict[spin] = np.zeros(energy_bins) + else: + dos_dict[spin] = np.zeros(energy_bins) + + # Use the energies and densities directly from KDE + dos.energies = energies + + if n_spins == 1: + dos.value = np.array([dos_dict[Spin.up]]) + else: + dos.value = np.array([ + dos_dict[Spin.up], + dos_dict[Spin.down] + ]) + + return dos diff --git a/src/nomad_simulations/schema_packages/utils/utils.py b/src/nomad_simulations/schema_packages/utils/utils.py index 3e36d1336..2bed6c2fe 100644 --- a/src/nomad_simulations/schema_packages/utils/utils.py +++ b/src/nomad_simulations/schema_packages/utils/utils.py @@ -147,3 +147,87 @@ def wrapper(self, other) -> bool: return False return wrapper + + +def check_not_none(*attributes: str) -> 'Callable': + """ + Decorator that checks if specified object or class attributes are not `None`. + Returns `None` if any of the specified attributes are `None`, otherwise executes the function. + + Args: + *attributes: Names of attributes to check for None values + Use 'input.' for input attributes + Use 'self.' for object attributes + Use 'class.' for class attributes + Use '' for global attributes + """ + + def decorator(func: 'Callable') -> 'Callable': + import inspect + + def wrapper(*args, **kwargs): + # Get function signature to map arguments by name + sig = inspect.signature(func) + bound_args = sig.bind(*args, **kwargs) + bound_args.apply_defaults() + + for attr in attributes: + # Parse attribute path + if attr.startswith('input.'): + param_name, attr_name = attr[6:].split('.', 1) if '.' in attr[6:] else ('', attr[6:]) + if param_name: + # Specific parameter name given + source = bound_args.arguments.get(param_name) + else: + # Default to first parameter for backward compatibility + source = list(bound_args.arguments.values())[0] if bound_args.arguments else None + elif attr.startswith('self.'): + attr_name = attr[5:] + source = bound_args.arguments.get('self') + elif attr.startswith('class.'): + attr_name = attr[6:] + self_obj = bound_args.arguments.get('self') + source = self_obj.__class__ if self_obj else None + else: + attr_name = attr + source = globals() + + if source is None or not hasattr(source, attr_name) or getattr(source, attr_name) is None: + return None + + return func(*args, **kwargs) + + return wrapper + + return decorator + +def inner_copy( + tensor: np.ndarray, rank_selection: int | tuple[int] | slice, repeat: int = 0 +) -> np.ndarray: + """ + Take a chunk of a high-ranked array and extend it with exact copies of the selection. + + This function selects a portion of a tensor along its first axis and repeats it + the specified number of times, effectively extending the tensor. + + Args: + tensor: Input `numpy` array to copy from + rank_selection: `int`, `tuple`, `slice` specifying which elements to select + repeat: Number of times to repeat the selection. Counting starts from 0 (default: 0) + + Example: + >>> arr = np.array([[1, 2], [3, 4], [5, 6]]) + >>> inner_copy(arr, slice(0, None), repeat=2) + array([[1, 2], [1, 2], [1, 2]]) + """ + if tensor.size == 0: + return tensor + + selected_chunk = tensor[rank_selection] + + # If selection results in 1D array, ensure it maintains proper shape + if selected_chunk.ndim == tensor.ndim - 1: + selected_chunk = np.expand_dims(selected_chunk, axis=0) + + repeated_chunks = np.tile(selected_chunk, (repeat + 1, *([1] * (tensor.ndim - 1)))) + return np.concatenate([tensor, repeated_chunks], axis=0) diff --git a/src/nomad_simulations/schema_packages/workflow/phonon.py b/src/nomad_simulations/schema_packages/workflow/phonon.py index d143bf86a..eb8f1a54b 100644 --- a/src/nomad_simulations/schema_packages/workflow/phonon.py +++ b/src/nomad_simulations/schema_packages/workflow/phonon.py @@ -3,8 +3,6 @@ from nomad.metainfo import Quantity from structlog.stdlib import BoundLogger -from nomad_simulations.schema_packages.properties.spectral_profile import DOSProfile - from .general import ( INCORRECT_N_TASKS, SimulationWorkflow,