-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsummary_process.jl
More file actions
1291 lines (1060 loc) · 52.1 KB
/
summary_process.jl
File metadata and controls
1291 lines (1060 loc) · 52.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
include("core/utilities.jl")
include("core/hydrology_calculations.jl")
include("TexNetWebToolLauncherHelperJulia.jl")
include("core/bill_pfront.jl")
include("graphs/julia_fsp_graphs.jl")
using DataFrames
using CSV
using Dates
#using PrettyTables
using Statistics
using Interpolations
using Random
using Distributions
using Base.Threads # Add Threads for parallelization
using .Utilities
using .HydroCalculations
using .TexNetWebToolLauncherHelperJulia
using .BillPFront
using .JuliaFSPGraphs
const RESULTS_FILE_NAME = "results.json"
const ARGS_FILE_NAME = "args.json"
"""
HydrologyParams struct to hold all hydrology parameters for Monte Carlo simulations
"""
struct HydrologyParams
aquifer_thickness::Float64
porosity::Float64
permeability::Float64
fluid_density::Float64
dynamic_viscosity::Float64
fluid_compressibility::Float64
rock_compressibility::Float64
plus_minus::Dict{String, Float64}
n_iterations::Int64
end
"""
Pre-process well data to avoid expensive operations in Monte Carlo loops
Returns a dictionary with well_id as key and processed well info as value
"""
function preprocess_well_data(injection_wells_df::DataFrame, well_id_col::String, injection_data_type::String)
#println("Pre-processing well data for optimization...")
# Determine column names once
lat_col = injection_data_type == "injection_tool_data" ? "Surface Latitude" : "Latitude(WGS84)"
lon_col = injection_data_type == "injection_tool_data" ? "Surface Longitude" : "Longitude(WGS84)"
# Group wells by ID and extract all needed info
well_info = Dict{String, NamedTuple}()
for well_id in unique(injection_wells_df[!, well_id_col])
well_id_str = string(well_id)
# Filter once per well (not millions of times)
well_data = injection_wells_df[string.(injection_wells_df[!, well_id_col]) .== well_id_str, :]
if isempty(well_data)
continue
end
# Extract coordinates once
well_lat = first(well_data[!, lat_col])
well_lon = first(well_data[!, lon_col])
# Process dates once based on injection type
if injection_data_type == "annual_fsp"
inj_start_year = first(well_data[!, "StartYear"])
inj_start_date = Date(inj_start_year, 1, 1)
inj_end_year = first(well_data[!, "EndYear"])
inj_end_date = Date(inj_end_year-1, 12, 31)
elseif injection_data_type == "monthly_fsp"
inj_start_year = minimum(well_data[!, "Year"])
inj_start_month = minimum(well_data[well_data[!, "Year"] .== inj_start_year, "Month"])
inj_start_date = Date(inj_start_year, inj_start_month, 1)
inj_end_year = maximum(well_data[!, "Year"])
inj_end_month = maximum(well_data[well_data[!, "Year"] .== inj_end_year, "Month"])
inj_end_date = Date(inj_end_year, inj_end_month, 1)
inj_end_date = lastdayofmonth(inj_end_date)
elseif injection_data_type == "injection_tool_data"
# Parse dates once
dates = []
if eltype(well_data[!, "Date of Injection"]) <: Date
dates = well_data[!, "Date of Injection"]
else
# Try parsing with different formats
try
dates = Date.(well_data[!, "Date of Injection"], dateformat"y-m-d")
catch
try
dates = Date.(well_data[!, "Date of Injection"], dateformat"m/d/y")
catch
try
dates = Date.(well_data[!, "Date of Injection"], dateformat"m/d/yyyy")
catch e
@warn "Could not parse dates for well $well_id_str: $e"
continue
end
end
end
end
if isempty(dates)
continue
end
inj_start_date = minimum(dates)
inj_end_date = maximum(dates)
else
error("Unsupported injection data type: $injection_data_type")
end
# Store all processed info
well_info[well_id_str] = (
data = well_data,
latitude = well_lat,
longitude = well_lon,
start_date = inj_start_date,
end_date = inj_end_date,
start_year = year(inj_start_date),
end_year = year(inj_end_date)
)
end
#println("Pre-processed $(length(well_info)) wells successfully")
return well_info
end
"""
Run Monte Carlo hydrology simulations for all years up to year_of_interest
Returns a DataFrame with fault ID, pressure, and year columns
Uses multi-threading with a ReentrantLock to avoid race conditions
"""
function run_mc_hydrology_time_series(
params::HydrologyParams,
fault_df::DataFrame,
injection_wells_df::DataFrame,
years_to_analyze::Vector{Int},
year_of_interest::Union{Int, Nothing},
injection_data_type::String,
distribution_type::String="uniform"
)
#println("Running Monte Carlo simulations with $(nthreads()) threads")
# Create distributions for MC sampling
distributions = Dict{String, Distribution}()
if distribution_type == "uniform"
distributions = Dict(
"aquifer_thickness" => Utilities.create_uniform_distribution(params.aquifer_thickness, params.plus_minus["aquifer_thickness"]),
"porosity" => Utilities.create_uniform_distribution(params.porosity, params.plus_minus["porosity"]),
"permeability" => Utilities.create_uniform_distribution(params.permeability, params.plus_minus["permeability"]),
"fluid_density" => Utilities.create_uniform_distribution(params.fluid_density, params.plus_minus["fluid_density"]),
"dynamic_viscosity" => Utilities.create_uniform_distribution(params.dynamic_viscosity, params.plus_minus["dynamic_viscosity"]),
"fluid_compressibility" => Utilities.create_uniform_distribution(params.fluid_compressibility, params.plus_minus["fluid_compressibility"]),
"rock_compressibility" => Utilities.create_uniform_distribution(params.rock_compressibility, params.plus_minus["rock_compressibility"])
)
elseif distribution_type == "gaussian"
@warn "Gaussian distribution not fully implemented, defaulting to uniform"
return run_mc_hydrology_time_series(params, fault_df, injection_wells_df, years_to_analyze, year_of_interest, injection_data_type, "uniform")
else
error("Unsupported distribution type: $distribution_type")
end
# Get well IDs
well_id_col = injection_data_type == "injection_tool_data" ? "API Number" : "WellID"
if !(well_id_col in names(injection_wells_df))
# Try alternates
if "APINumber" in names(injection_wells_df)
well_id_col = "APINumber"
elseif "UIC Number" in names(injection_wells_df)
well_id_col = "UIC Number"
elseif "Well ID" in names(injection_wells_df)
well_id_col = "Well ID"
elseif "UWI" in names(injection_wells_df)
well_id_col = "UWI"
else
error("Could not identify well ID column in injection data")
end
end
# PRE-PROCESS WELL DATA FOR OPTIMIZATION
well_info = preprocess_well_data(injection_wells_df, well_id_col, injection_data_type)
well_ids = collect(keys(well_info)) # Use pre-processed well IDs
# For injection tool data format, verify date column
if injection_data_type == "injection_tool_data"
if !("Date of Injection" in names(injection_wells_df))
error("'Date of Injection' column not found in injection well data")
end
end
# Get fault IDs
num_faults = nrow(fault_df)
fault_id_col = "FaultID" in names(fault_df) ? "FaultID" : "ID"
if !(fault_id_col in names(fault_df))
@warn "No FaultID column found, using sequential IDs"
fault_df[!, :TempID] = string.(1:num_faults)
fault_id_col = "TempID"
end
fault_ids = string.(fault_df[!, fault_id_col])
# Pre-process well data to get date boundaries
inj_start_date, inj_end_date = Utilities.get_date_bounds(injection_wells_df)
# Find the max end date of all injections
max_injection_year = year(inj_end_date)
# Structure: year -> iteration -> fault -> pressure
results = Dict{Int, Dict{Int, Dict{String, Float64}}}()
# Create range of years to analyze
if isempty(years_to_analyze)
years_to_analyze = year(inj_start_date):year_of_interest
end
# Initialize the results structure to avoid race conditions during parallel writing
for analysis_year in years_to_analyze
results[analysis_year] = Dict{Int, Dict{String, Float64}}()
for i in 1:params.n_iterations
results[analysis_year][i] = Dict{String, Float64}()
end
end
# We'll use a mutex to prevent race conditions when updating the results
# this allows only one thread/process to update the results at a time
results_lock = ReentrantLock()
# Prepare iteration indices for parallelization
iterations = collect(1:params.n_iterations)
# Process Monte Carlo iterations in parallel
@threads for i in iterations
# Sample parameters from distributions
sampled_params = Dict(
"aquifer_thickness" => rand(distributions["aquifer_thickness"]),
"porosity" => rand(distributions["porosity"]),
"permeability" => rand(distributions["permeability"]),
"fluid_density" => rand(distributions["fluid_density"]),
"dynamic_viscosity" => rand(distributions["dynamic_viscosity"]),
"fluid_compressibility" => rand(distributions["fluid_compressibility"]),
"rock_compressibility" => rand(distributions["rock_compressibility"])
)
# Calculate storativity and transmissivity
S, T, rho = HydroCalculations.calcST(
sampled_params["aquifer_thickness"],
sampled_params["porosity"],
sampled_params["permeability"],
sampled_params["fluid_density"],
sampled_params["dynamic_viscosity"],
9.81,
sampled_params["fluid_compressibility"],
sampled_params["rock_compressibility"]
)
STRho = (S, T, rho)
# Process each year for this iteration
for analysis_year in years_to_analyze
# Set up year cutoff date (Dec 31 of the year BEFORE analysis year)
# This matches MATLAB's behavior: evaluate pressure at Jan 1, 12:00 AM of analysis year
# which means include injection up to Dec 31 of the previous year
cutoff_date = Date(analysis_year - 1, 12, 31)
# Local results for this iteration and year
local_results = Dict{String, Float64}()
# Process each fault
for f in 1:num_faults
fault_id = fault_ids[f]
fault_lat = fault_df[f, "Latitude(WGS84)"]
fault_lon = fault_df[f, "Longitude(WGS84)"]
# Initialize total pressure for this fault
total_pressure = 0.0
# Process each well's contribution
for well_id in well_ids
# Use pre-processed well data (OPTIMIZED)
well = well_info[well_id]
# Skip if the well hasn't started injecting by the analysis year
if well.start_year > analysis_year
continue
end
# Use pre-processed coordinates and dates
well_lat = well.latitude
well_lon = well.longitude
inj_start_date = well.start_date
inj_end_date = well.end_date
# Skip if injection hasn't started by the cutoff date
if inj_start_date > cutoff_date
continue
end
# Limit end date to the analysis year cutoff
actual_end_date = min(inj_end_date, cutoff_date)
actual_end_year = year(actual_end_date)
# Prepare injection data using pre-processed well data
days, rates = Utilities.prepare_well_data_for_pressure_scenario(
well.data, # Use pre-filtered DataFrame
String(well_id),
well.start_year, # Use pre-processed start year
inj_start_date,
actual_end_year,
actual_end_date,
injection_data_type,
analysis_year,
false, # Don't extrapolate
cutoff_date # December 31 of the year before analysis year (Jan 1 12:00 AM of analysis year)
)
if isempty(days) || isempty(rates)
continue
end
# Calculate days from injection start to analysis date
evaluation_days_from_start = Float64((cutoff_date - inj_start_date).value + 1)
# Calculate pressure contribution from this well
pressure_contribution = HydroCalculations.pfieldcalc_all_rates(
fault_lon, #fault longitude
fault_lat, #fault latitude
STRho, #storativity, transmissivity, fluid density
days, #days of injection
rates, #rates of injection
well_lon, #well longitude
well_lat, #well latitude
evaluation_days_from_start #days from injection start to evaluation date
)
# Add to total pressure for this fault
total_pressure += pressure_contribution
end
# Ensure no negative pressure values
total_pressure = max(0.0, total_pressure)
# Store result for this fault and iteration
local_results[fault_id] = total_pressure
end
# We update the results dictionary using a lock to avoid race conditions
lock(results_lock) do
for (fault_id, pressure) in local_results
results[analysis_year][i][fault_id] = pressure
end
end
end
end
# Convert dictionary to df
result_rows = []
for year in sort(collect(keys(results)))
for iter in 1:params.n_iterations
for (fault_id, pressure) in results[year][iter]
push!(result_rows, (
ID = fault_id,
Pressure = pressure,
Year = year
))
end
end
end
# Create DataFrame
results_df = DataFrame(result_rows)
return results_df
end
"""
Calculate fault slip potential for each year using probabilistic hydrology results
"""
function calculate_fault_slip_potential(prob_geo_cdf::DataFrame, prob_hydro_df::DataFrame)
# Container for the time series results
fsp_through_time = DataFrame(
ID = String[],
Year = Int[],
FSP = Float64[],
epoch_time = Float64[]
)
# Group data by year
years = unique(prob_hydro_df.Year)
for year in years
# Filter data for this year
year_data = filter(row -> row.Year == year, prob_hydro_df)
# Get unique fault IDs for this year
fault_ids = unique(year_data.ID)
for fault_id in fault_ids
# Get fault's geomechanics CDF
fault_geo_cdf = prob_geo_cdf[prob_geo_cdf.ID .== fault_id, :]
if isempty(fault_geo_cdf)
@warn "No geomechanics CDF data found for fault ID $fault_id"
continue
end
# Get hydrology data for this fault and year
fault_hydro_data = year_data[year_data.ID .== fault_id, :]
if isempty(fault_hydro_data)
@warn "No hydrology data found for fault ID $fault_id in year $year"
continue
end
# Convert Monte Carlo results to an exceedance curve
fault_hydro_exceedance = JuliaFSPGraphs.prob_hydrology_cdf(fault_hydro_data)
if isempty(fault_hydro_exceedance)
@warn "Failed to generate exceedance curve for fault ID $fault_id in year $year"
continue
end
# Calculate mean pore pressure for this fault and year
fault_pressures = fault_hydro_data.Pressure
mean_pressure = mean(fault_pressures)
# Verify column names
geo_pressure_col = "slip_pressure" in names(fault_geo_cdf) ? "slip_pressure" : "pressure"
geo_prob_col = "probability" in names(fault_geo_cdf) ? "probability" : "cumulative_probability"
# Sort both datasets for intersection finding
sort!(fault_geo_cdf, geo_pressure_col)
sort!(fault_hydro_exceedance, :slip_pressure)
# Check for curve overlap
hydro_max_pressure = maximum(fault_hydro_exceedance.slip_pressure)
hydro_min_pressure = minimum(fault_hydro_exceedance.slip_pressure)
geo_max_pressure = maximum(fault_geo_cdf[!, geo_pressure_col])
geo_min_pressure = minimum(fault_geo_cdf[!, geo_pressure_col])
# Default FSP value
slip_potential = 0.0
# Check if hydrology is entirely to the left of geomechanics
if hydro_max_pressure < geo_min_pressure
slip_potential = 0.0
# Check if hydrology is entirely to the right of geomechanics
elseif hydro_min_pressure > geo_max_pressure
slip_potential = 1.0
else
# Find the intersection point between the curves
# Combine all pressure points for evaluation
all_pressures = unique(vcat(fault_geo_cdf[!, geo_pressure_col], fault_hydro_exceedance.slip_pressure))
sort!(all_pressures)
# Filter to pressures where both curves are defined
valid_pressures = filter(p ->
p >= max(geo_min_pressure, hydro_min_pressure) &&
p <= min(geo_max_pressure, hydro_max_pressure),
all_pressures)
# Check each pressure point to find where curves cross
intersection_found = false
intersection_probability = 0.0
for i in 1:(length(valid_pressures)-1)
p1 = valid_pressures[i]
p2 = valid_pressures[i+1]
# Evaluate both curves at p1 and p2
# Use interpolate_cdf which now handles deduplication internally
hydro_prob1 = Utilities.interpolate_cdf(
fault_hydro_exceedance.slip_pressure,
fault_hydro_exceedance.probability,
p1)
geo_prob1 = Utilities.interpolate_cdf(
fault_geo_cdf[!, geo_pressure_col],
fault_geo_cdf[!, geo_prob_col],
p1)
hydro_prob2 = Utilities.interpolate_cdf(
fault_hydro_exceedance.slip_pressure,
fault_hydro_exceedance.probability,
p2)
geo_prob2 = Utilities.interpolate_cdf(
fault_geo_cdf[!, geo_pressure_col],
fault_geo_cdf[!, geo_prob_col],
p2)
# Check if curves cross between p1 and p2
if (hydro_prob1 - geo_prob1) * (hydro_prob2 - geo_prob2) <= 0
# Found a crossing point - use linear interpolation
if hydro_prob1 == geo_prob1
# Exact intersection at p1
intersection_probability = hydro_prob1
elseif hydro_prob2 == geo_prob2
# Exact intersection at p2
intersection_probability = hydro_prob2
else
# Interpolate to find crossing point
t = (geo_prob1 - hydro_prob1) / ((hydro_prob2 - hydro_prob1) - (geo_prob2 - geo_prob1))
# Calculate probability at intersection point
intersection_probability = geo_prob1 + t * (geo_prob2 - geo_prob1)
end
intersection_found = true
break
end
end
# If no intersection found, use the higher of the two curves where they're closest
if !intersection_found
# Find the point where the curves are closest
min_diff = Inf
closest_probability = 0.0
for p in valid_pressures
# Use interpolate_cdf which now handles deduplication internally
hydro_prob = Utilities.interpolate_cdf(
fault_hydro_exceedance.slip_pressure,
fault_hydro_exceedance.probability,
p)
geo_prob = Utilities.interpolate_cdf(
fault_geo_cdf[!, geo_pressure_col],
fault_geo_cdf[!, geo_prob_col],
p)
diff = abs(hydro_prob - geo_prob)
if diff < min_diff
min_diff = diff
closest_probability = max(hydro_prob, geo_prob)
end
end
intersection_probability = closest_probability
end
slip_potential = intersection_probability
end
# Add to time series results with epoch timestamp
epoch_timestamp = JuliaFSPGraphs.date_to_js_timestamp(Date(year, 1, 1))
push!(fsp_through_time, (fault_id, year, slip_potential, epoch_timestamp))
end
end
# Sort results by ID and Year
sort!(fsp_through_time, [:ID, :Year])
return fsp_through_time
end
"""
Generate combined summary of results including both geomechanics and hydrology
"""
function generate_summary_report(fsp_results::DataFrame, prob_hydro_df::DataFrame, fault_df::DataFrame)
# Calculate statistics for hydrology results by year and fault
hydro_stats = combine(groupby(prob_hydro_df, [:Year, :ID]),
:Pressure => mean => :MeanPressure,
:Pressure => std => :StdDevPressure,
:Pressure => minimum => :MinPressure,
:Pressure => maximum => :MaxPressure
)
# Join with FSP results
summary_report = innerjoin(fsp_results, hydro_stats, on=[:Year, :ID])
# Add fault metadata if available
if !isempty(fault_df)
fault_id_col = "FaultID" in names(fault_df) ? "FaultID" : "ID"
if fault_id_col in names(fault_df)
# Select relevant columns from fault data
fault_metadata = select(fault_df,
fault_id_col => :ID,
["Strike", "Dip", "FrictionCoefficient", "slip_pressure"]
)
# Rename slip_pressure column to avoid confusion
if "slip_pressure" in names(fault_metadata)
rename!(fault_metadata, :slip_pressure => :DeterministicSlipPressure)
end
# Join with summary
summary_report = leftjoin(summary_report, fault_metadata, on=:ID)
end
end
return summary_report
end
"""
    get_injection_dataset_path_summary_step(helper, step_index)

Locate the injection-wells dataset for the summary step, checking the known
dataset parameters in priority order (annual, then monthly, then injection
tool data). Returns `(filepath, injection_data_type)` for the first dataset
found, or `(nothing, nothing)` when none is present.
"""
function get_injection_dataset_path_summary_step(helper::TexNetWebToolLaunchHelperJulia, step_index::Int)
    # Ordered (param name => data type) pairs; order defines priority.
    dataset_candidates = (
        "injection_wells_annual_summary" => "annual_fsp",
        "injection_wells_monthly_summary" => "monthly_fsp",
        "injection_tool_data_summary" => "injection_tool_data",
    )
    for (param_name, data_type) in dataset_candidates
        candidate_path = get_dataset_file_path(helper, step_index, param_name)
        candidate_path === nothing && continue
        return candidate_path, data_type
    end
    return nothing, nothing
end
"""
Calculate fault slip potential with deterministic hydrology results
"""
function calculate_deterministic_fault_slip_potential(prob_geo_cdf::DataFrame, det_hydro_df::DataFrame, years_to_analyze::Vector{Int})
# Container for the time series results
fsp_through_time = DataFrame(
ID = String[],
Year = Int[],
FSP = Float64[],
epoch_time = Float64[]
)
# Get unique fault IDs
fault_ids = unique(det_hydro_df.ID)
for fault_id in fault_ids
# Get fault's geomechanics CDF
fault_cdf = prob_geo_cdf[string.(prob_geo_cdf.ID) .== fault_id, :]
if isempty(fault_cdf)
@warn "No geomechanics CDF data found for fault ID $fault_id"
continue
end
# Sort by slip pressure
sort!(fault_cdf, :slip_pressure)
# Get hydrology results for this fault
fault_pressures = det_hydro_df[det_hydro_df.ID .== fault_id, :]
# Process each year's pressure for this fault
for year in years_to_analyze
# Find the row for this year, if it exists
year_pressure_row = filter(row -> Dates.year(row.Date) == year, fault_pressures)
if isempty(year_pressure_row)
# No data for this year, skip or use 0
push!(fsp_through_time, (
fault_id,
year,
0.0,
JuliaFSPGraphs.date_to_js_timestamp(Date(year, 1, 1))
))
continue
end
# Get pressure value for this year
pressure = first(year_pressure_row.slip_pressure)
# Calculate slip potential
slip_prob = 0.0
# If pressure is less than minimum in CDF, slip potential is 0
if pressure < minimum(fault_cdf.slip_pressure)
slip_prob = 0.0
# If pressure is greater than maximum in CDF, slip potential is 1.0
elseif pressure > maximum(fault_cdf.slip_pressure)
slip_prob = 1.0
else
# Interpolate to find slip potential
slip_prob = Utilities.interpolate_cdf(
fault_cdf.slip_pressure,
fault_cdf.probability ./ 100.0, # Convert from percentage to 0-1 scale
pressure
)
end
# Add to time series results with epoch timestamp
epoch_timestamp = JuliaFSPGraphs.date_to_js_timestamp(Date(year, 1, 1))
push!(fsp_through_time, (fault_id, year, slip_prob, epoch_timestamp))
end
end
# Sort results by ID and Year
sort!(fsp_through_time, [:ID, :Year])
return fsp_through_time
end
"""
Run deterministic hydrology calculations for all years up to year_of_interest
Returns a DataFrame with fault ID, pressure, and year columns
"""
function run_deterministic_hydrology_time_series(
aquifer_thickness::Float64,
porosity::Float64,
permeability::Float64,
fluid_density::Float64,
dynamic_viscosity::Float64,
fluid_compressibility::Float64,
rock_compressibility::Float64,
fault_df::DataFrame,
injection_wells_df::DataFrame,
years_to_analyze::Vector{Int},
year_of_interest::Int,
injection_data_type::String
)
# Print key input parameter values for debugging
#println("===== DEBUG: Input Parameters for Deterministic Hydrology =====")
#println("aquifer_thickness: ", aquifer_thickness)
#println("porosity: ", porosity)
#println("permeability: ", permeability)
#println("fluid_density: ", fluid_density)
#println("dynamic_viscosity: ", dynamic_viscosity)
#println("fluid_compressibility: ", fluid_compressibility)
#println("rock_compressibility: ", rock_compressibility)
#println("===================================")
# Calculate storativity and transmissivity
S, T, rho = HydroCalculations.calcST(
aquifer_thickness,
porosity,
permeability,
fluid_density,
dynamic_viscosity,
9.81,
fluid_compressibility,
rock_compressibility
)
STRho = (S, T, rho)
#println("DEBUG: Calculated Storativity = $S, Transmissivity = $T")
# Get well IDs
well_id_col = injection_data_type == "injection_tool_data" ? "API Number" : "WellID"
if !(well_id_col in names(injection_wells_df))
# Try alternates
if "APINumber" in names(injection_wells_df)
well_id_col = "APINumber"
elseif "UIC Number" in names(injection_wells_df)
well_id_col = "UIC Number"
elseif "Well ID" in names(injection_wells_df)
well_id_col = "Well ID"
elseif "UWI" in names(injection_wells_df)
well_id_col = "UWI"
else
error("Could not identify well ID column in injection data")
end
end
# PRE-PROCESS WELL DATA FOR OPTIMIZATION (deterministic version)
well_info = preprocess_well_data(injection_wells_df, well_id_col, injection_data_type)
well_ids = collect(keys(well_info)) # Use pre-processed well IDs
# Get fault IDs
num_faults = nrow(fault_df)
fault_id_col = "FaultID" in names(fault_df) ? "FaultID" : "ID"
if !(fault_id_col in names(fault_df))
@warn "No FaultID column found, using sequential IDs"
fault_df[!, :TempID] = string.(1:num_faults)
fault_id_col = "TempID"
end
fault_ids = string.(fault_df[!, fault_id_col])
# Pre-process well data to get date boundaries
inj_start_date, inj_end_date = Utilities.get_date_bounds(injection_wells_df)
#println("Injection rate time window for all wells: inj_start_date = $inj_start_date, inj_end_date = $inj_end_date")
# Container for results
# Structure: year -> fault -> pressure
results = Dict{Int, Dict{String, Float64}}()
# Create years to analyze
if isempty(years_to_analyze)
years_to_analyze = year(inj_start_date):year_of_interest
end
# Find the max end date of all injections - we'll set the evaluation date to this for pressure diffusion
max_injection_year = year(inj_end_date)
#println("Maximum injection end date: $inj_end_date (year $max_injection_year)")
# Process each year
for analysis_year in years_to_analyze
# Set up year cutoff date (Dec 31 of the year BEFORE analysis year)
# This matches MATLAB's behavior: evaluate pressure at Jan 1, 12:00 AM of analysis year
# which means include injection up to Dec 31 of the previous year
cutoff_date = Date(analysis_year - 1, 12, 31)
# Initialize year results if needed
if !haskey(results, analysis_year)
results[analysis_year] = Dict{String, Float64}()
end
# Process each fault
for f in 1:num_faults
fault_id = fault_ids[f]
fault_lat = fault_df[f, "Latitude(WGS84)"]
fault_lon = fault_df[f, "Longitude(WGS84)"]
# Initialize total pressure for this fault
total_pressure = 0.0
# Process each well's contribution
for well_id in well_ids
# Use pre-processed well data (OPTIMIZED - deterministic)
well = well_info[well_id]
# Skip if the well hasn't started injecting by the analysis year
if well.start_year > analysis_year
continue
end
# Use pre-processed coordinates and dates
well_lat = well.latitude
well_lon = well.longitude
inj_start_date = well.start_date
inj_end_date = well.end_date
# Skip if injection hasn't started by the cutoff date
if inj_start_date > cutoff_date
continue
end
# Limit end date to the analysis year cutoff
actual_end_date = min(inj_end_date, cutoff_date)
actual_end_year = year(actual_end_date)
# Prepare injection data using pre-processed well data
days, rates = Utilities.prepare_well_data_for_pressure_scenario(
well.data, # Use pre-filtered DataFrame
well_id,
well.start_year, # Use pre-processed start year
inj_start_date,
actual_end_year,
actual_end_date,
injection_data_type,
analysis_year,
false, # Don't extrapolate
cutoff_date # December 31 of the year before analysis year (Jan 1 12:00 AM of analysis year)
)
if isempty(days) || isempty(rates)
continue
end
# Calculate days from injection start to analysis date
evaluation_days_from_start = Float64((cutoff_date - inj_start_date).value + 1)
# For years after injection has stopped, evaluation date should use the current year
# to properly model pressure diffusion over time
if analysis_year > max_injection_year
#println("Calculating pressure diffusion for year $analysis_year (after injection end)")
end
# Calculate pressure contribution from this well
pressure_contribution = HydroCalculations.pfieldcalc_all_rates(
fault_lon, #fault longitude
fault_lat, #fault latitude
STRho, #storativity, transmissivity, fluid density
days, #days of injection
rates, #rates of injection
well_lon, #well longitude
well_lat, #well latitude
evaluation_days_from_start #days from injection start to evaluation date
)
# Add to total pressure for this fault
total_pressure += pressure_contribution
end
# Ensure total pressure is non-negative before storing
total_pressure = max(0.0, total_pressure)
# Store result for this fault and year
results[analysis_year][fault_id] = total_pressure
end
end
# Convert nested dictionary to DataFrame
result_rows = []
for year in sort(collect(keys(results)))
for (fault_id, pressure) in results[year]
push!(result_rows, (
ID = fault_id,
Pressure = pressure,
Year = year
))
end
end
# Create DataFrame
results_df = DataFrame(result_rows)
return results_df
end
function main()
#println("\n=== Starting FSP Summary Process ===")
# 1) Get the inputs from the args.json file
scratchPath = ARGS[1]
helper = TexNetWebToolLaunchHelperJulia(scratchPath)
# Get year of interest
year_of_interest = get_parameter_value(helper, 6, "year_of_interest_summary")
#=
if year_of_interest === nothing
year_of_interest = Dates.year(Dates.today())
elseif !isa(year_of_interest, Int)
year_of_interest = parse(Int, year_of_interest)
end
=#
# print the number of threads used
#add_message_with_step_index!(helper, 6, "Number of threads used: $(nthreads())", 0)
#year_of_interest_date = Date(year_of_interest - 1, 12, 31)
# 2) Read injection wells data
injection_wells_csv_filepath, injection_data_type = get_injection_dataset_path_summary_step(helper, 6)
if injection_wells_csv_filepath === nothing
error("No injection wells dataset provided.")
elseif "WellID" in names(CSV.read(injection_wells_csv_filepath, DataFrame))
# check if we have a 'WellID' column
injection_wells_df = CSV.read(injection_wells_csv_filepath, DataFrame, types=Dict("WellID" => String), pool=false)
elseif "API Number" in names(CSV.read(injection_wells_csv_filepath, DataFrame))
# check if we have a 'API Number' column
injection_wells_df = CSV.read(injection_wells_csv_filepath, DataFrame, types=Dict("API Number" => String), pool=false)
elseif "APINumber" in names(CSV.read(injection_wells_csv_filepath, DataFrame))
# check if we have a 'APINumber' column
injection_wells_df = CSV.read(injection_wells_csv_filepath, DataFrame, types=Dict("APINumber" => String), pool=false)
else
error("No valid well ID column found in the injection wells dataset.")
end
# check the type of the WellID column
#println("DEBUG: WellID column type = $(typeof(injection_wells_df[!, "WellID"]))")
# Get the model that was run from the previous step
model_run = get_parameter_value(helper, 6, "model_run_summary")
if model_run === nothing
# Default to probabilistic if not specified
model_run = 1
#println("Model run type not specified, defaulting to probabilistic")
end
#println("Using model type: $model_run")
# 3) Read fault data
fault_data_path = get_dataset_file_path(helper, 6, "faults")
if fault_data_path === nothing
error("Required fault dataset not found.")
end
fault_df = CSV.read(fault_data_path, DataFrame, types=Dict("FaultID" => String), pool=false)
# 4) Read probabilistic geomechanics results
prob_geo_cdf_path = get_dataset_file_path(helper, 6, "prob_geomechanics_cdf_graph_data_summary")
if prob_geo_cdf_path === nothing
error("Probabilistic geomechanics CDF data not found.")
end
prob_geo_cdf = CSV.read(prob_geo_cdf_path, DataFrame, types=Dict("ID" => String), pool=false)
#println("Loaded probabilistic geomechanics data (first 10 rows) out of $(nrow(prob_geo_cdf)) rows:")
#pretty_table(prob_geo_cdf[1:10, :])
# 5) Get hydrology parameters
aquifer_thickness = get_parameter_value(helper, 4, "aquifer_thickness_ft")
porosity = get_parameter_value(helper, 4, "porosity")
permeability = get_parameter_value(helper, 4, "permeability_md")
fluid_density = get_parameter_value(helper, 4, "fluid_density")
dynamic_viscosity = get_parameter_value(helper, 4, "dynamic_viscosity")
fluid_compressibility = get_parameter_value(helper, 4, "fluid_compressibility")
rock_compressibility = get_parameter_value(helper, 4, "rock_compressibility")
for param in [aquifer_thickness, porosity, permeability, fluid_density, dynamic_viscosity, fluid_compressibility, rock_compressibility]