Skip to content

Commit 5fc26c1

Browse files
committed
refactored data caching
1 parent 4b8c891 commit 5fc26c1

File tree

7 files changed

+145
-107
lines changed

7 files changed

+145
-107
lines changed

cems/src/main/python/pmonitor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,7 @@ def _inquire_status(self):
590590
inquiry = {}
591591
# retrieve status with polling script
592592
if external_ids:
593-
print('retrieving status with ' + self._polling + ' ' + ' '.join(external_ids))
593+
# print('retrieving status with ' + self._polling + ' ' + ' '.join(external_ids))
594594
process = subprocess.Popen(self._polling + ' ' + ' '.join(external_ids),
595595
shell=True, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
596596
for l in process.stdout:

post-processing-tool/src/main/java/com/bc/fiduceo/post/plugin/era5/MatchupFields.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,18 +107,18 @@ void compute(Configuration config, NetcdfFile reader, NetcdfFileWriter writer) t
107107

108108
final InterpolationContext interpolationContext = Era5PostProcessing.getInterpolationContext(lonLayer, latLayer);
109109
final Rectangle layerRegion = interpolationContext.getEra5Region();
110-
final int[] offset = new int[]{0, layerRegion.y, layerRegion.x};
111-
final int[] shape = new int[]{1, layerRegion.height, layerRegion.width};
110+
final int[] offset = new int[]{layerRegion.y, layerRegion.x};
111+
final int[] shape = new int[]{layerRegion.height, layerRegion.width};
112112

113113
// iterate over time stamps
114114
for (int t = 0; t < numTimeSteps; t++) {
115115
timeIndex.set(m, t);
116116
final int timeStamp = targetTimeArray.getInt(timeIndex);
117-
final Variable variable = variableCache.get(variableKey, timeStamp);
117+
VariableCache.CacheEntry cacheEntry = variableCache.get(variableKey, timeStamp);
118118

119119
// read and get rid of fake z-dimension
120-
Array subset = variable.read(offset, shape).reduce();
121-
subset = NetCDFUtils.scaleIfNecessary(variable, subset);
120+
Array subset = cacheEntry.array.section(offset, shape);
121+
subset = NetCDFUtils.scaleIfNecessary(cacheEntry.variable, subset);
122122
final Index subsetIndex = subset.getIndex();
123123
final BilinearInterpolator bilinearInterpolator = interpolationContext.get(0, 0);
124124

post-processing-tool/src/main/java/com/bc/fiduceo/post/plugin/era5/SatelliteFields.java

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,20 @@ class SatelliteFields extends FieldsProcessor {
2424
private List<Dimension> dimension3d;
2525
private Map<String, TemplateVariable> variables;
2626

27-
static Array readSubset(int numLayers, Rectangle era5RasterPosition, Variable variable) throws IOException, InvalidRangeException {
27+
static Array readSubset(int numLayers, Rectangle era5RasterPosition, VariableCache.CacheEntry cacheEntry) throws IOException, InvalidRangeException {
2828
Array subset;
2929

3030
final int maxRequestedX = era5RasterPosition.x + era5RasterPosition.width - 1;
3131
if (era5RasterPosition.x < 0 || maxRequestedX >= RASTER_WIDTH) {
32-
subset = readVariableDataOverlapped(numLayers, era5RasterPosition, variable);
32+
subset = readVariableDataOverlapped(numLayers, era5RasterPosition, cacheEntry.array);
3333
} else {
34-
subset = readVariableData(numLayers, era5RasterPosition, variable);
34+
subset = readVariableData(numLayers, era5RasterPosition, cacheEntry.array);
3535
}
3636

37-
return NetCDFUtils.scaleIfNecessary(variable, subset);
37+
return NetCDFUtils.scaleIfNecessary(cacheEntry.variable, subset);
3838
}
3939

40-
private static Array readVariableDataOverlapped(int numLayers, Rectangle era5RasterPosition, Variable variable) throws IOException, InvalidRangeException {
40+
private static Array readVariableDataOverlapped(int numLayers, Rectangle era5RasterPosition, Array array) throws IOException, InvalidRangeException {
4141
Array subset;
4242
int xMin = 0;
4343
int xMax;
@@ -53,20 +53,20 @@ private static Array readVariableDataOverlapped(int numLayers, Rectangle era5Ras
5353
leftWidth = era5RasterPosition.width - rightWidth;
5454
}
5555
final Rectangle leftEraPos = new Rectangle(xMin, era5RasterPosition.y, leftWidth, era5RasterPosition.height);
56-
final Array leftSubset = readVariableData(numLayers, leftEraPos, variable);
56+
final Array leftSubset = readVariableData(numLayers, leftEraPos, array);
5757

5858
final Rectangle rightEraPos = new Rectangle(xMax, era5RasterPosition.y, rightWidth, era5RasterPosition.height);
59-
final Array rightSubset = readVariableData(numLayers, rightEraPos, variable);
59+
final Array rightSubset = readVariableData(numLayers, rightEraPos, array);
6060

61-
subset = mergeData(leftSubset, rightSubset, numLayers, era5RasterPosition, variable);
61+
subset = mergeData(leftSubset, rightSubset, numLayers, era5RasterPosition, array);
6262
return subset;
6363
}
6464

65-
static Array mergeData(Array leftSubset, Array rightSubset, int numLayers, Rectangle era5RasterPosition, Variable variable) {
66-
final int rank = variable.getRank();
65+
static Array mergeData(Array leftSubset, Array rightSubset, int numLayers, Rectangle era5RasterPosition, Array array) {
66+
final int rank = array.getRank();
6767
final Array mergedArray;
6868
if (rank == 4) {
69-
mergedArray = Array.factory(variable.getDataType(), new int[]{numLayers, era5RasterPosition.height, era5RasterPosition.width});
69+
mergedArray = Array.factory(array.getDataType(), new int[]{numLayers, era5RasterPosition.height, era5RasterPosition.width});
7070
if (era5RasterPosition.x < 0) {
7171
final int xMax = era5RasterPosition.width + era5RasterPosition.x;
7272
mergeArrays_3D(leftSubset, rightSubset, era5RasterPosition, mergedArray, xMax);
@@ -75,7 +75,7 @@ static Array mergeData(Array leftSubset, Array rightSubset, int numLayers, Recta
7575
mergeArrays_3D(rightSubset, leftSubset, era5RasterPosition, mergedArray, xMax);
7676
}
7777
} else {
78-
mergedArray = Array.factory(variable.getDataType(), new int[]{era5RasterPosition.height, era5RasterPosition.width});
78+
mergedArray = Array.factory(array.getDataType(), new int[]{era5RasterPosition.height, era5RasterPosition.width});
7979
if (era5RasterPosition.x < 0) {
8080
final int xMax = era5RasterPosition.width + era5RasterPosition.x;
8181
mergeArrays(leftSubset, rightSubset, era5RasterPosition, mergedArray, xMax);
@@ -141,24 +141,23 @@ private static void mergeArrays_3D(Array leftSubset, Array rightSubset, Rectangl
141141
}
142142
}
143143

144-
private static Array readVariableData(int numLayers, Rectangle era5RasterPosition, Variable variable) throws IOException, InvalidRangeException {
145-
final int rank = variable.getRank();
144+
private static Array readVariableData(int numLayers, Rectangle era5RasterPosition, Array array) throws IOException, InvalidRangeException {
145+
final int rank = array.getRank();
146146
Array subset;
147-
if (rank == 3) {
147+
if (rank == 2) {
148+
final int[] origin = new int[]{era5RasterPosition.y, era5RasterPosition.x};
149+
final int[] shape = new int[]{era5RasterPosition.height, era5RasterPosition.width};
150+
final int[] stride = new int[]{1, 1};
151+
subset = array.sectionNoReduce(origin, shape, stride);
152+
} else if (rank == 3) {
148153
final int[] origin = new int[]{0, era5RasterPosition.y, era5RasterPosition.x};
149-
final int[] shape = new int[]{1, era5RasterPosition.height, era5RasterPosition.width};
150-
subset = variable.read(origin, shape);
151-
} else if (rank == 4) {
152-
final int[] origin = new int[]{0, 0, era5RasterPosition.y, era5RasterPosition.x};
153-
final int[] shape = new int[]{1, numLayers, era5RasterPosition.height, era5RasterPosition.width};
154-
subset = variable.read(origin, shape);
154+
final int[] shape = new int[]{numLayers, era5RasterPosition.height, era5RasterPosition.width};
155+
final int[] stride = new int[]{1, 1, 1};
156+
subset = array.sectionNoReduce(origin, shape, stride);
155157
} else {
156-
throw new IOException("variable rank invalid: " + variable.getShortName());
158+
throw new IOException("variable rank invalid");
157159
}
158160

159-
// remove the time dimension of length 1 tb 2021-01-14
160-
subset = subset.reduce(0);
161-
162161
return subset;
163162
}
164163

@@ -182,7 +181,7 @@ void compute(Configuration config, NetcdfFile reader, NetcdfFileWriter writer) t
182181
final SatelliteFieldsConfiguration satFieldsConfig = config.getSatelliteFields();
183182
final int numLayers = satFieldsConfig.get_z_dim();
184183
final Era5Archive era5Archive = new Era5Archive(config.getNWPAuxDir());
185-
final VariableCache variableCache = new VariableCache(era5Archive, 52); // 4 * 13 variables tb 2020-11-25
184+
final VariableCache variableCache = new VariableCache(era5Archive, 156); // 4 * 13 variables tb 2020-11-25
186185

187186
try {
188187
// open input time variable
@@ -240,8 +239,8 @@ void compute(Configuration config, NetcdfFile reader, NetcdfFileWriter writer) t
240239
// - store to target raster
241240
final Set<String> variableKeys = variables.keySet();
242241
for (final String variableKey : variableKeys) {
243-
final Variable variable = variableCache.get(variableKey, era5Time);
244-
final Array subset = readSubset(numLayers, layerRegion, variable);
242+
VariableCache.CacheEntry cacheEntry = variableCache.get(variableKey, era5Time);
243+
final Array subset = readSubset(numLayers, layerRegion, cacheEntry);
245244
final Index subsetIndex = subset.getIndex();
246245

247246
final Array targetArray = targetArrays.get(variableKey);

post-processing-tool/src/main/java/com/bc/fiduceo/post/plugin/era5/VariableCache.java

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.bc.fiduceo.post.plugin.era5;
22

33
import org.esa.snap.core.util.io.FileUtils;
4+
import ucar.ma2.Array;
45
import ucar.nc2.NetcdfFile;
56
import ucar.nc2.Variable;
67

@@ -14,7 +15,7 @@
1415
class VariableCache {
1516

1617
private final Era5Archive archive;
17-
private final HashMap<String, CacheEntry> cache;
18+
private final HashMap<String, CacheContainer> cache;
1819
private final int cacheSize;
1920

2021

@@ -24,13 +25,18 @@ class VariableCache {
2425
cache = new HashMap<>();
2526
}
2627

27-
Variable get(String variableKey, int era5TimeStamp) throws IOException {
28+
CacheEntry get(String variableKey, int era5TimeStamp) throws IOException {
29+
final CacheContainer cacheContainer = getCacheContainer(variableKey, era5TimeStamp);
30+
return cacheContainer.cacheEntry;
31+
}
32+
33+
private CacheContainer getCacheContainer(String variableKey, int era5TimeStamp) throws IOException {
2834
final String filePath = archive.get(variableKey, era5TimeStamp);
2935
final String variableName = getVariableName(variableKey);
3036
final String key = FileUtils.getFilenameWithoutExtension(new File(filePath));
3137

32-
CacheEntry cacheEntry = cache.get(key);
33-
if (cacheEntry == null) {
38+
CacheContainer cacheContainer = cache.get(key);
39+
if (cacheContainer == null) {
3440
final NetcdfFile netcdfFile = NetcdfFile.open(filePath);
3541
final Variable variable = netcdfFile.findVariable(variableName);
3642
if (variable == null) {
@@ -39,21 +45,21 @@ Variable get(String variableKey, int era5TimeStamp) throws IOException {
3945
if (cache.size() == cacheSize) {
4046
removeOldest();
4147
}
42-
cacheEntry = new CacheEntry(variable, netcdfFile, System.currentTimeMillis());
43-
cache.put(key, cacheEntry);
48+
final Array array = variable.read().reduce();
49+
50+
cacheContainer = new CacheContainer(variable, netcdfFile, array, System.currentTimeMillis());
51+
cache.put(key, cacheContainer);
4452
}
4553

46-
cacheEntry.lastAccess = System.currentTimeMillis();
47-
return cacheEntry.variable;
54+
cacheContainer.lastAccess = System.currentTimeMillis();
55+
return cacheContainer;
4856
}
4957

50-
51-
5258
void close() throws IOException {
53-
final Collection<CacheEntry> cacheEntries = cache.values();
54-
for (CacheEntry cacheEntry : cacheEntries) {
55-
cacheEntry.netcdfFile.close();
56-
cacheEntry.netcdfFile = null;
59+
final Collection<CacheContainer> cacheEntries = cache.values();
60+
for (CacheContainer cacheContainer : cacheEntries) {
61+
cacheContainer.netcdfFile.close();
62+
cacheContainer.netcdfFile = null;
5763
}
5864

5965
cache.clear();
@@ -67,31 +73,43 @@ private String getVariableName(String variableKey) {
6773
private void removeOldest() throws IOException {
6874
long minTime = Long.MAX_VALUE;
6975
String toRemove = null;
70-
CacheEntry entryToRemove = null;
71-
final Set<Map.Entry<String, CacheEntry>> cacheEntries = cache.entrySet();
72-
for (Map.Entry<String, CacheEntry> cacheMapEntry : cacheEntries) {
73-
final CacheEntry cacheEntry = cacheMapEntry.getValue();
74-
if (cacheEntry.lastAccess < minTime) {
75-
minTime = cacheEntry.lastAccess;
76+
CacheContainer entryToRemove = null;
77+
final Set<Map.Entry<String, CacheContainer>> cacheEntries = cache.entrySet();
78+
for (Map.Entry<String, CacheContainer> cacheMapEntry : cacheEntries) {
79+
final CacheContainer cacheContainer = cacheMapEntry.getValue();
80+
if (cacheContainer.lastAccess < minTime) {
81+
minTime = cacheContainer.lastAccess;
7682
toRemove = cacheMapEntry.getKey();
77-
entryToRemove = cacheEntry;
83+
entryToRemove = cacheContainer;
7884
}
7985
}
8086

8187
if (entryToRemove != null) {
88+
entryToRemove.cacheEntry = null;
8289
entryToRemove.netcdfFile.close();
8390

8491
cache.remove(toRemove);
8592
}
8693
}
8794

88-
private class CacheEntry {
89-
Variable variable;
95+
static class CacheEntry {
96+
97+
final Variable variable;
98+
final Array array;
99+
100+
CacheEntry(Variable variable, Array array) {
101+
this.variable = variable;
102+
this.array = array;
103+
}
104+
}
105+
106+
private static class CacheContainer {
107+
CacheEntry cacheEntry;
90108
NetcdfFile netcdfFile;
91109
long lastAccess;
92110

93-
CacheEntry(Variable variable, NetcdfFile netcdfFile, long lastAccess) {
94-
this.variable = variable;
111+
CacheContainer(Variable variable, NetcdfFile netcdfFile, Array array, long lastAccess) {
112+
this.cacheEntry = new CacheEntry(variable, array);
95113
this.netcdfFile = netcdfFile;
96114
this.lastAccess = lastAccess;
97115
}

0 commit comments

Comments
 (0)