-
Notifications
You must be signed in to change notification settings - Fork 51
Expand file tree
/
Copy pathkWaveSimulation.py
More file actions
1555 lines (1306 loc) · 67 KB
/
kWaveSimulation.py
File metadata and controls
1555 lines (1306 loc) · 67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import logging
import warnings
from copy import deepcopy
from dataclasses import dataclass
import numpy as np
from deprecated import deprecated
from kwave.data import Vector
from kwave.kgrid import kWaveGrid
from kwave.kmedium import kWaveMedium
from kwave.ksensor import kSensor
from kwave.ksource import kSource
from kwave.ktransducer import NotATransducer
from kwave.kWaveSimulation_helper import (
create_absorption_variables,
display_simulation_params,
expand_grid_matrices,
scale_source_terms_func,
set_sound_speed_ref,
)
from kwave.options.simulation_options import SimulationOptions, SimulationType
from kwave.recorder import Recorder
from kwave.utils.checks import check_stability
from kwave.utils.colormap import get_color_map
from kwave.utils.conversion import cart2grid, cast_to_type
from kwave.utils.data import get_date_string, get_smallest_possible_type
from kwave.utils.dotdictionary import dotdict
from kwave.utils.filters import smooth
from kwave.utils.matlab import matlab_find, matlab_mask
from kwave.utils.matrix import num_dim2
@dataclass
class kWaveSimulation(object):
def __init__(
self, kgrid: kWaveGrid, source: kSource, sensor: NotATransducer, medium: kWaveMedium, simulation_options: SimulationOptions
):
self.precision = None
self.kgrid = kgrid
self.medium = medium
self.original_source = source
self.original_sensor = sensor
self.source = deepcopy(source)
self.sensor = deepcopy(sensor)
self.options = simulation_options
# =========================================================================
# FLAGS WHICH DEPEND ON USER INPUTS (THESE SHOULD NOT BE MODIFIED)
# =========================================================================
# flags which control the characteristics of the sensor
#: Whether the sensor sensor mask is defined by cuboid corners
#: Whether time reversal simulation is enabled
# check if the sensor mask is defined as a list of cuboid corners
if self.sensor.mask is not None and self.sensor.mask.shape[0] == (2 * self.kgrid.dim):
self.userarg_cuboid_corners = True
else:
self.userarg_cuboid_corners = False
# check if performing time reversal, and replace inputs to explicitly use a
# source with a dirichlet boundary condition
if self.sensor.time_reversal_boundary_data is not None:
warnings.warn(
"Time reversal simulation is deprecated. Use TimeReversal class instead. See examples/pr_2D_TR_line_sensor/pr_2D_TR_line_sensor.py for an example."
)
# define a new source structure
self.source = kSource()
self.source.p_mask = self.sensor.mask
self.source.p = np.flip(self.sensor.time_reversal_boundary_data, 1)
self.source.p_mode = "dirichlet"
# define a new sensor structure
Nx, Ny, Nz = self.kgrid.Nx, self.kgrid.Ny, self.kgrid.Nz
self.sensor = kSensor(mask=np.ones((Nx, Ny, max(1, Nz))), record=["p_final"])
# set time reversal flag
self.userarg_time_rev = True
self.record = Recorder()
self.record.p = False
else:
# set time reversal flag
self.userarg_time_rev = False
#: Whether sensor.mask should be re-ordered.
#: True if sensor.mask is Cartesian with nearest neighbour interpolation which is calculated using a binary mask
#: and thus must be re-ordered
self.reorder_data = False
#: Whether the sensor.mask is binary
self.binary_sensor_mask = True
#: If tse sensor is an object of the kWaveTransducer class
self.transducer_sensor = False
self.record = Recorder()
# transducer source flags
#: transducer is object of kWaveTransducer class
self.transducer_source = False
#: Apply receive elevation focus on the transducer
self.transducer_receive_elevation_focus = False
# general
self.COLOR_MAP = get_color_map() #: default color map
self.ESTIMATE_SIM_TIME_STEPS = 50 #: time steps used to estimate simulation time
self.HIGHEST_PRIME_FACTOR_WARNING = 7 #: largest prime factor before warning
self.KSPACE_CFL = 0.3 #: default CFL value used if kgrid.t_array is set to 'auto'
self.PSTD_CFL = 0.1 #: default CFL value used if kgrid.t_array is set to 'auto'
# source types
self.SOURCE_S_MODE_DEF = "additive" #: source mode for stress sources
self.SOURCE_P_MODE_DEF = "additive" #: source mode for pressure sources
self.SOURCE_U_MODE_DEF = "additive" #: source mode for velocity sources
# filenames
self.STREAM_TO_DISK_FILENAME = "temp_sensor_data.bin" #: default disk stream filename
self.LOG_NAME = ["k-Wave-Log-", get_date_string()] #: default log filename
self.calling_func_name = None
logging.log(logging.INFO, f" start time: {get_date_string()}")
self.c_ref, self.c_ref_compression, self.c_ref_shear = [None] * 3
self.transducer_input_signal = None
#: Indexing variable corresponding to the location of all the pressure source elements
self.p_source_pos_index = None
#: Indexing variable corresponding to the location of all the velocity source elements
self.u_source_pos_index = None
#: Indexing variable corresponding to the location of all the stress source elements
self.s_source_pos_index = None
#: Delay mask that accounts for the beamforming delays and elevation focussing
self.delay_mask = None
self.absorb_nabla1 = None #: absorbing fractional Laplacian operator
self.absorb_tau = None #: absorbing fractional Laplacian coefficient
self.absorb_nabla2 = None #: dispersive fractional Laplacian operator
self.absorb_eta = None #: dispersive fractional Laplacian coefficient
self.dt = None #: Alias to kgrid.dt
self.rho0 = None #: Alias to medium.density
self.c0 = None #: Alias to medium.sound_speed
self.index_data_type = None
@property
def equation_of_state(self):
"""
Returns:
Set equation of state variable
"""
if self.medium.absorbing:
if self.medium.stokes:
return "stokes"
else:
return "absorbing"
else:
return "loseless"
@property
def use_sensor(self):
"""
Returns:
False if no output of any kind is required
"""
return self.sensor is not None
@property
def blank_sensor(self):
"""
Returns
True if sensor.mask is not defined but _max_all or _final variables are still recorded
"""
fields = ["p", "p_max", "p_min", "p_rms", "u", "u_non_staggered", "u_split_field", "u_max", "u_min", "u_rms", "I", "I_avg"]
if not (isinstance(self.sensor, NotATransducer) or any(self.record.is_set(fields)) or self.time_rev):
return True
return False
@property
def kelvin_voigt_model(self):
"""
Returns:
Whether the simulation is elastic with absorption
"""
return False
@property
def nonuniform_grid(self):
"""
Returns:
True if the computational grid is non-uniform
"""
return self.kgrid.nonuniform
@property
@deprecated(version="0.4.1", reason="Use TimeReversal class instead")
def time_rev(self) -> bool:
if self.sensor and not isinstance(self.sensor, NotATransducer):
return not self.options.simulation_type.is_elastic_simulation() and self.sensor.time_reversal_boundary_data is not None
return self.userarg_time_rev
@property
@deprecated(version="0.4.1", reason="Use TimeReversal class instead")
def elastic_time_rev(self):
"""
Returns:
True if using time reversal with the elastic code
"""
return False
@property
def compute_directivity(self):
"""
Returns:
True if directivity calculations in 2D are used by setting sensor.directivity_angle
"""
if self.sensor is not None and not isinstance(self.sensor, NotATransducer):
if self.kgrid.dim == 2:
# check for sensor directivity input and set flag
directivity = self.sensor.directivity
if directivity is not None and directivity.angle is not None:
return True
return False
@property
def cuboid_corners(self):
"""
Returns:
Whether the sensor.mask is a list of cuboid corners
"""
if self.sensor is not None and not isinstance(self.sensor, NotATransducer):
if not self.blank_sensor and self.sensor.mask.shape[0] == 2 * self.kgrid.dim:
return True
return self.userarg_cuboid_corners
##############
# flags which control the types of source used
##############
@property
def source_p0(self): # initial pressure
"""
Returns:
Whether initial pressure source is present (default=False)
"""
flag = False # default
if not isinstance(self.source, NotATransducer) and self.source.p0 is not None:
# set flag
flag = True
return flag
@property
def source_p0_elastic(self): # initial pressure in the elastic code
"""
Returns:
Whether initial pressure source is present in the elastic code (default=False)
"""
# Not clear where this flag is set
return False
@property
def source_p(self):
"""
Returns:
Whether time-varying pressure source is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.p is not None:
# set source flag to the length of the source, this allows source.p
# to be shorter than kgrid.Nt
flag = len(self.source.p[0])
return flag
@property
def source_p_labelled(self): # time-varying pressure with labelled source mask
"""
Returns:
True/False if labelled/binary source mask, respectively.
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.p is not None:
# check if the mask is binary or labelled
p_unique = np.unique(self.source.p_mask)
flag = not (p_unique.size <= 2 and p_unique.sum() == 1)
return flag
@property
def source_ux(self) -> bool:
"""
Returns:
Whether time-varying particle velocity source is used in X-direction
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.ux is not None:
# set source flgs to the length of the sources, this allows the
# inputs to be defined independently and be of any length
flag = len(self.source.ux[0])
return flag
@property
def source_uy(self) -> bool:
"""
Returns:
Whether time-varying particle velocity source is used in Y-direction
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.uy is not None:
# set source flgs to the length of the sources, this allows the
# inputs to be defined independently and be of any length
flag = len(self.source.uy[0])
return flag
@property
def source_uz(self) -> bool:
"""
Returns:
Whether time-varying particle velocity source is used in Z-direction
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.uz is not None:
# set source flgs to the length of the sources, this allows the
# inputs to be defined independently and be of any length
flag = len(self.source.uz[0])
return flag
@property
def source_u_labelled(self):
"""
Returns:
Whether time-varying velocity source with labelled source mask is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.u_mask is not None:
# check if the mask is binary or labelled
u_unique = np.unique(self.source.u_mask)
if u_unique.size <= 2 and u_unique.sum() == 1:
# binary source mask
flag = False
else:
# labelled source mask
flag = True
return flag
@property
def source_sxx(self):
"""
Returns:
Whether time-varying stress source in X->X direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.sxx is not None:
flag = len(self.source.sxx[0])
return flag
@property
def source_syy(self):
"""
Returns:
Whether time-varying stress source in Y->Y direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.syy is not None:
flag = len(self.source.syy[0])
return flag
@property
def source_szz(self):
"""
Returns:
Whether time-varying stress source in Z->Z direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.szz is not None:
flag = len(self.source.szz[0])
return flag
@property
def source_sxy(self):
"""
Returns:
Whether time-varying stress source in X->Y direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.sxy is not None:
flag = len(self.source.sxy[0])
return flag
@property
def source_sxz(self):
"""
Returns:
Whether time-varying stress source in X->Z direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.sxz is not None:
flag = len(self.source.sxz[0])
return flag
@property
def source_syz(self):
"""
Returns:
Whether time-varying stress source in Y->Z direction is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.syz is not None:
flag = len(self.source.syz[0])
return flag
@property
def source_s_labelled(self):
"""
Returns:
Whether time-varying stress source with labelled source mask is present (default=False)
"""
flag = False
if not isinstance(self.source, NotATransducer) and self.source.s_mask is not None:
# check if the mask is binary or labelled
s_unique = np.unique(self.source.s_mask)
if s_unique.size <= 2 and s_unique.sum() == 1:
# binary source mask
flag = False
else:
# labelled source mask
flag = True
return flag
@property
def use_w_source_correction_p(self):
"""
Returns:
Whether to use the w source correction instead of the k-space source correction for pressure sources
"""
flag = False
if not isinstance(self.source, NotATransducer):
if self.source.p is not None and self.source.p_frequency_ref is not None:
flag = True
return flag
@property
def use_w_source_correction_u(self):
"""
Returns:
Whether to use the w source correction instead of the k-space source correction for velocity sources
"""
flag = False
if not isinstance(self.source, NotATransducer):
if any([(getattr(self.source, k) is not None) for k in ["ux", "uy", "uz", "u_mask"]]):
if self.source.u_frequency_ref is not None:
flag = True
return flag
def input_checking(self, calling_func_name) -> None:
"""
Check the input fields for correctness and validness
Args:
calling_func_name: Name of the script that calls this function
Returns:
None
"""
self.calling_func_name = calling_func_name
k_dim = self.kgrid.dim
self.check_calling_func_name_and_dim(calling_func_name, k_dim)
# run subscript to check optional inputs
self.options = SimulationOptions.option_factory(self.kgrid, self.options)
opt = self.options
# TODO(Walter): clean this up with getters in simulation options pml size
pml_x_size, pml_y_size, pml_z_size = opt.pml_x_size, opt.pml_y_size, opt.pml_z_size
pml_size = Vector([pml_x_size, pml_y_size, pml_z_size])
is_elastic_code = opt.simulation_type.is_elastic_simulation()
self.print_start_status(is_elastic_code=is_elastic_code)
self.set_index_data_type()
user_medium_density_input = self.check_medium(self.medium, self.kgrid.k, simulation_type=opt.simulation_type)
# select the reference sound speed used in the k-space operator
self.c_ref, self.c_ref_compression, self.c_ref_shear = set_sound_speed_ref(self.medium, opt.simulation_type)
self.check_source(k_dim, self.kgrid.Nt)
self.check_sensor(k_dim)
self.check_kgrid_time()
self.precision = self.select_precision(opt)
self.check_input_combinations(opt, user_medium_density_input, k_dim, pml_size, self.kgrid.N)
# run subscript to display time step, max supported frequency etc.
display_simulation_params(self.kgrid, self.medium, is_elastic_code)
self.smooth_and_enlarge(self.source, k_dim, Vector(self.kgrid.N), opt)
self.create_sensor_variables()
self.create_absorption_vars()
self.assign_pseudonyms(self.medium, self.kgrid)
self.scale_source_terms(opt.scale_source_terms)
self.create_pml_indices(
kgrid_dim=self.kgrid.dim,
kgrid_N=Vector(self.kgrid.N),
pml_size=pml_size,
pml_inside=opt.pml_inside,
is_axisymmetric=opt.simulation_type.is_axisymmetric(),
)
@staticmethod
def check_calling_func_name_and_dim(calling_func_name, kgrid_dim) -> None:
"""
Check correct function has been called for the dimensionality of kgrid
Args:
calling_func_name: Name of the script that makes calls to kWaveSimulation
kgrid_dim: Dimensionality of the kWaveGrid
Returns:
None
"""
assert not calling_func_name.startswith(("pstdElastic", "kspaceElastic")), "Elastic simulation is not supported."
if calling_func_name == "kspaceFirstOrder1D":
assert kgrid_dim == 1, f"kgrid has the wrong dimensionality for {calling_func_name}."
elif calling_func_name in ["kspaceFirstOrder2D", "pstdElastic2D", "kspaceElastic2D", "kspaceFirstOrderAS"]:
assert kgrid_dim == 2, f"kgrid has the wrong dimensionality for {calling_func_name}."
elif calling_func_name in ["kspaceFirstOrder3D", "pstdElastic3D", "kspaceElastic3D"]:
assert kgrid_dim == 3, f"kgrid has the wrong dimensionality for {calling_func_name}."
@staticmethod
def print_start_status(is_elastic_code: bool) -> None:
"""
Update command-line status with the start time
Args:
is_elastic_code: is the simulation elastic
Returns:
None
"""
if is_elastic_code: # pragma: no cover
logging.log(logging.INFO, "Running k-Wave elastic simulation...")
else:
logging.log(logging.INFO, "Running k-Wave simulation...")
logging.log(logging.INFO, f" start time: {get_date_string()}")
def set_index_data_type(self) -> None:
"""
Pre-calculate the data type needed to store the matrix indices given the
total number of grid points: indexing variables will be created using this data type to save memory
Returns:
None
"""
total_grid_points = self.kgrid.total_grid_points
self.index_data_type = get_smallest_possible_type(total_grid_points, "uint", default="double")
@staticmethod
def check_medium(medium, kgrid_k, simulation_type: SimulationType) -> bool:
"""
Check the properties of the medium structure for correctness and validity
Args:
medium: kWaveMedium instance
kgrid_k: kWaveGrid.k matrix
is_elastic: Whether the simulation is elastic
is_axisymmetric: Whether the simulation is axisymmetric
Returns:
Medium Density
"""
# if using the fluid code, allow the density field to be blank if the medium is homogeneous
if (not simulation_type.is_elastic_simulation()) and medium.density is None and medium.sound_speed.size == 1:
user_medium_density_input = False
medium.density = 1
else:
medium.ensure_defined("density")
user_medium_density_input = True
# check medium absorption inputs for the fluid code
is_absorbing = any(medium.is_defined("alpha_coeff", "alpha_power"))
is_stokes = simulation_type.is_axisymmetric() or medium.alpha_mode == "stokes"
medium.set_absorbing(is_absorbing, is_stokes)
if is_absorbing:
medium.check_fields(kgrid_k.shape)
return user_medium_density_input
def check_sensor(self, kgrid_dim) -> None:
"""
Check the Sensor properties for correctness and validity
Args:
k_dim: kWaveGrid dimensionality
Returns:
None
"""
# =========================================================================
# CHECK SENSOR STRUCTURE INPUTS
# =========================================================================
# check sensor fields
if self.sensor is not None:
# check the sensor input is valid
# TODO FARID move this check as a type checking
assert isinstance(
self.sensor, (kSensor, NotATransducer)
), "sensor must be defined as an object of the kSensor or kWaveTransducer class."
# check if sensor is a transducer, otherwise check input fields
if isinstance(self.sensor, kSensor):
if kgrid_dim == 2:
# check for sensor directivity input and set flag
directivity = self.sensor.directivity
if directivity is not None and self.sensor.directivity.angle is not None:
# make sure the sensor mask is not blank
assert self.sensor.mask is not None, "The mask must be defined for the sensor"
# check sensor.directivity.pattern and sensor.mask have the same size
assert (
directivity.angle.shape == self.sensor.mask.shape
), "sensor.directivity.angle and sensor.mask must be the same size."
# check if directivity size input exists, otherwise make it
# a constant times kgrid.dx
if directivity.size is None:
directivity.set_default_size(self.kgrid)
# find the unique directivity angles
# assign the wavenumber vectors
directivity.set_unique_angles(self.sensor.mask)
directivity.set_wavenumbers(self.kgrid)
# check for time reversal inputs and set flags
if not self.options.simulation_type.is_elastic_simulation() and self.sensor.time_reversal_boundary_data is not None:
self.record.p = False
# check for sensor.record and set usage flgs - if no flgs are
# given, the time history of the acoustic pressure is recorded by
# default
if self.sensor.record is not None:
# check for time reversal data
if self.time_rev:
logging.log(logging.WARN, "sensor.record is not used for time reversal reconstructions")
# check the input is a cell array
assert isinstance(self.sensor.record, list), 'sensor.record must be given as a list, e.g. ["p", "u"]'
# check the sensor record flgs
self.record.set_flags_from_list(self.sensor.record, self.options.simulation_type.is_elastic_simulation())
# enforce the sensor.mask field unless just recording the max_all
# and _final variables
fields = ["p", "p_max", "p_min", "p_rms", "u", "u_non_staggered", "u_split_field", "u_max", "u_min", "u_rms", "I", "I_avg"]
if any(self.record.is_set(fields)):
assert self.sensor.mask is not None
# check if sensor mask is a binary grid, a set of cuboid corners,
# or a set of Cartesian interpolation points
if not self.blank_sensor:
if self._is_binary_sensor_mask(kgrid_dim):
# check the grid is binary
assert self.sensor.mask.sum() == (
self.sensor.mask.size - (self.sensor.mask == 0).sum()
), "sensor.mask must be a binary grid (numeric values must be 0 or 1)."
# check the grid is not empty
assert self.sensor.mask.sum() != 0, "sensor.mask must be a binary grid with at least one element set to 1."
elif self._is_cuboid_corners_mask(kgrid_dim):
# make sure the points are integers
assert np.all(self.sensor.mask % 1 == 0), "sensor.mask cuboid corner indices must be integers."
# store a copy of the cuboid corners
self.record.cuboid_corners_list = self.sensor.mask
# check the list makes sense
if np.any(self.sensor.mask[self.kgrid.dim :, :] - self.sensor.mask[: self.kgrid.dim, :] < 0):
if kgrid_dim == 1:
raise ValueError("sensor.mask cuboid corners must be defined " "as [x1, x2; ...]." " where x2 => x1, etc.")
elif kgrid_dim == 2:
raise ValueError(
"sensor.mask cuboid corners must be defined " "as [x1, y1, x2, y2; ...]." " where x2 => x1, etc."
)
elif kgrid_dim == 3:
raise ValueError(
"sensor.mask cuboid corners must be defined"
" as [x1, y1, z1, x2, y2, z2; ...]."
" where x2 => x1, etc."
)
# check the list are within bounds
if np.any(self.sensor.mask < 1):
raise ValueError("sensor.mask cuboid corners must be within the grid.")
else:
if kgrid_dim == 1:
if np.any(self.sensor.mask > self.kgrid.Nx):
raise ValueError("sensor.mask cuboid corners must be within the grid.")
elif kgrid_dim == 2:
if np.any(self.sensor.mask[[0, 2], :] > self.kgrid.Nx) or np.any(
self.sensor.mask[[1, 3], :] > self.kgrid.Ny
):
raise ValueError("sensor.mask cuboid corners must be within the grid.")
elif kgrid_dim == 3:
if (
np.any(self.sensor.mask[[0, 3], :] > self.kgrid.Nx)
or np.any(self.sensor.mask[[1, 4], :] > self.kgrid.Ny)
or np.any(self.sensor.mask[[2, 5], :] > self.kgrid.Nz)
):
raise ValueError("sensor.mask cuboid corners must be within the grid.")
# create a binary mask for display from the list of corners
# TODO FARID mask should be option_factory in sensor not here
self.sensor.mask = np.zeros_like(self.kgrid.k, dtype=bool)
cuboid_corners_list = self.record.cuboid_corners_list
for cuboid_index in range(cuboid_corners_list.shape[1]):
if self.kgrid.dim == 1:
self.sensor.mask[cuboid_corners_list[0, cuboid_index] : cuboid_corners_list[1, cuboid_index]] = 1
if self.kgrid.dim == 2:
self.sensor.mask[
cuboid_corners_list[0, cuboid_index] : cuboid_corners_list[2, cuboid_index],
cuboid_corners_list[1, cuboid_index] : cuboid_corners_list[3, cuboid_index],
] = 1
if self.kgrid.dim == 3:
self.sensor.mask[
cuboid_corners_list[0, cuboid_index] : cuboid_corners_list[3, cuboid_index],
cuboid_corners_list[1, cuboid_index] : cuboid_corners_list[4, cuboid_index],
cuboid_corners_list[2, cuboid_index] : cuboid_corners_list[5, cuboid_index],
] = 1
else:
# check the Cartesian sensor mask is the correct size
# (1 x N, 2 x N, 3 x N)
assert (
self.sensor.mask.shape[0] == kgrid_dim and num_dim2(self.sensor.mask) <= 2
), f"Cartesian sensor.mask for a {kgrid_dim}D simulation must be given as a {kgrid_dim} by N array."
# set Cartesian mask flag (this is modified in
# createStorageVariables if the interpolation setting is
# set to nearest)
self.binary_sensor_mask = False
# extract Cartesian data from sensor mask
if kgrid_dim == 1:
# align sensor data as a column vector to be the
# same as kgrid.x_vec so that calls to interp1
# return data in the correct dimension
self.sensor_x = np.reshape((self.sensor.mask, (-1, 1)))
# add sensor_x to the record structure for use with
# the _extractSensorData subfunction
self.record.sensor_x = self.sensor_x
"record.sensor_x = sensor_x;"
elif kgrid_dim == 2:
self.sensor_x = self.sensor.mask[0, :]
self.sensor_y = self.sensor.mask[1, :]
elif kgrid_dim == 3:
self.sensor_x = self.sensor.mask[0, :]
self.sensor_y = self.sensor.mask[1, :]
self.sensor_z = self.sensor.mask[2, :]
# compute an equivalent sensor mask using nearest neighbour
# interpolation, if flgs.time_rev = false and
# cartesian_interp = 'linear' then this is only used for
# display, if flgs.time_rev = true or cartesian_interp =
# 'nearest' this grid is used as the sensor.mask
self.sensor.mask, self.order_index, self.reorder_index = cart2grid(
self.kgrid, self.sensor.mask, self.options.simulation_type.is_axisymmetric()
)
# if in time reversal mode, reorder the p0 input data in
# the order of the binary sensor_mask
if self.time_rev:
raise NotImplementedError
"""
# append the reordering data
new_col_pos = length(sensor.time_reversal_boundary_data(1, :)) + 1;
sensor.time_reversal_boundary_data(:, new_col_pos) = order_index;
# reorder p0 based on the order_index
sensor.time_reversal_boundary_data = sort_rows(sensor.time_reversal_boundary_data, new_col_pos);
# remove the reordering data
sensor.time_reversal_boundary_data = sensor.time_reversal_boundary_data(:, 1:new_col_pos - 1);
"""
else:
# set transducer sensor flag
self.transducer_sensor = True
self.record.p = False
# check to see if there is an elevation focus
if not np.isinf(self.sensor.elevation_focus_distance):
# set flag
self.transducer_receive_elevation_focus = True
# get the elevation mask that is used to extract the correct values
# from the sensor data buffer for averaging
self.transducer_receive_mask = self.sensor.elevation_beamforming_mask
# check for directivity inputs with time reversal
if kgrid_dim == 2 and self.use_sensor and self.compute_directivity and self.time_rev:
logging.log(logging.WARN, "sensor directivity fields are not used for time reversal.")
def check_source(self, k_dim, k_Nt) -> None:
"""
Check the source properties for correctness and validity
Args:
kgrid_dim: kWaveGrid dimension
k_Nt: Number of time steps in kWaveGrid
Returns:
None
"""
# =========================================================================
# CHECK SOURCE STRUCTURE INPUTS
# =========================================================================
# check source inputs
if not isinstance(self.source, (kSource, NotATransducer)):
# allow an invalid or empty source input if computing time reversal,
# otherwise return error
assert self.time_rev, "source must be defined as an object of the kSource or kWaveTransducer classes."
elif not isinstance(self.source, NotATransducer):
# --------------------------
# SOURCE IS NOT A TRANSDUCER
# --------------------------
"""
check allowable source types
Depending on the kgrid dimensionality and the simulation type,
following fields are allowed & might be use:
kgrid.dim == 1:
non-elastic code:
['p0', 'p', 'p_mask', 'p_mode', 'p_frequency_ref', 'ux', 'u_mask', 'u_mode', 'u_frequency_ref']
kgrid.dim == 2:
non-elastic code:
['p0', 'p', 'p_mask', 'p_mode', 'p_frequency_ref', 'ux', 'uy', 'u_mask', 'u_mode', 'u_frequency_ref']
elastic code:
['p0', 'sxx', 'syy', 'sxy', 's_mask', 's_mode', 'ux', 'uy', 'u_mask', 'u_mode']
kgrid.dim == 3:
non-elastic code:
['p0', 'p', 'p_mask', 'p_mode', 'p_frequency_ref', 'ux', 'uy', 'uz', 'u_mask', 'u_mode', 'u_frequency_ref']
elastic code:
['p0', 'sxx', 'syy', 'szz', 'sxy', 'sxz', 'syz', 's_mask', 's_mode', 'ux', 'uy', 'uz', 'u_mask', 'u_mode']
"""
self.source.validate(self.kgrid)
# check for a time varying pressure source input
if self.source.p is not None:
# check the source mode input is valid
if self.source.p_mode is None:
self.source.p_mode = self.SOURCE_P_MODE_DEF
if self.source_p > k_Nt:
logging.log(logging.WARN, " source.p has more time points than kgrid.Nt, remaining time points will not be used.")
# create an indexing variable corresponding to the location of all the source elements
self.p_source_pos_index = matlab_find(self.source.p_mask)
# check if the mask is binary or labelled
p_unique = np.unique(self.source.p_mask)
# create a second indexing variable
if p_unique.size <= 2 and p_unique.sum() == 1:
# set signal index to all elements
self.p_source_sig_index = ":"
else:
# set signal index to the labels (this allows one input signal
# to be used for each source label)
self.p_source_sig_index = self.source.p_mask(self.source.p_mask != 0)
# convert the data type depending on the number of indices
self.p_source_pos_index = cast_to_type(self.p_source_pos_index, self.index_data_type)
if self.source_p_labelled:
self.p_source_sig_index = cast_to_type(self.p_source_sig_index, self.index_data_type)
# check for time varying velocity source input and set source flag
if any([(getattr(self.source, k) is not None) for k in ["ux", "uy", "uz", "u_mask"]]):
# check the source mode input is valid
if self.source.u_mode is None:
self.source.u_mode = self.SOURCE_U_MODE_DEF
# create an indexing variable corresponding to the location of all
# the source elements
self.u_source_pos_index = matlab_find(self.source.u_mask)
# check if the mask is binary or labelled
u_unique = np.unique(self.source.u_mask)
# create a second indexing variable
if u_unique.size <= 2 and u_unique.sum() == 1:
# set signal index to all elements
self.u_source_sig_index = ":"
else:
# set signal index to the labels (this allows one input signal
# to be used for each source label)
self.u_source_sig_index = self.source.u_mask[self.source.u_mask != 0]
# convert the data type depending on the number of indices
self.u_source_pos_index = cast_to_type(self.u_source_pos_index, self.index_data_type)
if self.source_u_labelled:
self.u_source_sig_index = cast_to_type(self.u_source_sig_index, self.index_data_type)
# check for time varying stress source input and set source flag
if any([(getattr(self.source, k) is not None) for k in ["sxx", "syy", "szz", "sxy", "sxz", "syz", "s_mask"]]):
# create an indexing variable corresponding to the location of all
# the source elements
raise NotImplementedError
"s_source_pos_index = find(source.s_mask != 0);"
# check if the mask is binary or labelled
"s_unique = unique(source.s_mask);"
# create a second indexing variable
if eng.eval("numel(s_unique) <= 2 && sum(s_unique) == 1"): # noqa: F821
# set signal index to all elements
eng.workspace["s_source_sig_index"] = ":" # noqa: F821
else:
# set signal index to the labels (this allows one input signal
# to be used for each source label)
s_source_sig_index = source.s_mask(source.s_mask != 0) # noqa
f"s_source_pos_index = {self.index_data_type}(s_source_pos_index);"
if self.source_s_labelled:
f"s_source_sig_index = {self.index_data_type}(s_source_sig_index);"
else:
# ----------------------
# SOURCE IS A TRANSDUCER
# ----------------------
# if the sensor is a transducer, check that the simulation is in 3D
assert k_dim == 3, "Transducer inputs are only compatible with 3D simulations."
# get the input signal - this is appended with zeros if required to
# account for the beamforming delays (this will throw an error if the
# input signal is not defined)
self.transducer_input_signal = self.source.input_signal
# get the delay mask that accounts for the beamforming delays and
# elevation focussing; this is used so that a single time series can be
# applied to the complete transducer mask with different delays
delay_mask = self.source.delay_mask()
# set source flag - this should be the length of signal minus the
# maximum delay
self.transducer_source = self.transducer_input_signal.size - delay_mask.max()
# get the active elements mask
active_elements_mask = self.source.active_elements_mask
# get the apodization mask if not set to 'Rectangular' and convert to a
# linear array
if self.source.transmit_apodization == "Rectangular":
self.transducer_transmit_apodization = 1
else:
self.transducer_transmit_apodization = self.source.transmit_apodization_mask
self.transducer_transmit_apodization = self.transducer_transmit_apodization[active_elements_mask != 0]
# create indexing variable corresponding to the active elements
# and convert the data type depending on the number of indices
self.u_source_pos_index = matlab_find(active_elements_mask).astype(self.index_data_type)
# convert the delay mask to an indexing variable (this doesn't need to
# be modified if the grid is expanded) which tells each point in the
# source mask which point in the input_signal should be used
delay_mask = matlab_mask(delay_mask, active_elements_mask != 0) # compatibility
# convert the data type depending on the maximum value of the delay
# mask and the length of the source
smallest_type = get_smallest_possible_type(delay_mask.max(), "uint")
if smallest_type is not None:
delay_mask = delay_mask.astype(smallest_type)