diff --git a/CHANGES.md b/CHANGES.md index d09acf044..c7866f821 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,8 @@ - Made handling of horizontal_chans and vertical_chans consistent so that user can freely choose relevant channels. * utils.correlate + - Weighted correlations supported. Template weights are expected in + individual trace.stats.extra.weight attributes. - Fast Matched Filter now supported natively for version >= 1.4.0 - Only full correlation stacks are returned now (e.g. where fewer than the full number of channels are in the stack at the end of the stack, zeros diff --git a/eqcorrscan/doc/submodules/utils.correlate.rst b/eqcorrscan/doc/submodules/utils.correlate.rst index dd05c82c9..b069aafff 100644 --- a/eqcorrscan/doc/submodules/utils.correlate.rst +++ b/eqcorrscan/doc/submodules/utils.correlate.rst @@ -102,6 +102,18 @@ supported natively, and can be used as a backend by setting `xcorr_func="fmf"`. Fast Matched Filter +Using weights +~~~~~~~~~~~~~ + +New for version 0.5.0: support for weighting cross-correlation stacks. All array +correlation functions have an optional `weights` argument to allow you to weight +different templates. This is most useful for the stream correlation functions, +where different template channels may be of different quality. For the stream +correlation functions the weight for each channel of each template should be stored +in `trace.stats.extra.weight` (channels without a weight default to 1.0). Individual +channel cross-correlations are multiplied by this weight before stacking. Weights +can be any floating point number, although weights between 0 and 1 usually make the most sense. + Switching which correlation function is used ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -141,10 +153,10 @@ for example: ... 1, False, xcorr_func='numpy') >>> # do correlation using a custom function - >>> def custom_normxcorr(templates, stream, pads, *args, **kwargs): + >>> def custom_normxcorr(templates, stream, pads, weights, *args, **kwargs): ... # Just to keep example short call other xcorr function ... print('calling custom xcorr function') - ... return numpy_normxcorr(templates, stream, pads, *args, **kwargs) + ... return numpy_normxcorr(templates, stream, pads, weights, *args, **kwargs) >>> detections = match_filter( ... 
['1'], [template], stream, .5, 'absolute', 1, False, diff --git a/eqcorrscan/tests/correlate_test.py b/eqcorrscan/tests/correlate_test.py index d14ebba96..b748234cb 100644 --- a/eqcorrscan/tests/correlate_test.py +++ b/eqcorrscan/tests/correlate_test.py @@ -9,6 +9,7 @@ import pytest from multiprocessing import cpu_count from obspy import Trace, Stream, read +from obspy.core.util import AttribDict from scipy.fftpack import next_fast_len import eqcorrscan.utils.correlate as corr @@ -98,6 +99,16 @@ def generate_multichannel_templates(): return templates +def generate_weighted_multichannel_templates(): + random.seed(42) + templates = generate_multichannel_templates() + for template in templates: + for tr in template: + tr.stats.extra = AttribDict() + tr.stats.extra.update({"weight": random.random()}) + return templates + + def read_gappy_real_template(): return [read(join(pytest.test_data_path, "DUWZ_template.ms"))] @@ -164,6 +175,8 @@ def print_class_name(request): # array fixtures starting_index = 500 +GLOBAL_WEIGHT = 0.5 # Weight applied to array ccs funcs +# global to allow comparison to unweighted @pytest.fixture(scope='module') @@ -194,6 +207,29 @@ def pads(array_template): return np.zeros(array_template.shape[0], dtype=int) +@pytest.fixture(scope='module') +def array_ccs_weighted(array_template, array_stream, pads): + """ Use each function stored in the normxcorr cache to correlate the + templates and arrays, return a dict with keys as func names and values + as the cc calculated by said function""" + out = {} + weights = np.ones(array_template.shape[0]) * GLOBAL_WEIGHT + for name in list(corr.XCORR_FUNCS_ORIGINAL.keys()): + func = corr.get_array_xcorr(name) + print("Running %s" % name) + cc, _ = time_func( + func, name, array_template, array_stream, pads, weights) + out[name] = cc + if "fftw" in name: + print("Running fixed len fft") + fft_len = next_fast_len( + max(len(array_stream) // 4, len(array_template))) + cc, _ = time_func(func, name, array_template, array_stream, pads, + weights, fft_len=fft_len) + out[name + "_fixed_len"] = cc + return out + + @pytest.fixture(scope='module') def array_ccs(array_template, array_stream, pads): """ Use each function stored in the normxcorr cache to correlate the @@ -251,6 +287,12 @@ def multichannel_templates(): return generate_multichannel_templates() +@pytest.fixture(scope='module') +def weighted_multichannel_templates(): + """ create weighted multichannel templates """ + return generate_weighted_multichannel_templates() + + @pytest.fixture(scope='module') def multichannel_stream(): """ create a multichannel stream for tests """ @@ -290,8 +332,7 @@ def real_multichannel_stream(): if fname != 'default'} -@pytest.fixture(scope='module') -def stream_cc_output_dict(multichannel_templates, multichannel_stream): +def _stream_cc_output_dict(multichannel_templates, multichannel_stream): """ return a dict of outputs from all stream_xcorr functions """ # corr._get_array_dicts(multichannel_templates, multichannel_stream) out = {} @@ -320,12 +361,35 @@ def stream_cc_output_dict(multichannel_templates, multichannel_stream): return out +@pytest.fixture(scope='module') +def stream_cc_output_dict(multichannel_templates, multichannel_stream): + """ return a dict of outputs from all stream_xcorr functions """ + return _stream_cc_output_dict( + multichannel_templates=multichannel_templates, + multichannel_stream=multichannel_stream + ) + + +@pytest.fixture(scope='module') +def stream_cc_weighted_output_dict(weighted_multichannel_templates, multichannel_stream): 
+ return _stream_cc_output_dict( + multichannel_templates=weighted_multichannel_templates, + multichannel_stream=multichannel_stream + ) + + @pytest.fixture(scope='module') def stream_cc_dict(stream_cc_output_dict): """ return just the cc arrays from the stream_cc functions """ return {name: result[0] for name, result in stream_cc_output_dict.items()} +@pytest.fixture(scope='module') +def stream_cc_weighted_dict(stream_cc_weighted_output_dict): + """ return just the cc arrays from the stream_cc functions """ + return {name: result[0] for name, result in stream_cc_weighted_output_dict.items()} + + @pytest.fixture(scope='module') def real_stream_cc_output_dict(real_templates, real_multichannel_stream): """ return a dict of outputs from all stream_xcorr functions """ @@ -589,6 +653,21 @@ def test_single_channel_similar(self, array_ccs): np.save("cc2.npy", cc2) assert np.allclose(cc1, cc2, atol=self.atol) + def test_known_weight_application(self, array_ccs, array_ccs_weighted): + cc_names = list(array_ccs.keys()) + print(cc_names) + for key1, key2 in itertools.combinations(cc_names, 2): + cc1 = array_ccs_weighted[key1] + cc2 = array_ccs_weighted[key2] + print(f"Comparing {key1} and {key2}") + assert np.allclose(cc1, cc2, atol=self.atol) + for cc_name in cc_names: + print(f"Checking for {cc_name}") + assert np.allclose( + array_ccs_weighted[cc_name], + array_ccs[cc_name] * GLOBAL_WEIGHT, + atol=1e-5) + def test_autocorrelation(self, array_ccs): """ ensure an autocorrelation occurred in each of ccs where it is expected, defined by starting_index variable """ @@ -623,6 +702,17 @@ def test_multi_channel_xcorr(self, stream_cc_dict): for cc_name, cc in zip(cc_names[2:], cc_list[2:]): assert np.allclose(cc_1, cc, atol=self.atol) + def test_weighted_multi_channel_xcorr(self, stream_cc_weighted_dict): + """ test various correlation methods weighted with multiple channels """ + # get correlation results into a list + cc_names = list(stream_cc_weighted_dict.keys()) + cc_list = [stream_cc_weighted_dict[cc_name] for cc_name in cc_names] + cc_1 = cc_list[0] + # loop over correlations and compare each with the first in the list + # this will ensure all cc are "close enough" + for cc_name, cc in zip(cc_names[2:], cc_list[2:]): + assert np.allclose(cc_1, cc, atol=self.atol) + def test_real_multi_channel_xcorr(self, real_stream_cc_dict): """ test various correlation methods with multiple channels """ # get correlation results into a list @@ -837,10 +927,11 @@ def test_callable_registered(self, multichannel_templates, """ ensure a callable can be registered """ small_count = {} - def some_callable(template_array, stream_array, pad_array): + def some_callable(template_array, stream_array, pad_array, + weight_array): small_count['name'] = 1 return corr.numpy_normxcorr(template_array, stream_array, - pad_array) + pad_array, weight_array) func = corr.get_stream_xcorr(some_callable) func(multichannel_templates, multichannel_stream) @@ -883,7 +974,7 @@ class TestRegisterNormXcorrs: # helper functions def name_func_is_registered(self, func_name): """ return True if func is registered as a normxcorr func """ - # Note: don not remove this fixture or bad things will happen + # Note: do not remove this fixture or bad things will happen name = func_name.__name__ if callable(func_name) else func_name return name in corr.XCOR_FUNCS diff --git a/eqcorrscan/utils/correlate.py b/eqcorrscan/utils/correlate.py index 916c944ea..c12d0cc17 100644 --- a/eqcorrscan/utils/correlate.py +++ b/eqcorrscan/utils/correlate.py @@ -184,10 
+184,11 @@ def pool_boy(Pool, traces, **kwargs): def _pool_normxcorr(templates, stream, stack, pool, func, *args, **kwargs): chans = [[] for _i in range(len(templates))] array_dict_tuple = _get_array_dicts(templates, stream, stack=stack) - stream_dict, template_dict, pad_dict, seed_ids = array_dict_tuple + stream_dict, template_dict, pad_dict, weight_dict, seed_ids = array_dict_tuple # get parameter iterator - params = ((template_dict[sid], stream_dict[sid], pad_dict[sid]) - for sid in seed_ids) + params = ( + (template_dict[sid], stream_dict[sid], pad_dict[sid], weight_dict[sid]) + for sid in seed_ids) # get cc results and used chans into their own lists results = [pool.apply_async(func, param) for param in params] try: @@ -236,7 +237,7 @@ def stream_xcorr(templates, stream, stack=True, *args, **kwargs): no_chans = np.zeros(len(templates)) chans = [[] for _ in range(len(templates))] array_dict_tuple = _get_array_dicts(templates, stream, stack=stack) - stream_dict, template_dict, pad_dict, seed_ids = array_dict_tuple + stream_dict, template_dict, pad_dict, weight_dict, seed_ids = array_dict_tuple if stack: cccsums = np.zeros([len(templates), len(stream[0]) - len(templates[0][0]) + 1]) @@ -246,7 +247,8 @@ def stream_xcorr(templates, stream, stack=True, *args, **kwargs): for chan_no, seed_id in enumerate(seed_ids): tr_cc, tr_chans = func(template_dict[seed_id], stream_dict[seed_id], - pad_dict[seed_id]) + pad_dict[seed_id], + weight_dict[seed_id]) if stack: cccsums = np.sum([cccsums, tr_cc], axis=0) else: @@ -405,9 +407,9 @@ def get_array_xcorr(name_or_func=None): @register_array_xcorr('numpy') -def numpy_normxcorr(templates, stream, pads, *args, **kwargs): +def numpy_normxcorr(templates, stream, pads, weights=None, *args, **kwargs): """ - Compute the normalized cross-correlation using numpy and bottleneck. + Compute normalized cross-correlations of multiple templates using numpy. :param templates: 2D Array of templates :type templates: np.ndarray @@ -415,12 +417,20 @@ def numpy_normxcorr(templates, stream, pads, *args, **kwargs): :type stream: np.ndarray :param pads: List of ints of pad lengths in the same order as templates :type pads: list + :param weights: 1D Array of weights to match templates + :type weights: np.ndarray :return: np.ndarray of cross-correlations :return: np.ndarray channels used """ import bottleneck + # Handle weights + n_templates = templates.shape[0] + if weights is None: + weights = np.ones(n_templates) + assert weights.shape[0] == templates.shape[0], "Weights required for all "\ + "templates" # Generate a template mask used_chans = ~np.isnan(templates).any(axis=1) # Currently have to use float64 as bottleneck runs into issues with other @@ -454,6 +464,7 @@ def numpy_normxcorr(templates, stream, pads, *args, **kwargs): for i, pad in enumerate(pads): res[i] = np.append(res[i], np.zeros(pad))[pad:] + res *= weights.reshape(n_templates, 1) return res.astype(np.float32), used_chans @@ -470,8 +481,8 @@ def _centered(arr, newshape): @register_array_xcorr('time_domain') -def time_multi_normxcorr(templates, stream, pads, threaded=False, *args, - **kwargs): +def time_multi_normxcorr(templates, stream, pads, weights=None, threaded=False, + *args, **kwargs): """ Compute cross-correlations in the time-domain using C routine. 
@@ -481,12 +492,20 @@ def time_multi_normxcorr(templates, stream, pads, threaded=False, *args, :type stream: np.ndarray :param pads: List of ints of pad lengths in the same order as templates :type pads: list + :param weights: 1D Array of weights to match templates + :type weights: np.ndarray :param threaded: Whether to use the threaded routine or not :type threaded: bool :return: np.ndarray of cross-correlations :return: np.ndarray channels used """ + # Handle weights + n_templates = templates.shape[0] + if weights is None: + weights = np.ones(n_templates) + assert weights.shape[0] == templates.shape[0], "Weights required for all "\ + "templates" used_chans = ~np.isnan(templates).any(axis=1) utilslib = _load_cdll('libutils') @@ -514,7 +533,6 @@ def time_multi_normxcorr(templates, stream, pads, threaded=False, *args, templates = templates.astype(np.float32) - templates_means stream = stream.astype(np.float32) - stream_mean template_len = templates.shape[1] - n_templates = templates.shape[0] image_len = stream.shape[0] ccc_length = image_len - template_len + 1 assert ccc_length > 0, "Template must be shorter than stream" @@ -532,11 +550,14 @@ def time_multi_normxcorr(templates, stream, pads, threaded=False, *args, ccc[i] = np.append(ccc[i], np.zeros(pads[i]))[pads[i]:] templates += templates_means stream += stream_mean + # Apply weights + ccc *= weights.reshape(n_templates, 1) return ccc, used_chans @register_array_xcorr('fftw', is_default=True) -def fftw_normxcorr(templates, stream, pads, threaded=False, *args, **kwargs): +def fftw_normxcorr(templates, stream, pads, weights=None, threaded=False, + *args, **kwargs): """ Normalised cross-correlation using the fftw library. @@ -556,6 +577,8 @@ def fftw_normxcorr(templates, stream, pads, threaded=False, *args, **kwargs): :type stream: np.ndarray :param pads: List of ints of pad lengths in the same order as templates :type pads: list + :param weights: 1D Array of weights to match templates + :type weights: np.ndarray :param threaded: Whether to use the threaded routine or not - note openMP and python multiprocessing don't seem to play nice for this. 
@@ -564,6 +587,12 @@ def fftw_normxcorr(templates, stream, pads, threaded=False, *args, **kwargs): :return: np.ndarray of cross-correlations :return: np.ndarray channels used """ + # Handle weights + n_templates = templates.shape[0] + if weights is None: + weights = np.ones(n_templates) + assert weights.shape[0] == templates.shape[0], "Weights required for all "\ + "templates" utilslib = _load_cdll('libutils') argtypes = [ @@ -659,6 +688,8 @@ def fftw_normxcorr(templates, stream, pads, threaded=False, *args, **kwargs): variance_warning[0])) # Remove variance correction stream /= multiplier + # Apply weights + ccc *= weights.reshape(n_templates, 1) return ccc, used_chans @@ -693,7 +724,7 @@ def _time_threaded_normxcorr(templates, stream, stack=True, *args, **kwargs): no_chans = np.zeros(len(templates)) chans = [[] for _ in range(len(templates))] array_dict_tuple = _get_array_dicts(templates, stream, stack=stack) - stream_dict, template_dict, pad_dict, seed_ids = array_dict_tuple + stream_dict, template_dict, pad_dict, weight_dict, seed_ids = array_dict_tuple ccc_length = max( len(stream[0]) - len(templates[0][0]) + 1, len(templates[0][0]) - len(stream[0]) + 1) @@ -704,7 +735,7 @@ def _time_threaded_normxcorr(templates, stream, stack=True, *args, **kwargs): for chan_no, seed_id in enumerate(seed_ids): tr_cc, tr_chans = time_multi_normxcorr( template_dict[seed_id], stream_dict[seed_id], pad_dict[seed_id], - True) + weights=weight_dict[seed_id], threaded=True) if stack: cccsums = np.sum([cccsums, tr_cc], axis=0) else: @@ -774,12 +805,13 @@ def _fftw_stream_xcorr(templates, stream, stack=True, *args, **kwargs): chans = [[] for _i in range(len(templates))] array_dict_tuple = _get_array_dicts(templates, stream, stack=stack) - stream_dict, template_dict, pad_dict, seed_ids = array_dict_tuple + stream_dict, template_dict, pad_dict, weight_dict, seed_ids = array_dict_tuple assert set(seed_ids) cccsums, tr_chans = fftw_multi_normxcorr( template_array=template_dict, stream_array=stream_dict, pad_array=pad_dict, seed_ids=seed_ids, cores_inner=num_cores_inner, - cores_outer=num_cores_outer, stack=stack, *args, **kwargs) + cores_outer=num_cores_outer, weight_array=weight_dict, + stack=stack, *args, **kwargs) no_chans = np.sum(np.array(tr_chans).astype(int), axis=0) for seed_id, tr_chan in zip(seed_ids, tr_chans): for chan, state in zip(chans, tr_chan): @@ -799,6 +831,7 @@ def fftw_multi_normxcorr( seed_ids, cores_inner, cores_outer, + weight_array=None, stack=True, *args, **kwargs @@ -814,6 +847,8 @@ def fftw_multi_normxcorr( :param pad_array: :type seed_ids: list :param seed_ids: + :type weight_array: dict + :param weight_array: rtype: np.ndarray, list :return: 3D Array of cross-correlations and list of used channels. 
@@ -834,6 +869,8 @@ fftw_multi_normxcorr( flags='C_CONTIGUOUS'), np.ctypeslib.ndpointer(dtype=np.intc, flags='C_CONTIGUOUS'), + np.ctypeslib.ndpointer(dtype=np.float32, + flags='C_CONTIGUOUS'), ctypes.c_int, ctypes.c_int, np.ctypeslib.ndpointer(dtype=np.intc, flags='C_CONTIGUOUS'), @@ -853,7 +890,8 @@ fftw_multi_normxcorr( fft-length used channels (stacked as per templates) pad array (stacked as per templates) - num threads inner + weight array (stacked as per templates) + num threads inner num threads outer variance warnings missed correlation warnings (usually due to gaps) @@ -910,6 +948,8 @@ fftw_multi_normxcorr( used_chans_np = np.ascontiguousarray(used_chans, dtype=np.intc) pad_array_np = np.ascontiguousarray( [pad_array[seed_id] for seed_id in seed_ids], dtype=np.intc) + weight_array_np = np.ascontiguousarray( + [weight_array[seed_id] for seed_id in seed_ids], dtype=np.float32) variance_warnings = np.ascontiguousarray( np.zeros(n_channels), dtype=np.intc) missed_correlations = np.ascontiguousarray( @@ -918,7 +958,7 @@ fftw_multi_normxcorr( # call C function ret = utilslib.multi_normxcorr_fftw( template_array, n_templates, template_len, n_channels, stream_array, - image_len, cccs, fft_len, used_chans_np, pad_array_np, + image_len, cccs, fft_len, used_chans_np, pad_array_np, weight_array_np, cores_inner, cores_outer, variance_warnings, missed_correlations, int(stack)) if ret < 0: @@ -985,7 +1025,8 @@ def _run_fmf_xcorr(template_arr, data_arr, weights, pads, arch, step=1): @register_array_xcorr("fmf") -def fmf_xcorr(templates, stream, pads, arch="precise", *args, **kwargs): +def fmf_xcorr(templates, stream, pads, weights=None, arch="precise", + *args, **kwargs): """ Compute cross-correlations in the time-domain using the FMF routine. 
@@ -995,6 +1036,8 @@ def fmf_xcorr(templates, stream, pads, arch="precise", *args, **kwargs): :type stream: np.ndarray :param pads: List of ints of pad lengths in the same order as templates :type pads: list + :param weights: 1D Array of weights to match templates + :type weights: np.ndarray :param arch: "gpu" or "precise" to run on GPU or CPU respectively type arch: str @@ -1003,6 +1046,12 @@ def fmf_xcorr(templates, stream, pads, arch="precise", *args, **kwargs): """ assert templates.ndim == 2, "Templates must be 2D" assert stream.ndim == 1, "Stream must be 1D" + # Handle weights + n_templates = templates.shape[0] + if weights is None: + weights = np.ones(n_templates) + assert weights.shape[0] == templates.shape[0], "Weights required for all "\ + "templates" used_chans = ~np.isnan(templates).any(axis=1) @@ -1011,7 +1060,7 @@ def fmf_xcorr(templates, stream, pads, arch="precise", *args, **kwargs): template_arr=templates.reshape( (1, templates.shape[0], templates.shape[1])).swapaxes(0, 1), data_arr=stream.reshape((1, stream.shape[0])), - weights=np.ones((1, templates.shape[0])), + weights=np.array([weights]), pads=np.array([pads]), arch=arch.lower()) @@ -1077,7 +1126,7 @@ def _fmf_multi_xcorr(templates, stream, *args, **kwargs): chans = [[] for _i in range(len(templates))] array_dict_tuple = _get_array_dicts(templates, stream, stack=True) - stream_dict, template_dict, pad_dict, seed_ids = array_dict_tuple + stream_dict, template_dict, pad_dict, weight_dict, seed_ids = array_dict_tuple assert set(seed_ids) # Reshape templates into [templates x traces x time] @@ -1088,7 +1137,8 @@ def _fmf_multi_xcorr(templates, stream, *args, **kwargs): # Moveouts should be [templates x traces] pads = np.array([pad_dict[seed_id] for seed_id in seed_ids]).swapaxes(0, 1) # Weights should be shaped like pads - weights = np.ones_like(pads) + weights = np.array([weight_dict[seed_id] + for seed_id in seed_ids]).swapaxes(0, 1) cccsums = _run_fmf_xcorr( template_arr=t_arr, weights=weights, pads=pads, @@ -1141,6 +1191,14 @@ def get_stream_xcorr(name_or_func=None, concurrency=None): # --------------------------- stream prep functions +def _get_weight(tr, default_weight=1): + if not hasattr(tr.stats, "extra"): + return default_weight + if not hasattr(tr.stats.extra, "weight"): + return default_weight + return tr.stats.extra.weight + + def _get_array_dicts(templates, stream, stack, copy_streams=True): """ prepare templates and stream, return dicts """ # Do some reshaping @@ -1148,6 +1206,7 @@ def _get_array_dicts(templates, stream, stack, copy_streams=True): template_dict = {} stream_dict = {} pad_dict = {} + weight_dict = {} t_starts = [] stream.sort(['network', 'station', 'location', 'channel']) @@ -1164,6 +1223,10 @@ def _get_array_dicts(templates, stream, stack, copy_streams=True): temps_with_seed = [template.traces[i].data for template in templates] t_ar = np.array(temps_with_seed).astype(np.float32) template_dict.update({seed_id: t_ar}) + # Get weights - stored in trace.stats.extra.weight + weights = np.array([_get_weight(template[i]) + for template in templates]) + weight_dict.update({seed_id: weights}) stream_channel = _stream_quick_select(stream, seed_id.split('_')[0])[0] # Normalize data to ensure no float overflow stream_data = stream_channel.data / (np.max( @@ -1189,7 +1252,7 @@ def _get_array_dicts(templates, stream, stack, copy_streams=True): pad_list = [0 for _ in templates] pad_dict.update({seed_id: pad_list}) - return stream_dict, template_dict, pad_dict, seed_ids + return stream_dict, template_dict, 
pad_dict, weight_dict, seed_ids # Remove fmf if it isn't installed diff --git a/eqcorrscan/utils/src/libutils.h b/eqcorrscan/utils/src/libutils.h index 15d3fd5bc..1dc5b6c7b 100644 --- a/eqcorrscan/utils/src/libutils.h +++ b/eqcorrscan/utils/src/libutils.h @@ -74,7 +74,8 @@ int running_mean_var(double*, double*, int*, float*, long, long); int normxcorr_fftw_main(float*, long, long, float*, long, int, int, float*, long, float*, float*, float*, fftwf_complex*, fftwf_complex*, fftwf_complex*, fftwf_plan, fftwf_plan, fftwf_plan, - int*, int*, int, int*, int*, int); + int*, int*, float*, int, int*, int*, int); + int normxcorr_fftw_threaded( float*, long, long, float*, long, float*, long, int*, int*, int*, int*); @@ -88,7 +89,7 @@ void free_fftw_arrays( fftw_complex**); int multi_normxcorr_fftw( - float*, long, long, long, float*, long, float*, long, int*, int*, int, + float*, long, long, long, float*, long, float*, long, int*, int*, float*, int, int, int*, int*, int); int normxcorr_fftw(float*, long, long, float*, long, float*, long, int*, int*, int*, int*); diff --git a/eqcorrscan/utils/src/multi_corr.c b/eqcorrscan/utils/src/multi_corr.c index 5eca6261b..aa510154f 100644 --- a/eqcorrscan/utils/src/multi_corr.c +++ b/eqcorrscan/utils/src/multi_corr.c @@ -23,7 +23,8 @@ static inline int set_ncc( long t, long i, int chan, int n_chans, long template_len, long image_len, - float value, int *used_chans, int *pad_array, float *ncc, int stack_option); + float value, int *used_chans, int *pad_array, float *weight_array, float *ncc, + int stack_option); // Functions @@ -45,6 +46,8 @@ int normxcorr_fftw_threaded(float *templates, long template_len, long n_template ncc: Output for cross-correlation - should be pointer to memory - must be n_templates x image_len - template_len + 1 fft_len: Size for fft (n1) + + Note - no weighting is applied */ long N2 = fft_len / 2 + 1; long i, t, startind; @@ -55,9 +58,17 @@ int normxcorr_fftw_threaded(float *templates, long template_len, long n_template float * template_ext = (float *) calloc(fft_len * n_templates, sizeof(float)); float * image_ext = (float *) calloc(fft_len, sizeof(float)); float * ccc = (float *) fftwf_malloc(sizeof(float) * fft_len * n_templates); + float * weight_array = (float *) calloc(n_templates, sizeof(float)); fftwf_complex * outa = (fftwf_complex *) fftwf_malloc(sizeof(fftwf_complex) * N2 * n_templates); fftwf_complex * outb = (fftwf_complex *) fftwf_malloc(sizeof(fftwf_complex) * N2); fftwf_complex * out = (fftwf_complex *) fftwf_malloc(sizeof(fftwf_complex) * N2 * n_templates); + + // Initialize weights to one (memset cannot be used to set floats to 1.0) + for (t = 0; t < n_templates; ++t){ + weight_array[t] = 1; + } + // Initialize threads #ifdef N_THREADS fftwf_init_threads(); @@ -115,7 +126,7 @@ int normxcorr_fftw_threaded(float *templates, long template_len, long n_template if (var >= ACCEPTED_DIFF) { for (t = 0; t < n_templates; ++t){ float c = ((ccc[(t * fft_len) + startind] / (fft_len * n_templates)) - norm_sums[t] * mean) / stdev; - status += set_ncc(t, 0, 0, 1, template_len, image_len, (float) c, used_chans, pad_array, ncc, 0); + status += set_ncc(t, 0, 0, 1, template_len, image_len, (float) c, used_chans, pad_array, weight_array, ncc, 0); } if (var <= WARN_DIFF){ variance_warning[0] = 1; @@ -140,7 +151,7 @@ int normxcorr_fftw_threaded(float *templates, long template_len, long n_template if (var >= ACCEPTED_DIFF && flatline_count < template_len - 1 && stdev * mean >= ACCEPTED_DIFF) { for (t = 0; t < 
n_templates; ++t){ float c = ((ccc[(t * fft_len) + i + startind] / (fft_len * n_templates)) - norm_sums[t] * mean ) / stdev; - status += set_ncc(t, i, 0, 1, template_len, image_len, (float) c, used_chans, pad_array, ncc, 0); + status += set_ncc(t, i, 0, 1, template_len, image_len, (float) c, used_chans, pad_array, weight_array, ncc, 0); } if (var <= WARN_DIFF){ variance_warning[0] += 1; @@ -189,13 +200,16 @@ int normxcorr_fftw(float *templates, long template_len, long n_templates, This is a wrapper around `normxcorr_fftw_main`, allocating required memory and plans for that function. We have taken this outside the main function because creating plans is not thread-safe and we want to call the main function from within an OpenMP loop. + + Weights are not applied. */ - int status = 0; + int status = 0, t; long N2 = fft_len / 2 + 1; // All memory allocated with `fftw_malloc` to ensure 16-byte aligned float * template_ext = (float*) fftwf_malloc(fft_len * n_templates * sizeof(float)); float * image_ext = (float*) fftwf_malloc(fft_len * sizeof(float)); float * ccc = (float*) fftwf_malloc(fft_len * n_templates * sizeof(float)); + float * weight_array = (float*) calloc(n_templates, sizeof(float)); fftwf_complex * outa = (fftwf_complex*) fftwf_malloc(N2 * n_templates * sizeof(fftwf_complex)); fftwf_complex * outb = (fftwf_complex*) fftwf_malloc(N2 * sizeof(fftwf_complex)); fftwf_complex * out = (fftwf_complex*) fftwf_malloc(N2 * n_templates * sizeof(fftwf_complex)); @@ -207,13 +221,18 @@ int normxcorr_fftw(float *templates, long template_len, long n_templates, // Initialise to zero memset(template_ext, 0, (size_t) fft_len * n_templates * sizeof(float)); memset(image_ext, 0, (size_t) fft_len * sizeof(float)); + // Initialize weights to one + for (t = 0; t < n_templates; ++t){ + weight_array[t] = 1; + } + // memset(weight_array, 1, (size_t) n_templates * sizeof(float)); // Call the function to do the work // Note: forcing inner threads to 1 for now (could be passed from Python) status = normxcorr_fftw_main( templates, template_len, n_templates, image, image_len, 0, 1, ncc, fft_len, template_ext, image_ext, ccc, outa, outb, out, pa, pb, px, - used_chans, pad_array, 1, variance_warning, missed_corr, 0); + used_chans, pad_array, weight_array, 1, variance_warning, missed_corr, 0); // free memory and plans fftwf_destroy_plan(pa); @@ -240,8 +259,8 @@ int normxcorr_fftw_main( long image_len, int chan, int n_chans, float *ncc, long fft_len, float *template_ext, float *image_ext, float *ccc, fftwf_complex *outa, fftwf_complex *outb, fftwf_complex *out, fftwf_plan pa, fftwf_plan pb, - fftwf_plan px, int *used_chans, int *pad_array, int num_threads, - int *variance_warning, int *missed_corr, int stack_option) { + fftwf_plan px, int *used_chans, int *pad_array, float *weight_array, + int num_threads, int *variance_warning, int *missed_corr, int stack_option) { /* Purpose: compute frequency domain normalised cross-correlation of real data using fftw for a single-channel @@ -273,6 +292,7 @@ int normxcorr_fftw_main( used_chans: Array to fill with number of channels used per template - must be n_templates long pad_array: Array of pads, should be n_templates long + weight_array: Array of weights, should be n_templates long num_threads: Number of threads to parallel internal calculations over variance_warning: Pointer to array to store warnings for variance issues missed_corr: Pointer to array to store warnings for unused correlations @@ -372,7 +392,7 @@ int normxcorr_fftw_main( double c = ((ccc[(t * fft_len) + 
startind] / (fft_len * n_templates)) - norm_sums[t] * mean[offset]); c /= stdev; status += set_ncc(t, offset, chan, n_chans, template_len, image_len, - (float) c, used_chans, pad_array, ncc, stack_option); + (float) c, used_chans, pad_array, weight_array, ncc, stack_option); } if (var[offset] <= WARN_DIFF){ variance_warning[0] = 1; @@ -392,8 +412,8 @@ int normxcorr_fftw_main( double c = ((ccc[(t * fft_len) + i + startind] / (fft_len * n_templates)) - norm_sums[t] * mean[offset + i]); c /= stdev; status += set_ncc(t, i + offset, chan, n_chans, template_len, - image_len, (float) c, used_chans, - pad_array, ncc, stack_option); + image_len, (float) c, used_chans, pad_array, + weight_array, ncc, stack_option); } } @@ -441,7 +461,6 @@ int running_mean_var( } var[0] = sum; - // pre-compute the mean and var so we can parallelise the calculation for(i = 1; i < (image_len - template_len + 1); ++i){ // Need to cast to double otherwise we end up with annoying floating @@ -458,11 +477,13 @@ int running_mean_var( } } return 0; + } static inline int set_ncc( long t, long i, int chan, int n_chans, long template_len, long image_len, - float value, int *used_chans, int *pad_array, float *ncc, int stack_option){ + float value, int *used_chans, int *pad_array, float *weight_array, + float *ncc, int stack_option){ int status = 0; @@ -487,6 +508,8 @@ static inline int set_ncc( else if (value < -1.0) { value = -1.0; } + // Apply weight after checks to allow for weights > 1.0 + value *= weight_array[t]; if (stack_option == 1){ #pragma omp atomic ncc[ncc_index] += value; @@ -540,7 +563,7 @@ void free_fftw_arrays(int size, double **template_ext, double **image_ext, doubl int multi_normxcorr_fftw(float *templates, long n_templates, long template_len, long n_channels, float *image, long image_len, float *ncc, long fft_len, int *used_chans, - int *pad_array, int num_threads_inner, int num_threads_outer, + int *pad_array, float *weight_array, int num_threads_inner, int num_threads_outer, int *variance_warning, int *missed_corr, int stack_option) { int i, chan, n_chans; @@ -728,7 +751,9 @@ int multi_normxcorr_fftw(float *templates, long n_templates, long template_len, n_templates, &image[(size_t) image_len * i], image_len, chan, n_chans, ncc, fft_len, template_ext[tid], image_ext[tid], ccc[tid], outa[tid], outb[tid], out[tid], pa, pb, px, &used_chans[(size_t) i * n_templates], - &pad_array[(size_t) i * n_templates], num_threads_inner, &variance_warning[i], + &pad_array[(size_t) i * n_templates], + &weight_array[(size_t) i * n_templates], + num_threads_inner, &variance_warning[i], &missed_corr[i], stack_option); if (results[i] != 0){ printf("WARNING: %i out-of-range correlations on channel %i\n", results[i], i);
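Below is a minimal usage sketch of the weighting API introduced by this patch. It is not part of the change set: the synthetic arrays, the placeholder file names (template.ms, day.ms) and the weight values are illustrative assumptions only.

# Sketch: per-template weights (array interface) and per-channel weights (stream interface).
import numpy as np
from obspy import read
from obspy.core.util import AttribDict

from eqcorrscan.utils.correlate import numpy_normxcorr, get_stream_xcorr

# Array interface: one weight per template row.
templates = np.random.randn(3, 200).astype(np.float32)  # 3 templates, 200 samples each
stream = np.random.randn(86400).astype(np.float32)      # one continuous channel
pads = np.zeros(3, dtype=int)                            # no moveout between templates
weights = np.array([1.0, 0.5, 0.25])                     # down-weight templates 2 and 3
ccs, used_chans = numpy_normxcorr(templates, stream, pads, weights=weights)
# Each row of ccs is the normalised cross-correlation multiplied by its weight.

# Stream interface: one weight per channel of each template, read from
# trace.stats.extra.weight before the per-channel correlations are stacked.
template_st = read("template.ms")   # placeholder template stream
day_st = read("day.ms")             # placeholder continuous data
for tr in template_st:
    tr.stats.extra = AttribDict()
    tr.stats.extra.update({"weight": 0.8})  # e.g. down-weight a noisy channel
stream_xcorr_func = get_stream_xcorr()
cccsums, no_chans, chans = stream_xcorr_func([template_st], day_st, stack=True)

With all weights left at 1.0 (the default when `weights` is omitted, or when a trace carries no `stats.extra.weight`), the output is identical to the unweighted behaviour.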