Skip to content

Commit c68122e

Browse files
Merge pull request #609 from eqcorrscan/cc-squared
[WIP] Add option to output cross-correlation * abs(cross-correlation)
2 parents 9115cbb + b3f6996 commit c68122e

File tree

9 files changed

+211
-82
lines changed

9 files changed

+211
-82
lines changed

CHANGES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
- More stable extraction of data for correlation
1111
- utils.findpeaks
1212
- Added pick-time declustering.
13+
- utils.correlate
14+
- Added option to produce "cc-squared" (cc * abs(cc)) detection statistic
1315

1416
## 0.5.0
1517
* core.match_filter.tribe

eqcorrscan/core/match_filter/helpers/tribe.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ def _corr_and_peaks(
311311
concurrency: str,
312312
cores: int,
313313
i: int,
314+
cc_squared: bool,
314315
export_cccsums: bool,
315316
parallel: bool,
316317
peak_cores: int,
@@ -336,6 +337,9 @@ def _corr_and_peaks(
336337
:param concurrency: Concurrency of cross-correlation function
337338
:param cores: Cores (threads) to use for cross-correlation
338339
:param i: Group-id (internal book-keeping)
340+
:param cc_squared:
341+
Whether to detect using "cc_squared" (actually cc * abs(cc)) or
342+
just using cc.
339343
:param export_cccsums: Whether to export the raw cross-correlation sums
340344
:param parallel: Whether to compute peaks in parallel
341345
:param peak_cores: Number of cores (threads) to use for peak finding
@@ -358,6 +362,7 @@ def _corr_and_peaks(
358362
f"Starting correlation run for template group {i}")
359363
tic = default_timer()
360364
if prepped and xcorr_func == "fmf":
365+
assert cc_squared == False, "FMF does not support squared correlation"
361366
assert isinstance(templates, np.ndarray)
362367
assert isinstance(stream, np.ndarray)
363368
# These need to be passed from queues.
@@ -387,7 +392,8 @@ def _corr_and_peaks(
387392
cccsums, tr_chans = fftw_multi_normxcorr(
388393
template_array=templates, stream_array=stream,
389394
pad_array=pads, seed_ids=seed_ids, cores_inner=num_cores_inner,
390-
cores_outer=num_cores_outer, stack=True, **kwargs)
395+
cores_outer=num_cores_outer, stack=True, cc_squared=cc_squared,
396+
**kwargs)
391397
n_templates = len(cccsums)
392398
# Post processing
393399
no_chans = np.sum(np.array(tr_chans).astype(int), axis=0)
@@ -408,7 +414,8 @@ def _corr_and_peaks(
408414
multichannel_normxcorr = get_stream_xcorr(xcorr_func, concurrency)
409415
Logger.debug(f"Calling {multichannel_normxcorr}")
410416
cccsums, no_chans, chans = multichannel_normxcorr(
411-
templates=templates, stream=stream, cores=cores, **kwargs
417+
templates=templates, stream=stream, cores=cores,
418+
cc_squared=cc_squared, **kwargs
412419
)
413420
if len(cccsums[0]) == 0:
414421
raise MatchFilterError(

eqcorrscan/core/match_filter/matched_filter.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ def __str__(self):
6161
# Note: maintained for backwards compatability. All efforts now in tribes
6262
def match_filter(template_names, template_list, st, threshold,
6363
threshold_type, trig_int, plot=False, plotdir=None,
64-
xcorr_func=None, concurrency=None, cores=None,
65-
plot_format='png', output_cat=False,
64+
xcorr_func=None, concurrency=None, cc_squared=False,
65+
cores=None, plot_format='png', output_cat=False,
6666
extract_detections=False, arg_check=True, full_peaks=False,
6767
peak_cores=None, export_cccsums=False, **kwargs):
6868
"""
@@ -115,6 +115,10 @@ def match_filter(template_names, template_list, st, threshold,
115115
The type of concurrency to apply to the xcorr function. Options are
116116
'multithread', 'multiprocess', 'concurrent'. For more details see
117117
:func:`eqcorrscan.utils.correlate.get_stream_xcorr`
118+
:type cc_squared: bool
119+
:param cc_squared:
120+
Whether to detect using "cc_squared" (actually cc * abs(cc)) or
121+
just using cc.
118122
:type cores: int
119123
:param cores: Number of cores to use
120124
:type plot_format: str
@@ -319,7 +323,9 @@ def match_filter(template_names, template_list, st, threshold,
319323
overlap="calculate", full_peaks=full_peaks, save_progress=False,
320324
process_cores=None, pre_processed=True, check_processing=False,
321325
return_stream=False, plot_format=plot_format,
322-
peak_cores=peak_cores, export_cccsums=export_cccsums, **kwargs
326+
peak_cores=peak_cores, export_cccsums=export_cccsums,
327+
cc_squared=cc_squared,
328+
**kwargs
323329
)
324330
detections = [d for f in party for d in f]
325331

eqcorrscan/core/match_filter/tribe.py

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -675,11 +675,11 @@ def cluster(self, method, **kwargs):
675675
def detect(self, stream, threshold, threshold_type, trig_int, plot=False,
676676
plotdir=None, parallel_process=True,
677677
xcorr_func=None, concurrency=None, cores=None,
678-
concurrent_processing=False, ignore_length=False,
679-
ignore_bad_data=False, group_size=None, overlap="calculate",
680-
full_peaks=False, save_progress=False, process_cores=None,
681-
pre_processed=False, check_processing=True, make_events=True,
682-
min_stations=0, **kwargs):
678+
cc_squared=False, concurrent_processing=False,
679+
ignore_length=False, ignore_bad_data=False, group_size=None,
680+
overlap="calculate", full_peaks=False, save_progress=False,
681+
process_cores=None, pre_processed=False, check_processing=True,
682+
make_events=True, min_stations=0, **kwargs):
683683
"""
684684
Detect using a Tribe of templates within a continuous stream.
685685
@@ -720,6 +720,10 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False,
720720
:func:`eqcorrscan.utils.correlate.get_stream_xcorr`
721721
:type cores: int
722722
:param cores: Number of workers for processing and detection.
723+
:type cc_squared: bool
724+
:param cc_squared:
725+
Whether to detect using "cc_squared" (actually cc * abs(cc)) or
726+
just using cc.
723727
:type concurrent_processing: bool
724728
:param concurrent_processing:
725729
Whether to process steps in detection workflow concurrently or not.
@@ -895,8 +899,9 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False,
895899
process_cores, ignore_length, overlap,
896900
ignore_bad_data, group_size, groups, sampling_rate, threshold,
897901
threshold_type, save_progress, xcorr_func, concurrency, cores,
898-
export_cccsums, parallel, peak_cores, trig_int, full_peaks,
899-
plot, plotdir, plot_format, make_events, min_stations,)
902+
cc_squared, export_cccsums, parallel, peak_cores, trig_int,
903+
full_peaks, plot, plotdir, plot_format, make_events,
904+
min_stations,)
900905

901906
if concurrent_processing:
902907
party = self._detect_concurrent(*args, **inner_kwargs)
@@ -922,9 +927,9 @@ def _detect_serial(
922927
self, stream, template_ids, pre_processed, parallel_process,
923928
process_cores, ignore_length, overlap, ignore_bad_data,
924929
group_size, groups, sampling_rate, threshold, threshold_type,
925-
save_progress, xcorr_func, concurrency, cores, export_cccsums,
926-
parallel, peak_cores, trig_int, full_peaks, plot, plotdir, plot_format,
927-
make_events, min_stations, **kwargs
930+
save_progress, xcorr_func, concurrency, cores, cc_squared,
931+
export_cccsums, parallel, peak_cores, trig_int, full_peaks, plot,
932+
plotdir, plot_format, make_events, min_stations, **kwargs
928933
):
929934
""" Internal serial detect workflow. """
930935
from eqcorrscan.core.match_filter.helpers.tribe import (
@@ -977,7 +982,8 @@ def _detect_serial(
977982
threshold=threshold, threshold_type=threshold_type,
978983
trig_int=trig_int, sampling_rate=sampling_rate,
979984
full_peaks=full_peaks, plot=plot, plotdir=plotdir,
980-
plot_format=plot_format, prepped=False, **kwargs)
985+
plot_format=plot_format, prepped=False,
986+
cc_squared=cc_squared, **kwargs)
981987
Logger.debug(chans)
982988

983989
detections = _detect(
@@ -1014,9 +1020,9 @@ def _detect_concurrent(
10141020
self, stream, template_ids, pre_processed, parallel_process,
10151021
process_cores, ignore_length, overlap, ignore_bad_data,
10161022
group_size, groups, sampling_rate, threshold, threshold_type,
1017-
save_progress, xcorr_func, concurrency, cores, export_cccsums,
1018-
parallel, peak_cores, trig_int, full_peaks, plot, plotdir, plot_format,
1019-
make_events, min_stations, **kwargs
1023+
save_progress, xcorr_func, concurrency, cores, cc_squared,
1024+
export_cccsums, parallel, peak_cores, trig_int, full_peaks, plot,
1025+
plotdir, plot_format, make_events, min_stations, **kwargs
10201026
):
10211027
""" Internal concurrent detect workflow. """
10221028
from eqcorrscan.core.match_filter.helpers.processes import (
@@ -1180,7 +1186,8 @@ def _detect_concurrent(
11801186
threshold=threshold, threshold_type=threshold_type,
11811187
trig_int=trig_int, sampling_rate=sampling_rate,
11821188
full_peaks=full_peaks, plot=plot, plotdir=plotdir,
1183-
plot_format=plot_format, **inner_kwargs
1189+
plot_format=plot_format, cc_squared=cc_squared,
1190+
**inner_kwargs
11841191
)
11851192
peaks_queue.put(
11861193
(starttime, all_peaks, thresholds, no_chans, chans,
@@ -1251,9 +1258,9 @@ def client_detect(self, client, starttime, endtime, threshold,
12511258
threshold_type, trig_int, plot=False, plotdir=None,
12521259
min_gap=None, parallel_process=True,
12531260
xcorr_func=None, concurrency=None, cores=None,
1254-
concurrent_processing=False, ignore_length=False,
1255-
ignore_bad_data=False, group_size=None,
1256-
return_stream=False, full_peaks=False,
1261+
cc_squared=False, concurrent_processing=False,
1262+
ignore_length=False, ignore_bad_data=False,
1263+
group_size=None, return_stream=False, full_peaks=False,
12571264
save_progress=False, process_cores=None, retries=3,
12581265
check_processing=True, make_events=True,
12591266
min_stations=0, **kwargs):
@@ -1304,6 +1311,10 @@ def client_detect(self, client, starttime, endtime, threshold,
13041311
:func:`eqcorrscan.utils.correlate.get_stream_xcorr`
13051312
:type cores: int
13061313
:param cores: Number of workers for processing and detection.
1314+
:type cc_squared: bool
1315+
:param cc_squared:
1316+
Whether to detect using "cc_squared" (actually cc * abs(cc)) or
1317+
just using cc.
13071318
:type concurrent_processing: bool
13081319
:param concurrent_processing:
13091320
Whether to process steps in detection workflow concurrently or not.
@@ -1462,7 +1473,8 @@ def client_detect(self, client, starttime, endtime, threshold,
14621473
return_stream=return_stream, check_processing=False,
14631474
poison_queue=poison_queue, shutdown=False,
14641475
concurrent_processing=concurrent_processing, groups=groups,
1465-
make_events=make_events, min_stations=min_stations)
1476+
make_events=make_events, min_stations=min_stations,
1477+
cc_squared=cc_squared)
14661478
detector_kwargs.update(kwargs)
14671479

14681480
if not concurrent_processing:

eqcorrscan/tests/correlate_test.py

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,45 @@ def gappy_real_cc_dict(gappy_real_cc_output_dict):
490490
return {name: (result[0][0], result[1])
491491
for name, result in gappy_real_cc_output_dict.items()}
492492

493+
# ------------------------------------ correlation ** abs(correlation)
494+
495+
@pytest.fixture(scope='module')
496+
def stream_cc_output_dict_corrsq(
497+
multichannel_templates, multichannel_stream):
498+
""" return a dict of outputs from all stream_xcorr functions """
499+
# corr._get_array_dicts(multichannel_templates, multichannel_stream)
500+
for tr in multichannel_stream:
501+
tr.data = tr.data[0:unstacked_stream_len]
502+
multichannel_templates = multichannel_templates[0:5]
503+
out = {}
504+
for name, func in stream_funcs.items():
505+
if name.startswith("fmf"):
506+
print("Skipping fmf - unstacked not implemented")
507+
continue
508+
for cores in [1, cpu_count()]:
509+
print("Running {0} with {1} cores".format(name, cores))
510+
511+
cc_out = time_func(func, name, multichannel_templates,
512+
multichannel_stream, cores=cores, stack=True,
513+
cc_squared=True)
514+
out["{0}.{1}".format(name, cores)] = cc_out
515+
if "fftw" in name and cores > 1:
516+
print("Running outer core parallel")
517+
# Make sure that using both parallel methods gives the same
518+
# result
519+
cc_out = time_func(
520+
func, name, multichannel_templates, multichannel_stream,
521+
cores=1, cores_outer=cores, stack=True, cc_squared=True)
522+
out["{0}.{1}_outer".format(name, cores)] = cc_out
523+
return out
524+
525+
526+
@pytest.fixture(scope='module')
527+
def stream_cc_dict_corrsq(stream_cc_output_dict_corrsq):
528+
""" return just the cc arrays from the stream_cc functions """
529+
return {name: result[0]
530+
for name, result in stream_cc_output_dict_corrsq.items()}
531+
493532

494533
# ------------------------------------ unstacked setup
495534

@@ -759,6 +798,24 @@ def test_gappy_real_multi_channel_xcorr(self, gappy_real_cc_dict):
759798
assert np.allclose(cc_1, cc, atol=self.atol * 100)
760799

761800

801+
@pytest.mark.serial
802+
class TestStreamCorrelateFunctionsCorrSq:
803+
""" same thing as TestArrayCorrelateFunction but for stream interface """
804+
atol = TestArrayCorrelateFunctions.atol
805+
806+
def test_multi_channel_xcorr(self, stream_cc_dict_corrsq):
807+
""" test various correlation methods with multiple channels """
808+
# get correlation results into a list
809+
cc_names = list(stream_cc_dict_corrsq.keys())
810+
cc_list = [stream_cc_dict_corrsq[cc_name] for cc_name in cc_names]
811+
cc_1 = cc_list[0]
812+
# loop over correlations and compare each with the first in the list
813+
# this will ensure all cc are "close enough"
814+
for cc_name, cc in zip(cc_names[2:], cc_list[2:]):
815+
assert np.allclose(cc_1, cc, atol=self.atol)
816+
817+
818+
762819
@pytest.mark.serial
763820
class TestStreamCorrelateFunctionsUnstacked:
764821
""" same thing as TestArrayCorrelateFunction but for stream interface """
@@ -900,10 +957,10 @@ def test_callable_registered(self, multichannel_templates,
900957
""" ensure a callable can be registered """
901958
small_count = {}
902959

903-
def some_callable(template_array, stream_array, pad_array):
960+
def some_callable(template_array, stream_array, pad_array, cc_squared):
904961
small_count['name'] = 1
905962
return corr.numpy_normxcorr(template_array, stream_array,
906-
pad_array)
963+
pad_array, cc_squared)
907964

908965
func = corr.get_stream_xcorr(some_callable)
909966
func(multichannel_templates, multichannel_stream)
@@ -929,7 +986,7 @@ def func(templates, streams, pads):
929986
def test_using_custom_function_doesnt_change_default(self):
930987
""" ensure a custom function will not change the default """
931988

932-
def func(templates, streams, pads):
989+
def func(templates, streams, pads, cc_squared):
933990
pass
934991

935992
default = corr.get_array_xcorr(None)

0 commit comments

Comments
 (0)