-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathriver_analysis.py
More file actions
219 lines (177 loc) · 8.03 KB
/
river_analysis.py
File metadata and controls
219 lines (177 loc) · 8.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
"""
River erosion and channel migration analysis – extract centerlines,
compute lateral migration, and identify erosion hotspots.
"""
import signal
import ee
import config as cfg
# Timeout for individual GEE getInfo() calls (seconds)
GEE_TIMEOUT = 300 # 5 minutes
class GEETimeoutError(Exception):
pass
def _timeout_handler(signum, frame):
raise GEETimeoutError("GEE call timed out")
def _getinfo_with_timeout(ee_obj, timeout=GEE_TIMEOUT):
"""Call getInfo() with a timeout to prevent indefinite hangs."""
old_handler = signal.signal(signal.SIGALRM, _timeout_handler)
signal.alarm(timeout)
try:
result = ee_obj.getInfo()
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
return result
from data_acquisition import (
get_study_area, get_landsat_collection, make_composite
)
from water_classification import classify_water
# ═══════════════════════════════════════════════════════════════════════════════
# River Corridor ROI
# ═══════════════════════════════════════════════════════════════════════════════
def get_river_roi(river_name):
"""Create a buffered corridor geometry for a named river."""
river = cfg.RIVERS[river_name]
points = [ee.Geometry.Point(lon, lat) for lat, lon in river["points"]]
line = ee.Geometry.MultiPoint(points).convexHull()
return line.buffer(river["buffer_m"])
# ═══════════════════════════════════════════════════════════════════════════════
# Decadal Water Masks & Centerlines
# ═══════════════════════════════════════════════════════════════════════════════
def get_decadal_water_mask(year, river_roi, window=2):
"""
Get a water mask for a river corridor around a target year.
Uses a ±window year range for dry-season compositing to reduce noise.
Uses method='fixed' because river corridor ROIs are too small for
reliable Otsu histogram thresholding.
"""
import datetime
start = f"{year - window}-12-01"
# Clamp end date to today so we never query into the future
max_end = datetime.date.today().isoformat()
end = min(f"{year + window}-02-28", max_end)
col = get_landsat_collection(start, end, river_roi)
# Guard against empty collections (e.g. for very recent years)
col_size = _getinfo_with_timeout(col.size())
if col_size == 0:
# Fall back to a wider window
start = f"{year - window - 2}-12-01"
end = max_end
col = get_landsat_collection(start, end, river_roi)
composite = make_composite(col, method="median")
return classify_water(composite, region=river_roi, method="fixed")
def extract_centerline(water_mask, river_roi, scale=30):
"""
Extract river centerline by morphological thinning of the water mask.
Uses iterative erosion to approximate the skeleton/medial axis.
"""
# Focal erosion iterations to thin the mask
kernel = ee.Kernel.circle(radius=scale, units="meters")
thinned = water_mask
for _ in range(5):
eroded = thinned.focalMin(kernel=kernel)
thinned = thinned.subtract(eroded).gt(0).Or(eroded)
# The skeleton is the set of pixels that remain after thinning
skeleton = water_mask.subtract(
water_mask.focalMin(kernel=ee.Kernel.circle(radius=scale * 2, units="meters"))
).gt(0).selfMask()
return skeleton.rename("centerline")
def get_decadal_centerlines(river_name, decades=None):
"""
Extract centerlines for a river across multiple decades.
Returns dict {year: ee.Image centerline}.
"""
if decades is None:
decades = cfg.DECADES
river_roi = get_river_roi(river_name)
centerlines = {}
for year in decades:
water_mask = get_decadal_water_mask(year, river_roi)
centerlines[year] = extract_centerline(water_mask, river_roi)
return centerlines
# ═══════════════════════════════════════════════════════════════════════════════
# Channel Migration & Erosion
# ═══════════════════════════════════════════════════════════════════════════════
def compute_channel_migration(water_masks, decades=None):
"""
Compute channel migration between consecutive decades.
Returns list of dicts with migration info.
"""
if decades is None:
decades = cfg.DECADES
migrations = []
for i in range(len(decades) - 1):
y1, y2 = decades[i], decades[i + 1]
mask1 = water_masks[y1]
mask2 = water_masks[y2]
# Erosion: was water in y1 but not in y2 (land gain / bank erosion on opposite side)
eroded = mask1.And(mask2.Not()).rename("eroded")
# Accretion: was not water in y1 but is water in y2
accreted = mask2.And(mask1.Not()).rename("accreted")
# Net change
net_change = mask2.subtract(mask1).rename("net_change")
migrations.append({
"period": f"{y1}-{y2}",
"start_year": y1,
"end_year": y2,
"eroded": eroded,
"accreted": accreted,
"net_change": net_change,
})
return migrations
def compute_erosion_area(eroded_mask, region, scale=30):
"""Calculate total erosion area in hectares."""
area = eroded_mask.multiply(ee.Image.pixelArea()).reduceRegion(
reducer=ee.Reducer.sum(),
geometry=region,
scale=scale,
maxPixels=cfg.MAX_PIXELS,
bestEffort=True,
)
return ee.Number(area.get("eroded")).divide(10000) # m² to ha
def compute_erosion_rate(eroded_mask, region, years_elapsed, scale=30):
"""Calculate erosion rate in hectares per year."""
area_ha = compute_erosion_area(eroded_mask, region, scale)
return area_ha.divide(years_elapsed)
def identify_erosion_hotspots(migrations, river_roi, scale=30):
"""
Identify persistent erosion hotspots – areas eroded in multiple periods.
Returns an image where pixel value = number of periods with erosion.
"""
hotspot = ee.Image.constant(0).clip(river_roi)
for m in migrations:
hotspot = hotspot.add(m["eroded"])
return hotspot.rename("erosion_frequency")
def run_river_analysis(river_name):
"""
Full river analysis pipeline for a single river.
Returns dict with all products.
"""
river_roi = get_river_roi(river_name)
decades = cfg.DECADES
# Water masks per decade
water_masks = {}
for year in decades:
water_masks[year] = get_decadal_water_mask(year, river_roi)
# Centerlines
centerlines = {}
for year in decades:
centerlines[year] = extract_centerline(water_masks[year], river_roi)
# Channel migration
migrations = compute_channel_migration(water_masks, decades)
# Erosion hotspots
hotspots = identify_erosion_hotspots(migrations, river_roi)
# Erosion rates per period
rates = []
for m in migrations:
years_elapsed = m["end_year"] - m["start_year"]
rate = compute_erosion_rate(m["eroded"], river_roi, years_elapsed)
rates.append({"period": m["period"], "rate_ha_per_year": rate})
return {
"river_name": river_name,
"roi": river_roi,
"water_masks": water_masks,
"centerlines": centerlines,
"migrations": migrations,
"hotspots": hotspots,
"erosion_rates": rates,
}