Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ jobs:
runs-on: ubuntu-24.04
env:
UV_FROZEN: 1
ARTISATOMIC_TESTMODE: 1
steps:
- name: Checkout Code
uses: actions/checkout@v4
Expand Down Expand Up @@ -108,11 +109,12 @@ jobs:
tests:
strategy:
matrix:
testname: [cmfgen, floers25, jplt, qub]
testname: [cmfgen, floers25, jplt, kurucz, qub]
fail-fast: false
runs-on: ubuntu-24.04
env:
UV_FROZEN: 1
ARTISATOMIC_TESTMODE: 1
timeout-minutes: 45
name: test ${{ matrix.testname }}

Expand Down Expand Up @@ -162,11 +164,6 @@ jobs:
working-directory: atomic-data-floers25/
run: tar -xJvf testdata.tar.xz

- name: Extract QUB test data
if: matrix.testname == 'qub'
working-directory: atomic-data-qub/
run: rsync -av co_tyndall_test_sample/ co_tyndall/

- name: Generate artis atomic data files
run: |
cp tests/${{ matrix.testname }}/artisatomicionhandlers.json .
Expand Down
301 changes: 227 additions & 74 deletions artisatomic/readkuruczdata.py
Original file line number Diff line number Diff line change
@@ -1,106 +1,259 @@
from collections import defaultdict
from collections import namedtuple
import os
import re
from pathlib import Path

import artisatomic
import numpy as np
import polars as pl

hc_in_ev_cm = 0.0001239841984332003
import artisatomic

kuruczdatapath = Path(__file__).parent.absolute() / ".." / "atomic-data-kurucz"
if os.environ.get("ARTISATOMIC_TESTMODE") == "1":
kuruczdatapath = kuruczdatapath / "test_sample"


def find_gfall(atomic_number: int, ion_charge: int) -> Path:
path_gfall = (kuruczdatapath / "extendedatoms" / f"gf{atomic_number:02d}{ion_charge:02d}.lines").resolve()

if not path_gfall.is_file():
path_gfall = (kuruczdatapath / "extendedatoms" / f"gf{atomic_number:02d}{ion_charge:02d}z.lines").resolve()

if not path_gfall.is_file():
path_gfall = (kuruczdatapath / "zztar" / f"gf{atomic_number:02d}{ion_charge:02d}.all").resolve()

if not path_gfall.is_file():
raise FileNotFoundError(f"No Kurucz file for Z={atomic_number} ion_charge {ion_charge}")

return path_gfall


def get_levelname(row) -> str:
return f"{row.label},enpercm={row.energy},j={row.j}"


def read_levels_data(dflevels):
energy_level_tuple = namedtuple("energylevel", "levelname energyabovegsinpercm g parity")

energy_levels = []
def parse_gfall(fname: str) -> pl.LazyFrame:
# Code derived from the GFALL reader of carsus
# https://github.com/tardis-sn/carsus/blob/master/carsus/io/kurucz/gfall.py
gfall_fortran_format = (
"F11.4,F7.3,F6.2,F12.3,F5.2,1X,A10,F12.3,F5.2,1X,"
"A10,F6.2,F6.2,F6.2,A4,I2,I2,I3,F6.3,I3,F6.3,I5,I5,"
"1X,I1,A1,1X,I1,A1,I1,A3,I5,I5,I6"
)

for index, row in dflevels.iterrows():
parity = -index # give a unique parity so that all transitions are permitted
energyabovegsinpercm = float(row.energy)
g = 2 * row.j + 1
newlevel = energy_level_tuple(
levelname=get_levelname(row), parity=parity, g=g, energyabovegsinpercm=energyabovegsinpercm
gfall_columns = [
"wavelength_nm",
"loggf",
"z_dot_ioncharge",
"energyabovegsinpercm_first",
"j_first",
"blank1",
"label_first",
"energyabovegsinpercm_second",
"j_second",
"blank2",
"label_second",
"log_gamma_rad",
"log_gamma_stark",
"log_gamma_vderwaals",
"ref",
"nlte_level_no_first",
"nlte_level_no_second",
"isotope",
"log_f_hyperfine",
"isotope2",
"log_iso_abundance",
"hyper_shift_first",
"hyper_shift_second",
"blank3",
"hyperfine_f_first",
"hyperfine_note_first",
"blank4",
"hyperfine_f_second",
"hyperfine_note_second",
"line_strength_class",
"line_code",
"lande_g_first",
"lande_g_second",
"isotopic_shift",
]
number_match = re.compile(r"\d+(\.\d+)?")
type_match = re.compile(r"[FIXA]")
type_dict = {"F": np.float64, "I": np.int64, "X": str, "A": str}
field_types = tuple(type_dict[item] for item in number_match.sub("", gfall_fortran_format).split(","))

field_widths = list(map(int, re.sub(r"\.\d+", "", type_match.sub("", gfall_fortran_format)).split(",")))

import pandas as pd

gfall = (
pl.from_pandas(
pd.read_fwf(
fname,
widths=field_widths,
skip_blank_lines=True,
names=gfall_columns,
dtypes=dict(zip(gfall_columns, field_types, strict=True)),
compression="infer",
dtype_backend="pyarrow",
)
)
energy_levels.append(newlevel)

energy_levels.sort(key=lambda x: x.energyabovegsinpercm)

return [None, *energy_levels]


def read_lines_data(energy_levels, dflines):
transitions = []
transition_count_of_level_name = defaultdict(int)
transitiontuple = namedtuple("transition", "lowerlevel upperlevel A coll_str")
.lazy()
.drop_nulls(["z_dot_ioncharge", "energyabovegsinpercm_first", "energyabovegsinpercm_second"])
)
double_columns = [col.replace("_first", "") for col in gfall.collect_schema().names() if col.endswith("first")]

for (lowerindex, upperindex), row in dflines.iterrows():
lowerlevel = lowerindex + 1
upperlevel = upperindex + 1
# due to the fact that energy is stored in 1/cm
gfall = gfall.with_columns(
order_lower_upper=pl.col("energyabovegsinpercm_first").abs() < pl.col("energyabovegsinpercm_second").abs()
)
gfall = gfall.with_columns(
pl.when(pl.col("order_lower_upper"))
.then(f"{column}_first")
.otherwise(f"{column}_second")
.alias(f"{column}_lower")
for column in double_columns
).with_columns(
pl.when(pl.col("order_lower_upper"))
.then(f"{column}_second")
.otherwise(f"{column}_first")
.alias(f"{column}_upper")
for column in double_columns
)

transtuple = transitiontuple(lowerlevel=lowerlevel, upperlevel=upperlevel, A=row.A, coll_str=-1)
# Clean labels
ignored_labels = ["AVERAGE", "ENERGIES", "CONTINUUM"]
gfall = gfall.with_columns(
pl.col("label_lower").str.strip_chars().replace(r"\s+", " "),
pl.col("label_upper").str.strip_chars().replace(r"\s+", " "),
).filter(
(pl.col("label_lower").is_in(ignored_labels).not_()) & (pl.col("label_upper").is_in(ignored_labels).not_())
)

# print(line)
transition_count_of_level_name[energy_levels[lowerlevel].levelname] += 1
transition_count_of_level_name[energy_levels[upperlevel].levelname] += 1
gfall = gfall.with_columns(
energyabovegsinpercm_lower_predicted=pl.col("energyabovegsinpercm_lower") < 0,
energyabovegsinpercm_lower=pl.col("energyabovegsinpercm_lower").abs(),
energyabovegsinpercm_upper_predicted=pl.col("energyabovegsinpercm_upper") < 0,
energyabovegsinpercm_upper=pl.col("energyabovegsinpercm_upper").abs(),
)

transitions.append(transtuple)
gfall = gfall.with_columns(atomic_number=pl.col("z_dot_ioncharge").cast(pl.Int64)).with_columns(
ion_charge=((pl.col("z_dot_ioncharge") - pl.col("atomic_number")) * 100).round().cast(pl.Int64),
)
if gfall.select(pl.n_unique("z_dot_ioncharge")).collect().item() != 1:
raise ValueError(f"Expected exactly one unique ion in file {fname}, but found multiple")

return transitions, transition_count_of_level_name
return gfall


def read_levels_and_transitions(atomic_number, ion_stage, flog):
def find_gfall(atomic_number: int, ion_charge: int) -> Path:
extended_atoms_filenames = [
f"gf{atomic_number:02d}{ion_charge:02d}.lines.zst",
f"gf{atomic_number:02d}{ion_charge:02d}.lines",
f"gf{atomic_number:02d}{ion_charge:02d}z.lines.zst",
f"gf{atomic_number:02d}{ion_charge:02d}z.lines",
]
for filename in extended_atoms_filenames:
path_gfall = (kuruczdatapath / "extendedatoms" / filename).resolve()
if path_gfall.is_file():
return path_gfall
zztar_filenames = [f"gf{atomic_number:02d}{ion_charge:02d}.all", f"gf{atomic_number:02d}{ion_charge:02d}.all.zst"]
for filename in zztar_filenames:
path_gfall = (kuruczdatapath / "zztar" / filename).resolve()
if path_gfall.is_file():
return path_gfall

raise FileNotFoundError(f"No Kurucz file for Z={atomic_number} ion_charge {ion_charge}.")


def read_levels_and_transitions(
atomic_number: int, ion_stage: int, flog
) -> tuple[float, pl.DataFrame, pl.DataFrame, dict[str, int]]:
ion_charge = ion_stage - 1

artisatomic.log_and_print(flog, f"Using Kurucz for Z={atomic_number} ion_stage {ion_stage}")

from carsus.io.kurucz import GFALLReader # ty:ignore[unresolved-import]

# path_gfall = (Path(__file__).parent.absolute() / ".." / "atomic-data-kurucz" / "gfall.dat").resolve()
path_gfall = find_gfall(atomic_number, ion_charge)
artisatomic.log_and_print(flog, f"Reading {path_gfall}")
gfall_reader = GFALLReader(
ions=f"{artisatomic.elsymbols[atomic_number]} {ion_charge}",
fname=str(path_gfall),
unique_level_identifier=["energy", "j"],
)

dflevels = gfall_reader.extract_levels().loc[atomic_number, ion_charge]

energy_levels = read_levels_data(dflevels)

artisatomic.log_and_print(flog, f"Read {len(energy_levels[1:]):d} levels")

dflines = gfall_reader.extract_lines().loc[atomic_number, ion_charge]

# wavelengths are in nanometers, so multiply by 10 to get Angstroms
dflines["A"] = dflines["gf"] / (1.49919e-16 * (2 * dflines["j_upper"] + 1) * (dflines["wavelength"] * 10.0) ** 2)
gfall = parse_gfall(fname=str(path_gfall))
column_renames = {
"energyabovegsinpercm_{0}": "energyabovegsinpercm",
"j_{0}": "j",
"label_{0}": "label",
"energyabovegsinpercm_{0}_predicted": "theoretical",
}

e_lower_levels = gfall.rename({key.format("lower"): value for key, value in column_renames.items()})
e_upper_levels = gfall.rename({key.format("upper"): value for key, value in column_renames.items()})

selected_columns = ["atomic_number", "ion_charge", "energyabovegsinpercm", "j", "label", "theoretical"]
dflevels = (
pl.concat([e_lower_levels.select(selected_columns), e_upper_levels.select(selected_columns)])
.unique(["energyabovegsinpercm", "j"], keep="first")
.sort(["energyabovegsinpercm", "j", "label"])
.with_row_index("levelid")
.select(
pl.col("energyabovegsinpercm"),
pl.col("j"),
levelname=(
pl.col("label")
+ ",enpercm="
+ pl.col("energyabovegsinpercm").cast(pl.Utf8)
+ ",j="
+ pl.col("j").cast(pl.String)
),
g=2 * pl.col("j") + 1,
)
.sort("energyabovegsinpercm", "j")
.collect()
)
dflevels = (
artisatomic.add_dummy_zero_level(dflevels)
.with_row_index("levelid")
.with_columns(pl.col("levelid").cast(pl.Int64))
.with_columns(parity=-pl.col("levelid")) # give a unique parity so that all transitions are permitted
)
artisatomic.log_and_print(flog, f"Read {len(dflevels) - 1:d} levels")

transitions = (
gfall.select(
[
"atomic_number",
"ion_charge",
"energyabovegsinpercm_lower",
"j_lower",
"energyabovegsinpercm_upper",
"j_upper",
"wavelength_nm",
"loggf",
]
)
.with_columns(gf=10 ** pl.col("loggf"))
.drop("loggf")
.join(
dflevels.lazy().select(
energyabovegsinpercm_lower=pl.col("energyabovegsinpercm"),
j_lower=pl.col("j"),
levelid_lower=pl.col("levelid"),
),
on=["energyabovegsinpercm_lower", "j_lower"],
how="left",
)
.join(
dflevels.lazy().select(
energyabovegsinpercm_upper=pl.col("energyabovegsinpercm"),
j_upper=pl.col("j"),
levelid_upper=pl.col("levelid"),
),
on=["energyabovegsinpercm_upper", "j_upper"],
how="left",
)
.with_columns(
# wavelengths are in nanometers, so multiply by 10 to get Angstroms
A=pl.col("gf") / (1.49919e-16 * (2 * pl.col("j_upper") + 1) * (pl.col("wavelength_nm") * 10.0).pow(2))
)
.select(
upperlevel=pl.col("levelid_upper"),
lowerlevel=pl.col("levelid_lower"),
A=pl.col("A"),
coll_str=pl.lit(-1),
)
.collect()
)

transitions, transition_count_of_level_name = read_lines_data(energy_levels, dflines)
transition_count_of_level_name = {
levelname: (
transitions.select(((pl.col("lowerlevel") == levelid) | (pl.col("upperlevel") == levelid)).sum()).item()
)
for levelid, levelname in dflevels.select("levelid", "levelname").iter_rows(named=False)
}
artisatomic.log_and_print(flog, f"Read {len(transitions):d} transitions")

ionization_energy_in_ev = artisatomic.get_nist_ionization_energies_ev()[(atomic_number, ion_stage)]
artisatomic.log_and_print(flog, f"ionization energy: {ionization_energy_in_ev} eV")

return ionization_energy_in_ev, energy_levels, transitions, transition_count_of_level_name
return ionization_energy_in_ev, dflevels, transitions, transition_count_of_level_name


def get_level_valence_n(levelname: str):
Expand Down
8 changes: 7 additions & 1 deletion artisatomic/readqubdata.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python3
import os
import typing as t
from collections import defaultdict
from collections import namedtuple
Expand All @@ -11,7 +12,12 @@

import artisatomic

tyndall_co3_path = (Path(__file__).parent.resolve() / ".." / "atomic-data-qub" / "co_tyndall").resolve()
tyndall_co3_path = (
Path(__file__).parent.resolve()
/ ".."
/ "atomic-data-qub"
/ ("co_tyndall_test_sample" if os.environ.get("ARTISATOMIC_TESTMODE") == "1" else "co_tyndall")
).resolve()

ryd_to_ev = 13.605693122994232

Expand Down
7 changes: 3 additions & 4 deletions atomic-data-kurucz/.gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
gfall*
*.lines
*.all
creationdates.dat
extendedatoms/*.lines*
zztar/*.all*
zztar.tar
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading