|
14 | 14 | from pathlib import Path |
15 | 15 | from itertools import product |
16 | 16 | from functools import partial |
17 | | -from p_tqdm import p_map |
| 17 | +from p_tqdm import p_map, p_umap |
18 | 18 | from dscim.menu.simple_storage import EconVars |
19 | 19 | from zarr.errors import GroupNotFoundError |
20 | 20 |
|
@@ -220,13 +220,50 @@ def concatenate_labor_damages( |
220 | 220 |
|
221 | 221 | # save out |
222 | 222 | if format_file == "zarr": |
223 | | - concat_ds.to_zarr(f"{path_to_file}.zarr", consolidated=True, mode="w") |
| 223 | + to_store = concat_ds.copy() |
| 224 | + for var in to_store.variables: |
| 225 | + to_store[var].encoding.clear() |
| 226 | + |
| 227 | + to_store.to_zarr(f"{path_to_file}.zarr", mode="w", consolidated=True) |
224 | 228 | elif format_file == "netcdf": |
225 | 229 | concat_ds.to_netcdf(f"{path_to_file}.nc4") |
226 | 230 |
|
227 | 231 | return concat_ds |
228 | 232 |
|
229 | 233 |
|
def calculate_labor_batch_damages(batch, ec, input_path, save_path):
    """Concatenate and save labor damages for a single Monte Carlo batch.

    Thin per-batch worker intended to be fanned out with ``p_umap`` from
    :func:`calculate_labor_damages`.

    Parameters
    ----------
    batch : int
        Batch index; selects projection outputs matching
        ``batch == 'batch{batch}'`` via the ``query`` argument.
    ec : EconVars
        Economic variables instance passed through as ``ec_cls``.
    input_path : str
        Directory containing the raw labor projection outputs.
    save_path : str
        Directory where the concatenated zarr store is written.
    """
    print(f"Processing batch={batch} damages in {os.getpid()}")
    concatenate_labor_damages(
        input_path=input_path,
        save_path=save_path,
        ec_cls=ec,
        variable="rebased",
        val_type="wage-levels",
        format_file="zarr",
        query=f"exists==True&batch=='batch{batch}'",
    )
    # Plain string literal: the original used an f-string with no placeholders.
    print("Saved!")
| 247 | + |
def calculate_labor_damages(
    path_econ="/shares/gcp/integration/float32/dscim_input_data/econvars/zarrs/integration-econ-bc39.zarr",
    input_path="/shares/gcp/outputs/labor/impacts-woodwork/mc_correct_rebasing_for_integration",
    save_path="/shares/gcp/integration/float32/input_data_histclim/labor_data/replication/",
):
    """Concatenate labor damages for all 15 batches, 5 batches at a time.

    Batches are processed in 3 rounds of 5 parallel workers
    (via ``p_umap``) to limit peak memory usage.

    Parameters
    ----------
    path_econ : str
        Path to the zarr store of economic variables (cluster-specific default).
    input_path : str
        Directory containing the raw labor projection outputs.
    save_path : str
        Directory where the per-batch zarr stores are written.
    """
    ec = EconVars(path_econ)
    # The worker's bound arguments do not depend on the round index, so
    # build the partial once instead of recreating it inside the loop.
    batch_worker = partial(
        calculate_labor_batch_damages,
        input_path=input_path,
        save_path=save_path,
        ec=ec,
    )
    for round_idx in range(3):
        batches = list(range(round_idx * 5, round_idx * 5 + 5))
        print("Processing batches:")
        print(batches)
        # Workers save to disk; the (unordered) return value is discarded.
        p_umap(batch_worker, batches)
| 266 | + |
230 | 267 | def compute_ag_damages( |
231 | 268 | input_path, |
232 | 269 | save_path, |
@@ -542,13 +579,66 @@ def concatenate_energy_damages( |
542 | 579 | logger.info(f"Concatenating and processing {i}") |
543 | 580 |
|
544 | 581 | if format_file == "zarr": |
545 | | - concat_ds.to_zarr(f"{path_to_file}.zarr", consolidated=True, mode="w") |
| 582 | + to_store = concat_ds.copy() |
| 583 | + for var in to_store.variables: |
| 584 | + to_store[var].encoding.clear() |
| 585 | + |
| 586 | + to_store.to_zarr(f"{path_to_file}.zarr", mode="w", consolidated=True) |
546 | 587 | elif format_file == "netcdf": |
547 | 588 | concat_ds.to_netcdf(f"{path_to_file}.nc4") |
548 | 589 |
|
549 | 590 | return concat_ds |
550 | 591 |
|
551 | 592 |
|
def calculate_energy_batch_damages(batch, ec, input_path, save_path):
    """Concatenate and save energy damages for a single Monte Carlo batch.

    Thin per-batch worker intended to be fanned out with ``p_umap`` from
    :func:`calculate_energy_damages`.

    Parameters
    ----------
    batch : int
        Batch index; selects projection outputs matching
        ``batch == 'batch{batch}'`` via the ``query`` argument.
    ec : EconVars
        Economic variables instance passed through as ``ec_cls``.
    input_path : str
        Directory containing the raw energy projection outputs.
    save_path : str
        Directory where the concatenated zarr store is written.
    """
    print(f"Processing batch={batch} damages in {os.getpid()}")
    concatenate_energy_damages(
        input_path=input_path,
        file_prefix="TINV_clim_integration_total_energy",
        save_path=save_path,
        ec_cls=ec,
        variable="rebased",
        format_file="zarr",
        query=f"exists==True&batch=='batch{batch}'",
    )
    # Plain string literal: the original used an f-string with no placeholders.
    print("Saved!")
| 606 | + |
def calculate_energy_damages(
    re_calculate=True,
    path_econ="/shares/gcp/integration/float32/dscim_input_data/econvars/zarrs/integration-econ-bc39.zarr",
    input_path="/shares/gcp/outputs/energy_pixel_interaction/impacts-blueghost/integration_resampled",
    save_path="/shares/gcp/integration/float32/input_data_histclim/energy_data/replication_2022aug/",
):
    """Concatenate energy damages for all 15 batches, 5 batches at a time.

    Optionally re-reads the raw delta and histclim energy files first, then
    processes batches in 3 rounds of 5 parallel workers (via ``p_umap``) to
    limit peak memory usage.

    Parameters
    ----------
    re_calculate : bool
        If True, run ``read_energy_files_parallel`` for both the delta and
        histclim seeds before concatenating.
    path_econ : str
        Path to the zarr store of economic variables (cluster-specific default).
    input_path : str
        Directory containing the raw energy projection outputs.
    save_path : str
        Directory where intermediate files and per-batch zarr stores are written.
    """
    ec = EconVars(path_econ)

    if re_calculate:
        for seed in (
            "TINV_clim_integration_total_energy_delta",
            "TINV_clim_integration_total_energy_histclim",
        ):
            read_energy_files_parallel(
                input_path=input_path,
                save_path=save_path,
                ec_cls=ec,
                seed=seed,
            )

    # The worker's bound arguments do not depend on the round index, so
    # build the partial once instead of recreating it inside the loop.
    batch_worker = partial(
        calculate_energy_batch_damages,
        input_path=input_path,
        save_path=save_path,
        ec=ec,
    )
    for round_idx in range(3):
        batches = list(range(round_idx * 5, round_idx * 5 + 5))
        print("Processing batches:")
        print(batches)
        # Workers save to disk; the (unordered) return value is discarded.
        p_umap(batch_worker, batches)
| 641 | + |
552 | 642 | def prep_mortality_damages( |
553 | 643 | gcms, |
554 | 644 | paths, |
|
0 commit comments