"""
This script creates the bash scripts necessary to apply "lst1_magic_dl1_stereo_to_dl2.py"
to the DL1 stereo data. It also creates the new subdirectories associated with
data level 2 (DL2).

Usage:
$ DL1_to_DL2 -c configuration_file.yaml (-d list_dense.txt)
"""
import datetime
import glob
import logging
import os
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
import yaml

from magicctapipe import __version__
from magicctapipe.io import resource_file
from magicctapipe.scripts.lst1_magic.semi_automatic_scripts.clusters import (
    rc_lines,
    slurm_lines,
)
from magicctapipe.utils import auto_MCP_parser

__all__ = ["ST_NSB_List", "bash_DL1Stereo_to_DL2"]

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)


def ST_NSB_List(
    target_dir, nsb_list, source, df_LST, MAGIC_obs_periods, version, LST_date
):
    """
    This function creates the lists of runs separated by run period and NSB level.

    Parameters
    ----------
    target_dir : str
        Path to the working directory
    nsb_list : list
        List of the MC NSB values
    source : str
        Source name
    df_LST : :class:`pandas.DataFrame`
        Dataframe collecting the LST1 runs (produced by the create_LST_table script)
    MAGIC_obs_periods : dict
        Dictionary of MAGIC observation periods (key = name of period, value = list of begin/end dates)
    version : str
        Version of the input (stereo subruns) data
    LST_date : list
        List of LST dates to be processed
    """
    width = np.diff(nsb_list, append=[nsb_list[-1] + 0.5]) / 2.0
    nsb_limit = [-0.01] + list(
        nsb_list + width
    )  # arbitrary small negative number so that 0.0 > nsb_limit[0]
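    # Worked example (illustrative grid): nsb_list = [0.5, 1.0, 1.5] gives
    # width = [0.25, 0.25, 0.25] and nsb_limit = [-0.01, 0.75, 1.25, 1.75];
    # a measured NSB of 0.9 falls in (0.75, 1.25] and is rounded to 1.0.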

    # Loops over all runs of all nights
    Nights_list = np.sort(
        glob.glob(f"{target_dir}/v{version}/{source}/DL1Stereo/Merged/*")
    )
    for night in Nights_list:
        # Night period
        night_date = night.split("/")[-1]
        if str(night_date) not in LST_date:
            continue
        outdir = f"{target_dir}/v{__version__}/{source}/DL2/{night_date}/logs"
        os.makedirs(outdir, exist_ok=True)

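        # Shift the LST night date by one day to match the date convention
        # used in the MAGIC observation-period table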
        date_magic = datetime.datetime.strptime(
            night_date, "%Y%m%d"
        ) + datetime.timedelta(days=1)
        period = None
        for p_name, date_list in MAGIC_obs_periods.items():
            for date1, date2 in date_list:
                date_init = datetime.datetime.strptime(date1, "%Y_%m_%d")
                date_end = datetime.datetime.strptime(date2, "%Y_%m_%d")
                if (date_magic >= date_init) and (date_magic <= date_end):
                    period = p_name

        if period is None:
            print(f"Could not identify MAGIC period for LST night {night_date}")
            continue

        Run_list = glob.glob(f"{night}/*.h5")
        for Run in Run_list:
            # getting the run NSB
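            # e.g. "dl1_stereo_LST-1.Run03265.h5" -> run_str "Run03265" ->
            # run_LST_id "03265" (file naming assumed; the run id is taken
            # from the second dot-separated field)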
            run_str = Run.split("/")[-1].split(".")[1]
            run_LST_id = run_str.lstrip("Run")
            nsb = df_LST[df_LST["LST1_run"] == run_LST_id]["nsb"].tolist()[0]
            # rounding the NSB to the nearest MC nsb value
            for j in range(0, len(nsb_list)):
                if (nsb <= nsb_limit[j + 1]) & (nsb > nsb_limit[j]):
                    nsb = nsb_list[j]

            # Writing to the output .txt file
            if nsb <= nsb_limit[-1]:
                with open(
                    f"{outdir}/{period}_{nsb}_{night_date}.txt",
                    "a+",
                ) as file:
                    file.write(f"{Run}\n")


def bash_DL1Stereo_to_DL2(
    target_dir,
    source,
    env_name,
    cluster,
    RF_dir,
    df_LST,
    MC_v,
    version,
    nice,
    dense_list,
    LST_date,
):
    """
    This function generates the bash scripts for running the DL1Stereo to DL2 analysis.

    Parameters
    ----------
    target_dir : str
        Path to the working directory
    source : str
        Source name
    env_name : str
        Conda environment name
    cluster : str
        Cluster system
    RF_dir : str
        Path to the RFs
    df_LST : :class:`pandas.DataFrame`
        Dataframe collecting the LST1 runs (produced by the create_LST_table script)
    MC_v : str
        Version of MC processing
    version : str
        Version of the input (stereo subruns) data
    nice : int or None
        Job priority
    dense_list : list
        List of sources that use the dense MC training line
    LST_date : list
        List of LST dates to be processed
    """
    if cluster != "SLURM":
        logger.warning(
            "Automatic processing not implemented for the cluster indicated in the config file"
        )
        return

    Nights_list = np.sort(
        glob.glob(f"{target_dir}/v{version}/{source}/DL1Stereo/Merged/*")
    )

    for night in Nights_list:
        night_date = night.split("/")[-1]
        if str(night_date) not in LST_date:
            continue
        outdir = f"{target_dir}/v{__version__}/{source}/DL2/{night_date}/logs"
        File_list = glob.glob(f"{outdir}/*.txt")

        for file in File_list:
            with open(file, "r") as f:
                process_size = len(f.readlines()) - 1
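            # One run path per line in the list file; the -1 matches an
            # inclusive, 0-based SLURM array specification (assuming
            # slurm_lines builds something like "--array=0-N")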
            if process_size < 0:
                continue
            nsb = file.split("/")[-1].split("_")[1]
            period = file.split("/")[-1].split("_")[0]
            dec = df_LST[df_LST.source == source].iloc[0]["MC_dec"]
            if np.isnan(dec):
                print(f"MC_dec is NaN for {source}")
                continue
            dec = str(dec).replace(".", "").replace("-", "min_")

            RFdir = f"{RF_dir}/{period}/NSB{nsb}/v{MC_v}/dec_{dec}{'_high_density' if source in dense_list else ''}/"
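            # e.g. {RF_dir}/ST0316/NSB1.0/v0.5.1/dec_min_310/ for dec = -31.0
            # (period, NSB, and version values here are illustrative)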

            if (not os.path.isdir(RFdir)) or (len(glob.glob(f"{RFdir}/*joblib")) < 3):
                print(f"no RFs available in {RFdir}")
                continue
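            # Estimate the job memory from the total size of the "disp" RF
            # files (in GiB), inflated by 75% plus 2 GiB of headroom: an
            # empirical margin for the models loaded in memory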
            rfsize = 0
            for rffile in glob.glob(f"{RFdir}/disp*joblib"):
                rfsize = rfsize + os.path.getsize(rffile) / (1024 * 1024 * 1024)
            rfsize = (rfsize * 1.75) + 2
            slurm = slurm_lines(
                queue="short",
                job_name=f"{source}_DL1_to_DL2",
                nice_parameter=nice,
                array=process_size,
                mem=f"{int(rfsize)}g",
                out_name=f"{outdir}/slurm-%x.%A_%a",
            )
            rc = rc_lines(
                store="$SAMPLE ${SLURM_ARRAY_JOB_ID} ${SLURM_ARRAY_TASK_ID}",
                out=f"{outdir}/list_{nsb}_{period}",
            )
            out_file = outdir.rstrip("/logs")
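            # Note: rstrip("/logs") strips a *character set*, not a suffix; it
            # works here because the path component before "/logs" (YYYYMMDD)
            # contains none of the characters "/logs"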
            lines = (
                slurm
                + [
                    f"SAMPLE_LIST=($(<{file}))\n",
                    "SAMPLE=${SAMPLE_LIST[${SLURM_ARRAY_TASK_ID}]}\n",
                    f"export LOG={outdir}",
                    "/DL1_to_DL2_${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}.log\n",
                    f"conda run -n {env_name} lst1_magic_dl1_stereo_to_dl2 --input-file-dl1 $SAMPLE --input-dir-rfs {RFdir} --output-dir {out_file} >$LOG 2>&1\n\n",
                ]
                + rc
            )
            with open(
                f'{source}_DL1_to_DL2_{file.split("/")[-1].rstrip("txt")}sh',
                "w",
            ) as f:
                f.writelines(lines)


def main():
    """
    Here we read the config_auto_MCP.yaml file and call the functions defined above.
    """

    parser = auto_MCP_parser()
    parser.add_argument(
        "--dense_MC_sources",
        "-d",
        dest="dense_list",
        type=str,
        help="File with the names of the sources to be processed with the dense MC training line",
    )

    args = parser.parse_args()
    with open(args.config_file, "rb") as f:
        config = yaml.safe_load(f)

    dense_list = []
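    # If given, the dense-MC file is expected to hold one source name per line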
    if args.dense_list is not None:
        with open(args.dense_list) as d:
            dense_list = d.read().splitlines()

    target_dir = Path(config["directories"]["workspace_dir"])
    RF_dir = config["directories"]["RF"]
    env_name = config["general"]["env_name"]
    MAGIC_obs_periods = config["expert_parameters"]["MAGIC_obs_periods"]
    nsb_list = config["expert_parameters"]["nsb"]

    source_in = config["data_selection"]["source_name_database"]
    source = config["data_selection"]["source_name_output"]
    MC_v = config["directories"]["MC_version"]
    if MC_v == "":
        MC_v = __version__

    cluster = config["general"]["cluster"]
    in_version = config["directories"]["real_input_version"]
    if in_version == "":
        in_version = __version__
    nice_parameter = config["general"]["nice"] if "nice" in config["general"] else None

    # LST dataframe
    config_db = config["general"]["base_db_config_file"]
    if config_db == "":
        config_db = resource_file("database_config.yaml")

    with open(
        config_db, "rb"
    ) as fc:  # "rb" mode opens the file in binary format for reading
        config_dict_db = yaml.safe_load(fc)

    LST_h5 = config_dict_db["database_paths"]["LST"]
    LST_key = config_dict_db["database_keys"]["LST"]
    df_LST = pd.read_hdf(
        LST_h5,
        key=LST_key,
    )


    if source_in is None:
        source_list = joblib.load("list_sources.dat")
    else:
        source_list = [source]
    for source_name in source_list:
        LST_runs_and_dates = f"{source_name}_LST_runs.txt"
        LST_date = []
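        # Collect the unique nights from the first CSV column of the runs
        # file, converting dates like 2020_11_20 to 20201120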
        for i in np.genfromtxt(LST_runs_and_dates, dtype=str, delimiter=",", ndmin=2):
            LST_date.append(str(i[0].replace("_", "")))
        LST_date = list(set(LST_date))
        ST_NSB_List(
            target_dir,
            nsb_list,
            source_name,
            df_LST,
            MAGIC_obs_periods,
            in_version,
            LST_date,
        )

        bash_DL1Stereo_to_DL2(
            target_dir,
            source_name,
            env_name,
            cluster,
            RF_dir,
            df_LST,
            MC_v,
            in_version,
            nice_parameter,
            dense_list,
            LST_date,
        )
        list_of_dl2_scripts = np.sort(glob.glob(f"{source_name}_DL1_to_DL2*.sh"))
        if len(list_of_dl2_scripts) < 1:
            logger.warning(f"No bash scripts for {source_name}")
            continue
        launch_jobs = ""
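        # Chain the sbatch calls with && so that a failed submission stops the
        # remaining ones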
        for n, run in enumerate(list_of_dl2_scripts):
            launch_jobs += (" && " if n > 0 else "") + f"sbatch {run}"
        os.system(launch_jobs)


if __name__ == "__main__":
    main()