-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmyioda.py
More file actions
98 lines (81 loc) · 3.81 KB
/
myioda.py
File metadata and controls
98 lines (81 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import os
import netCDF4 as nc
import pandas as pd
import numpy as np
import xarray as xr
class iodaapi(object):
def __init__(self, iodafile):
self.filename = iodafile
self.existence = os.path.exists(iodafile)
def load_to_groupds(self):
groups = xr.open_groups(self.filename)
groups_list = groups.keys()
self.global_group = groups['/']
self.metadata = groups['/MetaData']
self.obsvalue = groups['/ObsValue']
self.obserror = groups['/ObsError']
self.preqc = groups['/PreQC']
if '/hofx' in groups_list:
self.hofx = groups['/hofx']
else:
self.hofx = None
def crop(self, output, polygon):
'''
output: path of cropped IODA file
polygon: cropping area
'''
src = nc.Dataset(self.filename, 'r')
if src.dimensions['Location'].size == 0:
raise Exception('no obs available')
lat = src.groups['MetaData'].variables['latitude'][:].ravel()
lon = src.groups['MetaData'].variables['longitude'][:].ravel()
mask = isinside(lat, lon, polygon)
if np.count_nonzero(mask) == 0:
raise Exception('no obs available in the target area')
else:
print(f'{np.count_nonzero(mask)} obs in the target area')
dst = nc.Dataset(output, 'w')
dst.setncatts(src.__dict__)
for name, dimension in src.dimensions.items():
if name == 'Location':
dst.createDimension(name, np.count_nonzero(mask))
else:
dst.createDimension(name, len(dimension) if not dimension.isunlimited() else None)
for name, variable in src.variables.items():
print(f'Processing {variable}')
# Define the variable in the new file
dst_var = dst.createVariable(name, variable.datatype, variable.dimensions)
# Copy variable attributes
dst_var.setncatts(variable.__dict__)
if 'Location' in variable.dimensions:
indices = [slice(None)] * variable.ndim
dim_index = variable.dimensions.index('Location')
indices[dim_index] = mask
dst_var[:] = variable[tuple(indices)]
else:
dst_var[:] = variable[:]
for grp, group in src.groups.items():
dst_grp = dst.createGroup(grp)
for var, variable in src.groups[grp].variables.items():
print(f'Processing {grp} / {var}')
fill_value = variable.getncattr('_FillValue') if '_FillValue' in variable.ncattrs() else None
if fill_value is not None:
try:
dst_var = dst_grp.createVariable(var, variable.datatype, variable.dimensions,
fill_value=variable.datatype.type(fill_value))
except Exception as e:
print(f"Could not set _FillValue during variable creation: {e}")
dst_var = dst_grp.createVariable(var, variable.datatype, variable.dimensions)
else:
dst_var = dst_grp.createVariable(var, variable.datatype, variable.dimensions)
# Then copy remaining attributes
for attr in variable.ncattrs():
if attr != '_FillValue':
dst_var.setncattr(attr, variable.getncattr(attr))
if 'Location' in variable.dimensions:
indices = [slice(None)] * variable.ndim
dim_index = variable.dimensions.index('Location')
indices[dim_index] = mask
dst_var[:] = variable[tuple(indices)]
else:
dst_var[:] = variable[:]