44from datetime import datetime
55from importlib import reload
66from types import ModuleType
7+ from typing import Any
78
9+ import numpy as np
810from nomad .config import config
911from nomad .datamodel import EntryArchive
1012from nomad .datamodel .metainfo .workflow import Link , TaskReference
1113from nomad .parsing import MatchingParser
1214from nomad .parsing .file_parser import ArchiveWriter
13- from nomad .parsing .file_parser .mapping_parser import MetainfoParser , TextParser
15+ from nomad .parsing .file_parser .mapping_parser import MetainfoParser , Path , TextParser
16+ from nomad .units import ureg
1417from nomad .utils import get_logger
1518from nomad_simulations .schema_packages .general import Program , Simulation
1619from nomad_simulations .schema_packages .workflow import (
2326from nomad_simulation_parsers .parsers .utils .general import search_files
2427from nomad_simulation_parsers .schema_packages .quantumespresso import common
2528
29+ from .common import libxc_shortcut , xc_functional_map
2630from .file_parser import QuantumEspressoFileParser
2731
# Module-level logger; the `logger` properties of the parser classes in this
# file return this instance.
LOGGER = get_logger(__name__)
@@ -35,15 +39,163 @@ def logger(self):
3539 return LOGGER
3640
3741
class XCFunctionalParser:
    """
    Helpers for post-processing exchange-correlation (XC) functional terms.

    Both helpers work on a mapping from an arbitrary key to a term dictionary
    that may carry the entries ``XC_functional_name`` and
    ``XC_functional_weight``.
    """

    @staticmethod
    def gen_string(data: dict[str, Any], separator='+') -> str:
        """
        Build a single string describing all terms in `data`.

        Terms are visited in sorted key order. Each term contributes its
        weight formatted with three decimals (a missing weight counts as 1.0,
        an explicit ``None`` weight is omitted) followed by its name.

        Args:
            data: mapping of key to term dictionary.
            separator: text inserted between consecutive terms.

        Returns:
            The concatenated description, or an empty string for empty input.
        """
        string = ''
        for key in sorted(data):
            term = data[key]
            weight = term.get('XC_functional_weight', 1.0)
            # A negative weight already renders with its own '-' sign, so the
            # separator is only inserted in front of non-negative terms. An
            # explicit None weight is treated like a positive one here instead
            # of failing on the `None > 0` comparison.
            if string and (weight is None or weight > 0):
                string += separator
            if weight is not None:
                string += f'{weight:.3f}'
            string += term.get('XC_functional_name', '')
        return string

    @staticmethod
    def filter_data(data: dict[str, dict[str, Any]]) -> dict[str, Any]:
        """
        Return copies of the terms in `data` that carry a significant weight.

        Terms without a weight, or whose absolute weight is below the
        tolerance (0.01), are dropped. For the remaining terms the weight
        entry is removed when it equals 1.0 within the tolerance, and the
        ``exx_compute_weight`` entry is always removed. The input
        dictionaries are not modified.

        Args:
            data: mapping of key to term dictionary.

        Returns:
            A new mapping with the filtered, shallow-copied terms.
        """
        tol = 0.01
        out = {}
        for key, term in data.items():
            weight = term.get('XC_functional_weight')
            if weight is None or abs(weight) < tol:
                continue
            term_copy = term.copy()
            if abs(weight - 1.0) < tol:
                # unit weight is implicit, so it is not stored
                del term_copy['XC_functional_weight']
            term_copy.pop('exx_compute_weight', None)
            out[key] = term_copy
        return out
71+
72+
class MainfileParser(TextParser):
    """
    Text parser for the mainfile, extended with helper methods that
    post-process parsed values (program version, date, XC functional terms
    and unit-aware quantities).
    """

    # TODO temporary fix for structlog unable to propagate logger
    @property
    def logger(self):
        return LOGGER

    def get_version(self, name_version: list[str]) -> str:
        """
        Return the version string from a ``[name, version, ...]`` token list.

        All tokens after the first are joined with single spaces.

        Args:
            name_version: tokens of the program line; the first is skipped.

        Returns:
            The version with leading ``v`` and ``.`` characters removed.
        """
        # NOTE: lstrip removes any leading run of the characters 'v' and '.',
        # not the literal prefix 'v.'.
        return ' '.join(name_version[1:]).lstrip('v.')

    def get_datetime(self, date_time: str) -> datetime:
        """
        Parse a date/time string such as ``'12Jan2024 10:30:05'``.

        Args:
            date_time: text in day, abbreviated month, year, ``H:M:S`` order;
                any spaces are ignored.

        Returns:
            The corresponding naive `datetime`.

        Raises:
            ValueError: if the text does not match the expected format.
        """
        return datetime.strptime(date_time.replace(' ', ''), '%d%b%Y%H:%M:%S')

    def get_header(self, key: str, default: Any = None) -> Any:
        """
        Return the entry `key` of the parsed data, or `default` if absent.

        NOTE(review): this reads the top level of ``self.data`` while
        `get_value` reads ``self.data['header']`` -- confirm which level is
        intended here.
        """
        return self.data.get(key, default)

    def get_xc_functionals(self, source: str) -> list[dict[str, Any]]:
        """
        Translate the numeric functional code in `source` into XC terms.

        The digits enclosed in the first pair of parentheses in `source` are
        split into up to six indices, each selecting an entry of the
        corresponding component in `xc_functional_map`. The selected terms are
        merged, the terms marked for removal are applied, and negligible
        terms are filtered out.

        Args:
            source: text containing the functional code in parentheses.

        Returns:
            The resulting term dictionaries ordered by functional name, or an
            empty list (after logging a warning) if the code cannot be read.
        """
        segments = source.split('(')
        if len(segments) < 2:
            # no parenthesised code present
            self.logger.warning(
                'Unknown XC functional format', data=dict(value=source)
            )
            return []
        numbers = segments[1].split(')')[0]

        nval = (4, 10)
        # handle different formatting
        if len(numbers) == nval[0]:
            # 4-digit format without spaces
            numbers_split = re.findall(r'(\d)', numbers)
        elif len(numbers) == nval[1]:
            # 5-digit format with/without spaces
            numbers_split = re.findall(r'[ \d]\d', numbers)
        else:
            # 6-digit with spaces
            numbers_split = numbers.split()

        if not numbers_split:
            self.logger.warning(
                'Unknown XC functional format', data=dict(value=numbers)
            )
            return []

        try:
            indices = [int(n) for n in numbers_split]
        except ValueError:
            # non-numeric tokens cannot be mapped to functional components
            self.logger.warning(
                'Unknown XC functional format', data=dict(value=numbers)
            )
            return []
        # numbers should have six digits
        indices.extend([0] * (6 - len(indices)))

        # map numbers to values
        xc_section_method = dict()
        xc_terms = dict()
        xc_terms_remove = dict()

        def get_data(terms: list[dict[str, Any]]) -> dict[str, Any]:
            """Copy `terms` keyed by functional name with resolved weights."""
            data = dict()
            exx_fraction = self.get_header('x_qe_exact_exchange_fraction', 0.0)
            for term in terms:
                term_copy = term.copy()
                weight = term_copy.get('exx_compute_weight', 1.0)
                # the weight is either a plain number or a function of the
                # exact-exchange fraction
                term_copy['XC_functional_weight'] = (
                    weight(exx_fraction) if callable(weight) else weight
                )
                data.setdefault(term_copy.get('XC_functional_name', ''), term_copy)
            return data

        for i in range(6):
            xc_component = xc_functional_map[i]
            xc_number = indices[i]
            if xc_number >= len(xc_component) or xc_component[xc_number] is None:
                continue
            entry = xc_component[xc_number]
            xc_section_method.update(entry.get('xc_section_method', {}))
            xc_terms.update(get_data(entry.get('xc_terms', [])))
            xc_terms_remove.update(get_data(entry.get('xc_terms_remove', [])))

        # remove terms
        # NOTE(review): an existing weight is multiplied by the negated
        # removal weight rather than reduced by it -- confirm this is the
        # intended way to cancel a term.
        for key, val in xc_terms_remove.items():
            weight = val.get('XC_functional_weight')
            xc_terms.setdefault(key, val)
            xc_terms[key]['XC_functional_weight'] *= -(weight or -1.0)

        # filter data
        xc_terms = XCFunctionalParser.filter_data(xc_terms)

        xc_functional_str = XCFunctionalParser.gen_string(xc_terms)
        if xc_functional_str in libxc_shortcut:
            # override for libXC compliance
            xc_terms = get_data(libxc_shortcut[xc_functional_str]['xc_terms'])
            xc_terms = XCFunctionalParser.filter_data(xc_terms)
            xc_functional_str = XCFunctionalParser.gen_string(xc_terms)
        # TODO make use of this
        xc_section_method['XC_functional'] = xc_functional_str

        return [xc_terms[key] for key in sorted(xc_terms)]

    def _get_from_header(self, header: dict[str, Any], key: str) -> Any:
        """Resolve `key` in `header` only; return None if its parent is missing."""
        key_split = key.rsplit('.', 1)
        parent = Path(path=key_split[0]).get_data(header)
        if parent is None:
            return None
        return parent if len(key_split) == 1 else parent.get(key_split[1])

    def get_value(
        self, source: dict[str, Any], key: str = '', units: str = 'units'
    ) -> Any:
        """
        Look up `key` in `source` (falling back to the header) and convert it
        according to its units.

        `key` may be a dotted path; the part before the last dot addresses the
        parent and the last part the entry within it. If the parent cannot be
        found in `source`, it is looked up in ``self.data['header']``.

        Args:
            source: parsed data to read from.
            key: path of the value.
            units: name of the entry holding the units label; pass an empty
                string to return the raw value without conversion.

        Returns:
            ``None`` if the value is not found, the raw value if `units` is
            empty, otherwise a float array converted as follows: ``alat`` /
            ``a_0`` are scaled by ``alat``, ``bohr`` / ``angstrom`` get the
            matching unit attached, ``2 pi/alat`` is scaled by
            ``2 * pi / alat`` and ``crystal`` is multiplied with the
            simulation cell. Other labels leave the array unchanged.
        """
        key_split = key.rsplit('.', 1)
        header = self.data.get('header', {})
        parent = Path(path=key_split[0]).get_data(source)
        if parent is None:
            # fall back to the header exactly once; a miss there must not
            # trigger another fallback, which would recurse without end
            source = header
            parent = self._get_from_header(header, key_split[0])
        if parent is None:
            return None
        value = parent if len(key_split) == 1 else parent.get(key_split[1])

        if value is None or not units:
            return value

        # the units label is stored next to the value; if absent, the `units`
        # argument itself is used as the label
        units = (source if len(key_split) == 1 else parent).get(units, units).lower()
        alat = source.get('alat', header.get('alat', 1.0))
        value = np.array(value, dtype=float)

        if units in ['alat', 'a_0']:
            value *= alat
        elif units in ['bohr', 'angstrom']:
            units_mapping = dict(bohr=ureg.bohr, angstrom=ureg.angstrom)
            value = value * units_mapping.get(units)
        elif units == '2 pi/alat':
            value *= 2 * np.pi / alat
        elif units == 'crystal':
            cell = self.get_value(source, 'simulation_cell', '')
            if cell is not None:
                converted = np.dot(
                    getattr(value, 'magnitude', value),
                    getattr(cell, 'magnitude', cell),
                )
                # attach the cell units only if the cell carries any; a
                # unitless cell still yields the converted coordinates
                value = converted * cell.units if hasattr(cell, 'units') else converted
        return value
198+
47199
48200class QuantumEspressoArchiveWriter (ArchiveWriter ):
49201 """
@@ -130,24 +282,21 @@ def parse_workflow(self) -> None:
130282 )
131283
132284 def write_to_archive (self ) -> None :
133- def load_writer (header : str ) -> QuantumEspressoArchiveWriter :
134- if 'pwscf' in header :
135- from .pwscf .parser import PWSCFArchiveWriter
285+ from .epw .parser import EPWArchiveWriter
286+ from .phonon .parser import PhononArchiveWriter
287+ from .pwscf .parser import PWSCFArchiveWriter
288+ from .xspectra .parser import XSpectraArchiveWriter
289+
290+ writers = {
291+ 'pwscf' : PWSCFArchiveWriter (),
292+ 'epw' : EPWArchiveWriter (),
293+ 'phonon' : PhononArchiveWriter (),
294+ 'xspectra' : XSpectraArchiveWriter (),
295+ }
136296
137- return PWSCFArchiveWriter ()
138- if 'epw' in header :
139- from .epw .parser import EPWArchiveWriter
140-
141- return EPWArchiveWriter ()
142- if 'phonon' in header :
143- from .phonon .parser import PhononArchiveWriter
144-
145- return PhononArchiveWriter ()
146- if 'xspectra' in header :
147- from .xspectra .parser import XSpectraArchiveWriter
148-
149- return XSpectraArchiveWriter ()
150- return None
297+ def load_writer (header : str ) -> QuantumEspressoArchiveWriter :
298+ match = re .match (r'Program +(\w+)' , header )
299+ return writers .get (match .group (1 ).lower ()) if match else None
151300
152301 # set up mainfile parser
153302 self .mainfile_parser .filepath = self .mainfile
@@ -158,7 +307,7 @@ def load_writer(header: str) -> QuantumEspressoArchiveWriter:
158307 for n , program in enumerate (
159308 self .mainfile_parser .data_object .get ('program' , [])
160309 ):
161- writer = load_writer (program [:50 ]. lower () )
310+ writer = load_writer (program [:30 ] )
162311 if writer is None :
163312 self .logger .error ('Parser not found for program.' )
164313 continue
0 commit comments