Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
277c281
Add a first version of a tutorial to cover power spectrum calculation…
mdenker Feb 2, 2024
1942384
Added further version for spectrum calculation
mdenker Feb 2, 2024
860f6ec
Fix notebook error
mdenker Feb 2, 2024
46c1e43
Fix notebook error
mdenker Feb 2, 2024
fe7f2cd
Added Bos-like spectrum using Elephant Welch
mdenker Feb 2, 2024
db066a1
Added HEP-like spectrum
mdenker Feb 5, 2024
b7a9dad
Adjusted segments of Welch method to match segmented MT
mdenker Feb 5, 2024
93156ef
Added theoretical spectrum
mdenker Feb 8, 2024
987f882
Added peak comparison; cleanup
mdenker Feb 8, 2024
a34495c
Fix wrong smoothing for HEP-style
mdenker Feb 8, 2024
bac76ac
Some comments
mdenker Feb 14, 2024
0f6908f
Some bugs
mdenker Feb 22, 2024
f84f21b
Introduced a double peak
mdenker Mar 12, 2024
acbc693
Simplify spike train power spectra tutorial.
mdenker Mar 26, 2025
bcf54bb
Simplify and clean up spike train power spectra tutorial by reducing …
mdenker Jun 11, 2025
5566c89
Considerable work on tutorial and introduced BinnedSpikeTrain to welc…
mdenker Jun 16, 2025
718c3b5
Continued work on notebook up to pure FFT spectrum
mdenker Jun 16, 2025
5419cc1
Further notebook improvements
mdenker Jun 16, 2025
4e8e50d
Refactor `BinnedSpikeTrain` handling in spectral functions to use cor…
mdenker Jun 17, 2025
3f3c670
Merge branch 'master' into feature/spike-psd
mdenker Jun 17, 2025
a8bb859
Refactor `BinnedSpikeTrain` methods and enhance `spike train to analo…
mdenker Jun 17, 2025
6bcd893
Add segmented multi-taper to imports
mdenker Sep 11, 2025
3dc49c3
Update spike power spectra tutorial with deprecation warning fixes an…
mdenker Feb 17, 2026
d7842cb
Updated unit checking
mdenker Feb 17, 2026
789c0b2
Settled on units to be used for AnalogSignal
mdenker Feb 18, 2026
705d0fb
Added unit tests for BinnedSpikeTrain inputs to PSD functions.
mdenker Feb 18, 2026
91c642f
Merge branch 'master' into feature/spike-psd
mdenker Feb 18, 2026
faffd2d
Removed tutorial on spike train psd for this PR to simplify.
mdenker Feb 18, 2026
3e3eae7
Separated unit tests to be more specific for one problem.
mdenker Feb 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 123 additions & 3 deletions elephant/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,18 +999,52 @@ def to_bool_array(self):
"""
return self.to_array(dtype=bool)

def to_array(self, dtype=None):
def to_bool_analogsignal(self):
    """
    Convert the binned spike train into a binary `neo.AnalogSignal`.

    Returns
    -------
    neo.AnalogSignal
        A signal whose value in each time bin is 0 (no spike) or
        1 (one or more spikes).

    See Also
    --------
    to_bool_array : Returns the binary representation as a NumPy array.
    """
    import neo
    # to_bool_array yields shape (spike_trains, bins); AnalogSignal
    # expects (time_steps, channels), hence the transpose. Cast to float
    # because AnalogSignal stores numeric samples.
    binary_samples = self.to_bool_array().astype(float).T
    return neo.AnalogSignal(
        binary_samples,
        units=pq.dimensionless,  # a binary signal carries no physical unit
        sampling_period=self.bin_size,
        t_start=self.t_start)

def to_array(self, dtype=None, scaling="counts"):
    """
    Return the dense spike-count matrix of this `BinnedSpikeTrain`.

    The dense matrix is computed from the internal sparse representation.
    Rows correspond to spike trains and columns to time bins; each entry
    holds the number of spikes of that train falling into that bin,
    optionally divided by the bin width.

    Parameters
    ----------
    dtype : type, optional
        The desired data-type for the array. Default is None.
    scaling : {"counts", "normalized"}, optional
        Determines the scaling of the returned array:

        * "counts" : raw spike counts (default)
        * "normalized" : spike counts divided by the bin size in seconds

    Returns
    -------
    matrix : np.ndarray
        Matrix with spike counts or rates. Columns represent the index
        positions of the binned spikes and rows represent the spike
        trains.

    Raises
    ------
    ValueError
        If the scaling parameter is not one of {"counts", "normalized"}.

    Examples
    --------
    >>> import elephant.conversion as conv
    >>> import neo as n
    >>> import quantities as pq
    >>> a = n.SpikeTrain([0.5, 0.7, 1.2, 3.1, 4.3, 5.5, 6.7] * pq.s,
    ...                  t_stop=10.0 * pq.s)
    >>> x = conv.BinnedSpikeTrain(a, n_bins=10, bin_size=1 * pq.s,
    ...                           t_start=0 * pq.s)
    >>> print(x.to_array())  # counts
    [[2 1 0 1 1 1 1 0 0 0]]
    >>> print(x.to_array(scaling="normalized"))  # counts/second
    [[2. 1. 0. 1. 1. 1. 1. 0. 0. 0.]]

    See also
    --------
    scipy.sparse.csr_matrix
    scipy.sparse.csr_matrix.toarray
    """
    if scaling not in ("counts", "normalized"):
        raise ValueError(
            f"Invalid scaling parameter: {scaling}. "
            "Available options: 'counts', 'normalized'"
        )

    dense = self.sparse_matrix.toarray()
    if dtype is not None:
        dense = dense.astype(dtype)

    if scaling == "counts":
        return dense

    # "normalized": divide counts by the bin width expressed in seconds,
    # yielding counts per second.
    return dense / self.bin_size.rescale('s').magnitude

def to_analog_signal(self, dtype=None, scaling="counts"):
    """
    Return this binned spike train as a `neo.AnalogSignal`.

    The signal carries either raw spike counts per bin or rates (counts
    divided by the bin width), depending on `scaling`.

    Parameters
    ----------
    dtype : type, optional
        The desired data-type for the signal values. Default is None.
    scaling : {"counts", "normalized"}, optional
        Determines the scaling of the returned signal:

        * "counts" : raw spike counts (default); the signal units are
          1/b in Hz, where b is the bin size.
        * "normalized" : spike counts divided by the bin size in
          seconds, i.e. firing rates with units of Hz.

    Returns
    -------
    neo.AnalogSignal
        Signal containing spike counts or rates. Rows represent time
        bins and columns represent different spike trains.

    Raises
    ------
    ValueError
        If the scaling parameter is not one of {"counts", "normalized"}.

    Examples
    --------
    >>> import elephant.conversion as conv
    >>> import neo as n
    >>> import quantities as pq
    >>> a = n.SpikeTrain([0.5, 0.7, 1.2, 3.1, 4.3, 5.5, 6.7] * pq.s,
    ...                  t_stop=10.0 * pq.s)
    >>> x = conv.BinnedSpikeTrain(a, n_bins=10, bin_size=1 * pq.s,
    ...                           t_start=0 * pq.s)
    >>> signal = x.to_analog_signal()  # counts (1/bin_size) Hz
    >>> signal = x.to_analog_signal(scaling="normalized")  # rates (Hz)

    See also
    --------
    to_array : Returns the binned spike train as a plain NumPy array
    neo.AnalogSignal : The neo analog signal object
    """
    # Dense matrix with the requested scaling; to_array also validates
    # the `scaling` argument and raises ValueError for unknown values.
    values = self.to_array(dtype=dtype, scaling=scaling)

    if scaling == "normalized":
        # Counts per second are plain rates in Hz.
        signal_units = pq.Hz
    else:
        # Raw counts: one count per bin corresponds to 1/bin_size Hz.
        bin_seconds = self.bin_size.rescale('s').magnitude
        signal_units = pq.CompoundUnit(f"1.0 / {bin_seconds} * Hz")

    # to_array returns shape (spike_trains, bins); AnalogSignal expects
    # (time_steps, channels), hence the transpose.
    return neo.AnalogSignal(
        values.T,
        units=signal_units,
        sampling_period=self.bin_size,
        t_start=self.t_start)

def binarize(self, copy=True):
"""
Clip the internal array (no. of spikes in a bin) to `0` (no spikes) or
Expand Down
26 changes: 23 additions & 3 deletions elephant/spectral.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@

import warnings

import neo
import numpy as np
import quantities as pq
import scipy.signal
import neo
import elephant.conversion

__all__ = [
"welch_psd",
"welch_coherence",
"multitaper_psd",
"segmented_multitaper_psd",
"multitaper_cross_spectrum",
"segmented_multitaper_cross_spectrum",
"multitaper_coherence"
Expand Down Expand Up @@ -62,7 +64,7 @@ def welch_psd(signal, n_segments=8, len_segment=None,

Parameters
----------
signal : neo.AnalogSignal or pq.Quantity or np.ndarray
signal : neo.AnalogSignal or pq.Quantity or np.ndarray or elephant.conversion.BinnedSpikeTrain
Time series data, of which PSD is estimated. When `signal` is
`pq.Quantity` or `np.ndarray`, sampling frequency should be given
through the keyword argument `fs`. Otherwise, the default value is
Expand Down Expand Up @@ -201,6 +203,10 @@ def welch_psd(signal, n_segments=8, len_segment=None,


"""
if isinstance(signal,elephant.conversion.BinnedSpikeTrain):
# Convert spike train to an analog signal with units of Hz
signal = signal.to_analog_signal(scaling="normalized")

# 'hanning' window was removed with release of scipy 1.9.0, it was
# deprecated since 1.1.0.
if window == 'hanning':
Expand Down Expand Up @@ -299,7 +305,7 @@ def multitaper_psd(signal, fs=1, nw=4, num_tapers=None, peak_resolution=None,

Parameters
----------
signal : neo.AnalogSignal or pq.Quantity or np.ndarray
signal : neo.AnalogSignal or pq.Quantity or np.ndarray or elephant.conversion.BinnedSpikeTrain
Time series data of which PSD is estimated. When `signal` is np.ndarray
sampling frequency should be given through keyword argument `fs`.
Signal should be passed as (n_channels, n_samples)
Expand Down Expand Up @@ -351,6 +357,13 @@ def multitaper_psd(signal, fs=1, nw=4, num_tapers=None, peak_resolution=None,

# When the input is AnalogSignal, the data is added after rolling the axis
# for time index to the last
if isinstance(signal,elephant.conversion.BinnedSpikeTrain):
# signal = neo.AnalogSignal(
# signal.to_array().transpose()/signal.bin_size.rescale(pq.s).magnitude*pq.dimensionless,
# t_start=signal.t_start,
# sampling_period=signal.bin_size)
signal = signal.to_analog_signal(scaling="normalized")

data = np.asarray(signal)
if isinstance(signal, neo.AnalogSignal):
data = np.moveaxis(data, 0, 1)
Expand Down Expand Up @@ -521,6 +534,13 @@ def segmented_multitaper_psd(signal, n_segments=1, len_segment=None,
TypeError
If `peak_resolution` is None and `num_tapers` is not an int.
"""
if isinstance(signal,elephant.conversion.BinnedSpikeTrain):
# signal = neo.AnalogSignal(
# signal.to_array().transpose()/signal.bin_size.rescale(pq.s).magnitude*pq.dimensionless,
# t_start=signal.t_start,
# sampling_period=signal.bin_size)
signal = signal.to_analog_signal(scaling="normalized")


# When the input is AnalogSignal, the data is added after rolling the axis
# for time index to the last
Expand Down
99 changes: 99 additions & 0 deletions elephant/test/test_spectral.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,40 @@ def test_welch_psd_input_types(self):
(freqs_neo == freqs_np).all() and (
psd_neo == psd_np).all())

def test_welch_psd_binned_spiketrain(self):
    """
    Welch PSD of a BinnedSpikeTrain input: check units and scaling.

    For a stationary Poisson process with rate r, the one-sided PSD
    flattens towards 2*r at high frequencies, which is used as the
    expected value (within a 20% tolerance).
    """
    # Seed the global RNG so the Poisson realization (and hence the PSD
    # estimate) is reproducible and the tolerance check cannot flake.
    np.random.seed(1234)
    rate = 50 * pq.Hz
    t_stop = 10 * pq.s
    spiketrain = elephant.spike_train_generation.StationaryPoissonProcess(
        rate, t_stop=t_stop).generate_spiketrain()
    bin_size = 2 * pq.ms
    binned_st = elephant.conversion.BinnedSpikeTrain(
        spiketrain, bin_size=bin_size)

    freqs, psd = elephant.spectral.welch_psd(binned_st)

    # Check units: should be Hz (from Hz^2/Hz)
    self.assertEqual(freqs.units, pq.Hz)
    self.assertEqual(psd.units, pq.Hz)

    # Check scaling: for large frequencies, PSD should approach 2*rate
    # (factor 2 because it is a one-sided PSD)
    avg_psd = np.mean(psd[0, freqs > 100 * pq.Hz])
    self.assertAlmostEqual(avg_psd.rescale('Hz').magnitude,
                           2 * rate.rescale('Hz').magnitude,
                           delta=0.2 * 2 * rate.rescale('Hz').magnitude)

def test_welch_psd_binned_spiketrain_empty(self):
    """
    Test Welch's PSD estimation for an empty binned spiketrain.
    """
    # A spiketrain without spikes bins to an all-zero signal, so the
    # resulting spectrum must be identically zero.
    spiketrain_no_spikes = neo.SpikeTrain(
        [] * pq.s, t_start=0 * pq.s, t_stop=10 * pq.s)
    binned_no_spikes = elephant.conversion.BinnedSpikeTrain(
        spiketrain_no_spikes, bin_size=2 * pq.ms)
    _freqs, psd_no_spikes = elephant.spectral.welch_psd(binned_no_spikes)
    self.assertEqual(np.max(psd_no_spikes), 0 * pq.Hz)

def test_welch_psd_multidim_input(self):
# generate multidimensional data
num_channel = 4
Expand Down Expand Up @@ -341,6 +375,37 @@ def test_multitaper_psd_input_types(self):
(freqs_neo == freqs_np).all() and (
psd_neo == psd_np).all())

def test_multitaper_psd_binned_spiketrain(self):
    """
    Multitaper PSD of a BinnedSpikeTrain input: check units and scaling.

    For a stationary Poisson process with rate r, the one-sided PSD
    flattens towards 2*r at high frequencies (20% tolerance).
    """
    # Seed the global RNG so the Poisson realization (and hence the PSD
    # estimate) is reproducible and the tolerance check cannot flake.
    np.random.seed(1234)
    rate = 50 * pq.Hz
    t_stop = 10 * pq.s
    spiketrain = elephant.spike_train_generation.StationaryPoissonProcess(
        rate, t_stop=t_stop).generate_spiketrain()
    bin_size = 2 * pq.ms
    binned_st = elephant.conversion.BinnedSpikeTrain(
        spiketrain, bin_size=bin_size)

    freqs, psd = elephant.spectral.multitaper_psd(binned_st)

    self.assertEqual(freqs.units, pq.Hz)
    self.assertEqual(psd.units, pq.Hz)

    # PSD of a Poisson process approaches 2*rate at high frequencies
    # (factor 2 because it is a one-sided PSD).
    avg_psd = np.mean(psd[0, freqs > 100 * pq.Hz])
    self.assertAlmostEqual(avg_psd.rescale('Hz').magnitude,
                           2 * rate.rescale('Hz').magnitude,
                           delta=0.2 * 2 * rate.rescale('Hz').magnitude)

def test_multitaper_psd_binned_spiketrain_empty(self):
    """
    Test Multitaper PSD estimation for an empty binned spiketrain.
    """
    # A spiketrain without spikes bins to an all-zero signal, so the
    # resulting spectrum must be identically zero.
    spiketrain_no_spikes = neo.SpikeTrain(
        [] * pq.s, t_start=0 * pq.s, t_stop=10 * pq.s)
    binned_no_spikes = elephant.conversion.BinnedSpikeTrain(
        spiketrain_no_spikes, bin_size=2 * pq.ms)
    _freqs, psd_no_spikes = elephant.spectral.multitaper_psd(
        binned_no_spikes)
    self.assertEqual(np.max(psd_no_spikes), 0 * pq.Hz)


class SegmentedMultitaperPSDTestCase(unittest.TestCase):
# The following assertions test _segmented_apply_func in the context
Expand Down Expand Up @@ -506,6 +571,40 @@ def test_segmented_multitaper_psd_input_types(self):
(freqs_neo == freqs_np).all() and (
psd_neo == psd_np).all())

def test_segmented_multitaper_psd_binned_spiketrain(self):
    """
    Segmented multitaper PSD of a BinnedSpikeTrain input: check units
    and scaling.

    For a stationary Poisson process with rate r, the one-sided PSD
    flattens towards 2*r at high frequencies (20% tolerance).
    """
    # Seed the global RNG so the Poisson realization (and hence the PSD
    # estimate) is reproducible and the tolerance check cannot flake.
    np.random.seed(1234)
    rate = 50 * pq.Hz
    t_stop = 10 * pq.s
    spiketrain = elephant.spike_train_generation.StationaryPoissonProcess(
        rate, t_stop=t_stop).generate_spiketrain()
    bin_size = 2 * pq.ms
    binned_st = elephant.conversion.BinnedSpikeTrain(
        spiketrain, bin_size=bin_size)

    freqs, psd = elephant.spectral.segmented_multitaper_psd(binned_st)

    # Check units: should be Hz
    self.assertEqual(freqs.units, pq.Hz)
    self.assertEqual(psd.units, pq.Hz)

    # Check scaling: for large frequencies, PSD should approach 2*rate
    # Flattening because segmented_multitaper_psd might have extra dimension
    avg_psd = np.mean(psd.flatten()[freqs > 100 * pq.Hz])
    self.assertAlmostEqual(avg_psd.rescale('Hz').magnitude,
                           2 * rate.rescale('Hz').magnitude,
                           delta=0.2 * 2 * rate.rescale('Hz').magnitude)

def test_segmented_multitaper_psd_binned_spiketrain_empty(self):
    """
    Test Segmented Multitaper PSD estimation for an empty binned
    spiketrain.
    """
    # Check behavior for an empty spiketrain
    bin_size = 2 * pq.ms
    empty_st = neo.SpikeTrain([] * pq.s, t_start=0 * pq.s, t_stop=10 * pq.s)
    binned_empty = elephant.conversion.BinnedSpikeTrain(
        empty_st, bin_size=bin_size)
    # Bug fix: this test previously called multitaper_psd, duplicating
    # the plain multitaper test instead of exercising the segmented API.
    freqs_empty, psd_empty = elephant.spectral.segmented_multitaper_psd(
        binned_empty)
    self.assertEqual(np.max(psd_empty), 0 * pq.Hz)


class MultitaperCrossSpectrumTestCase(unittest.TestCase):
def test_multitaper_cross_spectrum_errors(self):
Expand Down
Loading