1
1
import numpy as np
2
- from numpy import hamming
2
+ from collections import deque
3
3
from PyQt5 .QtWidgets import QApplication , QVBoxLayout , QHBoxLayout , QMainWindow , QWidget
4
4
from PyQt5 .QtCore import Qt
5
5
from pyqtgraph import PlotWidget
6
6
import pyqtgraph as pg
7
7
import pylsl
8
8
import sys
9
- from scipy .signal import butter , filtfilt , iirnotch
9
+ from scipy .signal import butter , iirnotch , lfilter , lfilter_zi
10
10
from scipy .fft import fft
11
+ import math
11
12
12
13
class EEGMonitor (QMainWindow ):
13
14
def __init__ (self ):
@@ -25,8 +26,8 @@ def __init__(self):
25
26
self .eeg_plot_widget .setBackground ('w' )
26
27
self .eeg_plot_widget .showGrid (x = True , y = True )
27
28
self .eeg_plot_widget .setLabel ('bottom' , 'EEG Plot' )
28
- self .eeg_plot_widget .setYRange (0 , 15000 , padding = 0 )
29
- self .eeg_plot_widget .setXRange (0 , 10 , padding = 0 )
29
+ self .eeg_plot_widget .setYRange (- 5000 , 5000 , padding = 0 )
30
+ self .eeg_plot_widget .setXRange (0 , 2 , padding = 0 )
30
31
self .eeg_plot_widget .setMouseEnabled (x = False , y = True ) # Disable zoom
31
32
self .main_layout .addWidget (self .eeg_plot_widget )
32
33
@@ -38,9 +39,10 @@ def __init__(self):
38
39
self .fft_plot .setBackground ('w' )
39
40
self .fft_plot .showGrid (x = True , y = True )
40
41
self .fft_plot .setLabel ('bottom' , 'FFT' )
41
- self .fft_plot .setYRange (0 , 25000 , padding = 0 )
42
+ # self.fft_plot.setYRange(0, 25000, padding=0)
42
43
self .fft_plot .setXRange (0 , 50 , padding = 0 ) # Set x-axis to 0 to 50 Hz
43
44
self .fft_plot .setMouseEnabled (x = False , y = False ) # Disable zoom
45
+ self .fft_plot .setAutoVisible (y = True ) # Allow y-axis to autoscale
44
46
self .bottom_layout .addWidget (self .fft_plot )
45
47
46
48
# Bar graph for brainwave power bands (right side of the second half)
@@ -71,20 +73,20 @@ def __init__(self):
71
73
self .sampling_rate = int (self .inlet .info ().nominal_srate ())
72
74
print (f"Sampling rate: { self .sampling_rate } Hz" )
73
75
74
- # Data and buffers
75
- self .buffer_size = self .sampling_rate * 10 # Fixed-size buffer for 10 seconds
76
- self .eeg_data = np .zeros (self .buffer_size ) # Fixed-size array for circular buffer
77
- self .time_data = np .linspace (0 , 10 , self .buffer_size ) # Fixed time array for plotting
78
- self .current_index = 0 # Index for overwriting data
76
+ # Data and Buffers
77
+ self .one_second_buffer = deque (maxlen = self .sampling_rate ) # 1-second buffer
78
+ self .buffer_size = self .sampling_rate * 10
79
+ self .moving_window_size = self .sampling_rate * 2 # 2-second window
79
80
80
- # Moving window for brainwave power
81
- self .moving_window_size = self . sampling_rate * 3 # 3-second window
82
- self .moving_window_buffer = np . zeros ( self . moving_window_size )
81
+ self . eeg_data = np . zeros ( self . buffer_size )
82
+ self .time_data = np . linspace ( 0 , 10 , self . buffer_size )
83
+ self .current_index = 0
83
84
84
- # Filters
85
85
self .b_notch , self .a_notch = iirnotch (50 , 30 , self .sampling_rate )
86
- self .b_highpass , self .a_highpass = butter (4 , 1.5 / (0.5 * self .sampling_rate ), btype = 'high' )
87
- self .b_lowpass , self .a_lowpass = butter (4 , 45 / (0.5 * self .sampling_rate ), btype = 'low' )
86
+ self .b_band , self .a_band = butter (4 , [0.5 / (self .sampling_rate / 2 ), 48.0 / (self .sampling_rate / 2 )], btype = 'band' )
87
+
88
+ self .zi_notch = lfilter_zi (self .b_notch , self .a_notch ) * 0
89
+ self .zi_band = lfilter_zi (self .b_band , self .a_band ) * 0
88
90
89
91
# Timer for updating the plot
90
92
self .timer = pg .QtCore .QTimer ()
@@ -95,68 +97,59 @@ def __init__(self):
95
97
self .fft_curve = self .fft_plot .plot (pen = pg .mkPen ('r' , width = 1 )) # FFT Colour is red
96
98
97
99
def update_plot (self ):
98
- samples , _ = self .inlet .pull_chunk (timeout = 0.0 , max_samples = 30 )
100
+ samples , _ = self .inlet .pull_chunk (timeout = 0.0 )
99
101
if samples :
100
102
for sample in samples :
101
- # Overwrite the oldest data point in the buffer
102
- self .eeg_data [self .current_index ] = sample [0 ]
103
- self .current_index = (self .current_index + 1 ) % self .buffer_size # Circular increment
104
-
105
- if self .current_index >= self .buffer_size :
106
- plot_data = self .eeg_data
107
- else :
108
- plot_data = np .concatenate ((self .eeg_data [self .current_index :], self .eeg_data [:self .current_index ]))
109
-
110
- # Apply filters to the full data for EEG plot
111
- filtered_eeg = filtfilt (self .b_notch , self .a_notch , plot_data )
112
- filtered_eeg = filtfilt (self .b_highpass , self .a_highpass , filtered_eeg )
113
- filtered_eeg = filtfilt (self .b_lowpass , self .a_lowpass , filtered_eeg )
114
-
115
- # Update the EEG plot with the filtered data
116
- self .eeg_curve .setData (self .time_data , filtered_eeg )
117
-
118
- # Perform FFT on the latest 1-second slice
119
- latest_data = filtered_eeg [- self .sampling_rate :]
120
- window = hamming (len (latest_data ))
121
- filtered_eeg_windowed = latest_data * window
122
-
123
- # Apply zero-padding
124
- zero_padded_length = 512
125
- filtered_eeg_windowed_padded = np .pad (filtered_eeg_windowed , (0 , zero_padded_length - len (filtered_eeg_windowed )), 'constant' )
126
-
127
- eeg_fft = np .abs (fft (filtered_eeg_windowed_padded ))[:len (filtered_eeg_windowed_padded ) // 2 ]
128
- freqs = np .fft .fftfreq (len (filtered_eeg_windowed_padded ), 1 / self .sampling_rate )[:len (filtered_eeg_windowed_padded ) // 2 ]
129
-
130
- # Update FFT plot
131
- self .fft_curve .setData (freqs , eeg_fft )
132
-
133
- # Update the 3-second moving window buffer
134
- for sample in latest_data :
135
- self .moving_window_buffer = np .roll (self .moving_window_buffer , - 1 )
136
- self .moving_window_buffer [- 1 ] = sample
137
-
138
- # Apply filters to the moving window buffer
139
- filtered_window = filtfilt (self .b_notch , self .a_notch , self .moving_window_buffer )
140
- filtered_window = filtfilt (self .b_highpass , self .a_highpass , filtered_window )
141
- filtered_window = filtfilt (self .b_lowpass , self .a_lowpass , filtered_window )
142
-
143
- # Perform FFT on the moving window buffer
144
- windowed_data = filtered_window * hamming (len (filtered_window ))
145
- fft_data = np .abs (fft (windowed_data ))[:len (windowed_data ) // 2 ]
146
- window_freqs = np .fft .fftfreq (len (windowed_data ), 1 / self .sampling_rate )[:len (windowed_data ) // 2 ]
147
-
148
- brainwave_power = self .calculate_brainwave_power (fft_data , window_freqs )
149
- self .brainwave_bars .setOpts (height = brainwave_power )
103
+ raw_point = sample [0 ]
104
+
105
+ notch_filtered , self .zi_notch = lfilter (self .b_notch , self .a_notch , [raw_point ], zi = self .zi_notch )
106
+ band_filtered , self .zi_band = lfilter (self .b_band , self .a_band , notch_filtered , zi = self .zi_band )
107
+ band_filtered = band_filtered [- 1 ] # Get the current filtered point
108
+
109
+ # Update the EEG plot
110
+ self .eeg_data [self .current_index ] = band_filtered
111
+ self .current_index = (self .current_index + 1 ) % self .buffer_size
112
+
113
+ if self .current_index == 0 :
114
+ plot_data = self .eeg_data
115
+ else :
116
+ plot_data = np .concatenate ((self .eeg_data [self .current_index :], self .eeg_data [:self .current_index ]))
117
+
118
+ recent_data = plot_data [- self .moving_window_size :]
119
+ recent_time = np .linspace (0 , len (recent_data ) / self .sampling_rate , len (recent_data ))
120
+ self .eeg_curve .setData (recent_time , recent_data )
121
+
122
+ self .one_second_buffer .append (band_filtered ) # Add the filtered point to the 1-second buffer
123
+ if len (self .one_second_buffer ) == self .sampling_rate : # Process FFT and brainwave power
124
+ self .process_fft_and_brainpower ()
125
+ self .one_second_buffer .clear ()
126
+
127
+ def process_fft_and_brainpower (self ):
128
+ window = np .hanning (len (self .one_second_buffer )) # Apply Hanning window to the buffer
129
+ buffer_windowed = np .array (self .one_second_buffer ) * window
130
+
131
+ # Perform FFT
132
+ fft_result = np .abs (fft (buffer_windowed ))[:len (buffer_windowed ) // 2 ]
133
+ fft_result /= len (buffer_windowed )
134
+ freqs = np .fft .fftfreq (len (buffer_windowed ), 1 / self .sampling_rate )[:len (buffer_windowed ) // 2 ]
135
+ self .fft_curve .setData (freqs , fft_result )
136
+
137
+ brainwave_power = self .calculate_brainwave_power (fft_result , freqs )
138
+ self .brainwave_bars .setOpts (height = brainwave_power )
150
139
151
140
def calculate_brainwave_power (self , fft_data , freqs ):
152
- delta_power = np .sum (fft_data [(freqs >= 0.5 ) & (freqs <= 4 )])
153
- theta_power = np .sum (fft_data [(freqs >= 4 ) & (freqs <= 8 )])
154
- alpha_power = np .sum (fft_data [(freqs >= 8 ) & (freqs <= 13 )])
155
- beta_power = np .sum (fft_data [(freqs >= 13 ) & (freqs <= 30 )])
156
- gamma_power = np .sum (fft_data [(freqs >= 30 ) & (freqs <= 45 )])
157
-
141
+ delta_power = math .sqrt (np .sum (((fft_data [(freqs >= 0.5 ) & (freqs <= 4 )])** 2 )/ 4 ))
142
+ theta_power = math .sqrt (np .sum (((fft_data [(freqs >= 4 ) & (freqs <= 8 )])** 2 )/ 5 ))
143
+ alpha_power = math .sqrt (np .sum (((fft_data [(freqs >= 8 ) & (freqs <= 13 )])** 2 )/ 6 ))
144
+ beta_power = math .sqrt (np .sum (((fft_data [(freqs >= 13 ) & (freqs <= 30 )])** 2 )/ 18 ))
145
+ gamma_power = math .sqrt (np .sum (((fft_data [(freqs >= 30 ) & (freqs <= 45 )])** 2 )/ 16 ))
146
+ print ("Delta Power" , delta_power )
147
+ print ("Theta Power" , theta_power )
148
+ print ("Alpha Power" , alpha_power )
149
+ print ("Beta Power" , beta_power )
150
+ print ("Gamma Power" , gamma_power )
158
151
return [delta_power , theta_power , alpha_power , beta_power , gamma_power ]
159
-
152
+
160
153
if __name__ == "__main__" :
161
154
app = QApplication (sys .argv )
162
155
window = EEGMonitor ()
0 commit comments